Hot questions for Using AspectJ in multithreading

Question:

Context:

I have created a small multithreaded Java server for a game. Despite my best efforts to follow best practices, it turned out that some methods intended to be called from just one thread were called from two or more threads. After debugging and analysis I managed to "fix" my design, but I was wondering:

The question:

Is there a tool (or, if not, could one be developed, and how) that lets you mark methods with annotations like @SingleThread, @ThreadCount(2) or @ThreadNameLike("my_fancy_thread_group*") and that counts/monitors/logs access to these methods, for example:

  • @SingleThread - checks that the method is only ever accessed by one thread
  • @ThreadCount(2) - checks that the method is accessed by exactly two threads
  • @ThreadNameLike - checks that the method is accessed only by threads whose names match the pattern(s)

The idea is to do a TEST run of the program and get at least a log record whenever an annotated condition is violated.

I was thinking that AspectJ can probably do the job to some extent with its pointcuts, but then I realized that an approach similar to Dagger / Dagger2 would be better, i.e. when you want to test your server you turn on an annotation processor (let's call it, hypothetically, "SafetyFirst") which generates adapter (wrapper?) classes containing the monitoring code. Then you run the server, run some load tests and check the logs for violations (or, in an ideal world, get a report file).

I fully realize that such tool:

  • will not provide 100% coverage of all potential cases;
  • will mask/trigger heisenbugs
  • will slow down the examined program

but at the very least it can serve as an early warning system that clearly advertises violations of the intended design.


Answer:

I did something similar with AspectJ load-time weaving in order to print all function calls within my package.

The advantage of load-time weaving is that, unlike compile-time weaving, it does not modify your compiled classes. Once you remove -javaagent:<path to aspectj lib> and the classpath entry for your custom aspect library from the run command, every trace of the aspect is gone.

I made some changes and implemented a test covering the @ThreadCount functionality you asked about. You need to download and install AspectJ.

Please see code snippets:

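import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

aspect Profile {

    // Concurrent map, because getCounter(..) reads it without holding the lock
    private static final Map<String, AtomicInteger> counterMap = new ConcurrentHashMap<String, AtomicInteger>();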

    pointcut threadCountPc(test.ThreadCount tc) : execution(* test..*(..)) && @annotation(tc);

    Object around(test.ThreadCount tc) : threadCountPc(tc) {        

        String signature = thisJoinPointStaticPart.getSignature().toString();

        AtomicInteger counter = getCounter(signature);
        int currentValue = counter.incrementAndGet();

        // Strict comparison: report only when the limit is actually exceeded
        if (currentValue > tc.value()){
            System.out.println("[Thread Name:" + Thread.currentThread().getName() + 
            "] Method Name:" + signature + ", threadCount:" + currentValue + " exceeds " + tc.value());
        }

        try{
            return proceed(tc);
        }finally{
            counter.decrementAndGet();          
        }
    }

    private static AtomicInteger getCounter(String methodName){
        AtomicInteger value = counterMap.get(methodName);
        if (value == null){
            synchronized (counterMap){
                value = counterMap.get(methodName);
                if (value == null){
                    value = new AtomicInteger(0);
                    counterMap.put(methodName, value);
                }
            }
        }
        return value;
    }
}
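
Stripped of the weaving, the advice does three things: it counts the invocations currently in flight per method signature, reports when that count exceeds the annotated limit, and decrements the count in a finally block. Here is a minimal plain-Java sketch of that logic, which you can try without AspectJ. The class InFlightCounter and its method names are made up for illustration; they are not part of AspectJ or of the aspect above.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper mirroring the around() advice without AspectJ
class InFlightCounter {
    private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    private final List<String> violations = new CopyOnWriteArrayList<>();

    // Runs body while counting it as one in-flight invocation of the given signature
    <T> T around(String signature, int limit, Supplier<T> body) {
        AtomicInteger counter = counters.computeIfAbsent(signature, key -> new AtomicInteger());
        int current = counter.incrementAndGet();
        if (current > limit) {
            violations.add(signature + ": " + current + " exceeds " + limit);
        }
        try {
            return body.get();
        } finally {
            // Always decrement, even if body throws
            counter.decrementAndGet();
        }
    }

    List<String> violations() {
        return violations;
    }
}
```

Note that the counter is per signature, not per thread, so a recursive call on a single thread also counts as a second in-flight invocation. The same holds for the aspect.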

Compiling: "C:/aspectj1.8/bin/ajc.bat" profile_ft.aj -cp C:/aspectj1.8/lib/aspectjrt.jar;../test/. -1.6 -outxml -outjar profile_ft.jar

Running: java -javaagent:C:/aspectj1.8/lib/aspectjweaver.jar -cp aspectj/profile_ft.jar;test test.Test1
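
The aspect above only covers @ThreadCount. For @SingleThread and @ThreadNameLike the advice would have to remember which threads have entered each method. The sketch below shows just that bookkeeping in plain Java. ThreadAccessLog and its methods are hypothetical names, the name pattern is assumed to be a regular expression rather than the glob from the question, and threads are assumed to have distinct names. An advice could call record(..) with thisJoinPointStaticPart.getSignature().toString(), as in the aspect.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

// Hypothetical bookkeeping class: remembers the names of all threads that entered a method
class ThreadAccessLog {
    private final ConcurrentHashMap<String, Set<String>> threadsByMethod = new ConcurrentHashMap<>();

    // Call this on method entry, from the thread executing the method
    void record(String signature) {
        threadsByMethod
            .computeIfAbsent(signature, key -> ConcurrentHashMap.newKeySet())
            .add(Thread.currentThread().getName());
    }

    // @SingleThread would expect 1 here, @ThreadCount(2) would expect 2
    int distinctThreads(String signature) {
        Set<String> names = threadsByMethod.get(signature);
        return names == null ? 0 : names.size();
    }

    // @ThreadNameLike check; regex is an assumption, the question uses a glob-like pattern
    boolean allNamesMatch(String signature, String regex) {
        Set<String> names = threadsByMethod.get(signature);
        if (names == null) {
            return true;
        }
        Pattern pattern = Pattern.compile(regex);
        for (String name : names) {
            if (!pattern.matcher(name).matches()) {
                return false;
            }
        }
        return true;
    }
}
```

Checking the recorded names at the end of the test run (instead of on every call) keeps the overhead inside the monitored method small.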

Question:

I am new to AspectJ's annotation style for Java, and I am wondering whether it is possible to put a pointcut on a cross-thread invocation.

Here is the code:

public class App {
    public static void main( String[] args ) {
        new Connector().getStart("testtest");
    }
}

public class Connector {
    public void getStart(String s1) {
        Handler h = new Handler(s1);
        h.start();
    }
}

public class Handler extends Thread {
    String s1;

    public Handler(String s1) {
        this.s1 = s1;
    }

    public void run() {
        new Plain().getValue(s1);   
    }
}

public class Plain {
    public void getValue(String s1) {
        System.out.println("Plain getValue: " + s1);
    }
}

I would like to have a pointcut that only triggers when Plain.getValue() is called by Connector.getStart().

Is it possible? Thanks.


Answer:

You are making a mistake believing that Plain.getValue(..) is called by Connector.getStart(..) because in a multi-threaded environment it is not. Let me prove it with a little tweak to the getValue(..) method, printing a stack trace:

package de.scrum_master.app;

public class Plain {
    public void getValue(String s1) {
        System.out.println("Plain getValue: " + s1);
        new Exception().printStackTrace(System.out);
    }
}

By the way, I have moved all your classes to package de.scrum_master.app because using the default package is discouraged in Java and also AspectJ does not like it when trying to match pointcuts.

Console log (multi-threaded):

Plain getValue: testtest
java.lang.Exception
    at de.scrum_master.app.Plain.getValue(Plain.java:4)
    at de.scrum_master.app.Handler.run(Handler.java:9)

See? There is no trace of Connector.getStart(..) in the log. If we also tweak getStart(..) so as to call the thread's run() method directly (i.e. not starting a new thread but executing in the same thread) instead of start(), the situation changes:

package de.scrum_master.app;

public class Connector {
    public void getStart(String s1) {
        Handler h = new Handler(s1);
        h.run();
    }
}

Console log (single-threaded):

Plain getValue: testtest
java.lang.Exception
    at de.scrum_master.app.Plain.getValue(Plain.java:4)
    at de.scrum_master.app.Handler.run(Handler.java:9)
    at de.scrum_master.app.Connector.getStart(Connector.java:4)
    at de.scrum_master.app.App.main(App.java:3)

In this situation we could use AspectJ's dynamic cflow() (control flow) pointcut like this:

package de.scrum_master.aspect;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

@Aspect
public class SingleThreadAspect {
    @Before("execution(* de.scrum_master.app.Plain.getValue(..)) && cflow(execution(* de.scrum_master.app.Connector.getStart(..)))")
    public void interceptControlFlow(JoinPoint thisJoinPoint) {
        System.out.println(thisJoinPoint);
    }
}

The advice would be triggered just as you wish. But for the reason explained at the beginning of my answer cflow() does not (and cannot) work across threads because there is no such thing as a direct control flow across threads. Each thread's control flow starts with its run() method, no earlier. That is the whole concept of multi-threading.
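
The difference between start() and run() can also be checked without AspectJ by inspecting the stack programmatically. The following is a small sketch under my own naming (StackProbe, onStack and the boolean parameter are made up for illustration): getStart(..) stands in for Connector.getStart(..) and reports whether the code executed by the handler finds a getStart frame on its own stack.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the original sample code
class StackProbe {
    // True if some frame on the current thread's stack belongs to a method with exactly this name
    static boolean onStack(String methodName) {
        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
            if (frame.getMethodName().equals(methodName)) {
                return true;
            }
        }
        return false;
    }

    // Stand-in for Connector.getStart(..): runs the probe in a new thread or in the caller's thread
    static boolean getStart(boolean inNewThread) {
        AtomicBoolean sawGetStart = new AtomicBoolean();
        Thread handler = new Thread(() -> sawGetStart.set(onStack("getStart")));
        if (inNewThread) {
            handler.start();
            try {
                handler.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } else {
            // Same thread: getStart(..) stays on the stack, like h.run() above
            handler.run();
        }
        return sawGetStart.get();
    }
}
```

StackProbe.getStart(false) finds the frame and StackProbe.getStart(true) does not, which is the same information cflow() has to work with.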

So if you really want to emulate something like a cross-thread control flow for whatever doubtful reason, you need to do some manual bookkeeping.

But first let us revert the tweaked h.run() back to the original h.start() so as to reinstate the multi-threading situation. Let us also remove the printStackTrace(..) line from Plain.getValue(..).

Solution:

Disclaimer: I do not like annotation-style @AspectJ syntax, so I am switching over to native syntax. It is much more expressive and we can achieve what we want more easily in terms of ITD (inter-type definition) because

  • in native syntax we can just declare an additional instance member variable for a given class while
  • in @AspectJ syntax we would have to declare the target class to implement an interface with a default implementation which in turn would carry the member variable for our manual bookkeeping.

Let us modify App so as to also start a Handler thread directly. This is our negative test case because we do not want to trigger our advice there as the thread is started outside of Connector.getStart(..):

package de.scrum_master.app;

public class App {
    public static void main(String[] args) throws InterruptedException {
        // The aspect should ignore this thread
        new Handler("foo").start();
        // Wait a little while so as not to mess up log output
        Thread.sleep(250);
        new Connector().getStart("testtest");
    }
}

Console log without aspect:

Plain getValue: foo
Plain getValue: testtest

Aspect:

package de.scrum_master.aspect;

import de.scrum_master.app.*;

public aspect CrossThreadAspect {
    // Declare a new instance member for our bookkeeping
    private boolean Handler.cflowConnectorGetStart = false;

    // If handler thread is started from Connector.getStart(..), set a mark
    before(Handler handler) :
        call(void Handler.start()) &&
        cflow(execution(* Connector.getStart(..))) &&
        target(handler)
    {
        System.out.println(thisJoinPoint + "\n  doing bookkeeping");
        handler.cflowConnectorGetStart = true;
    }

    // If current thread is a marked Handler, log it
    before() :
        execution(* Plain.getValue(..)) &&
        if(Thread.currentThread() instanceof Handler) &&
        if(((Handler) Thread.currentThread()).cflowConnectorGetStart)
    {
        System.out.println(thisJoinPoint + "\n  triggered from parent thread via Connector.getStart(..)");
    }
}

With the aspect in place, the Handler thread started from App.main(..) is ignored as expected, while the Handler started from Connector.getStart(..) triggers the advice.

Console log with aspect:

Plain getValue: foo
call(void de.scrum_master.app.Handler.start())
  doing bookkeeping
execution(void de.scrum_master.app.Plain.getValue(String))
  triggered from parent thread via Connector.getStart(..)
Plain getValue: testtest
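
For readers who want to see the bookkeeping without any weaving, here is roughly what the two advices amount to when written by hand. This is only a sketch: CrossThreadBookkeeping, the startedByConnector field and the log list are my own illustrative names, and the advice bodies are inlined into stand-ins for Connector.getStart(..) and Plain.getValue(..).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical hand-written equivalent of the CrossThreadAspect bookkeeping
class CrossThreadBookkeeping {
    static class Handler extends Thread {
        // Plays the role of the inter-type declared field Handler.cflowConnectorGetStart
        boolean startedByConnector;
        private final String value;
        private final List<String> log;

        Handler(String value, List<String> log) {
            this.value = value;
            this.log = log;
        }

        @Override
        public void run() {
            getValue(value, log);
        }
    }

    // Stand-in for Plain.getValue(..) with the second advice inlined
    static void getValue(String value, List<String> log) {
        Thread current = Thread.currentThread();
        if (current instanceof Handler && ((Handler) current).startedByConnector) {
            log.add("triggered via getStart: " + value);
        }
        log.add("Plain getValue: " + value);
    }

    // Stand-in for Connector.getStart(..) with the first advice inlined
    static Handler getStart(String value, List<String> log) {
        Handler handler = new Handler(value, log);
        handler.startedByConnector = true; // set before start(), so the new thread sees it
        handler.start();
        return handler;
    }

    // Negative case first (thread started directly), then the positive case
    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            Handler direct = new Handler("foo", log);
            direct.start();
            direct.join();
            getStart("testtest", log).join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }
}
```

The aspect achieves the same effect without touching Handler, Connector or Plain, which is the whole point of using inter-type declarations here.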

Question:

Problem

I want to print out the request URL and the response before every call to the following method:

public class UpdateRequester {
   private void throwMessage(String requestUrl, String page) {
      //Some code inside
   }
}

The method is called from the Test class:

public class Test {
  public void testUpdate() {
    Executors.scheduleWithFixedDelay(new Runnable() {
        public void run() {
          //It will call throwMessage sometimes in the future
        }
    }, ...);
  }  
}

so I designed an aspect:

public aspect TestUpdate {
   static final void println(String s) {
     System.out.println(s);
   }

   pointcut testUpdateFlow() : cflow(this(Test) && execution(void testUpdate()));

   pointcut throwMessageCut(String url, String response) : this(UpdateRequester) && args(url, response) && execution(void throwMessage(String, String));

   before(String url, String response) : testUpdateFlow() && throwMessageCut( url,  response) {
    println("=============Url============");
    println(url);
    println("============Response=========");
    println(response);
   }
}

The aspect doesn't print anything to the console. If I remove testUpdateFlow(), it does print to the console. I think that cflow in AspectJ doesn't consider the code run via Executors.scheduleWithFixedDelay to be in the control flow of testUpdate(). Is there any way to make AspectJ detect the thread-crossing call in this situation?


Answer:

Let us assume we have these classes:

package de.scrum_master.app;

public class UpdateRequester {
    public void doSomething() {
        throwMessage("http://my.url.org/foo", "my page");
    }

    private void throwMessage(String requestUrl, String page) {
        System.out.println("Throwing message for request " + requestUrl + " on page '" + page + "'");
    }
}

Because throwMessage(..) is private in your example, I deliberately added a public method doSomething() which can be called by the test class to which I also added a main(..) method as an entry point for my test:

package de.scrum_master.app;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test {
    public void testUpdate() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
        executorService.scheduleWithFixedDelay(
            new Runnable() {
                public void run() {
                    new UpdateRequester().doSomething();
                }
            },
            500L,
            1000L,
            TimeUnit.MILLISECONDS
        );
    }

    public static void main(String[] args) {
        new Test().testUpdate();
    }
}

Now let us print an exception stack trace from our before() advice in order to find out what the control flow really is:

package de.scrum_master.app;

import de.scrum_master.app.UpdateRequester;

public aspect TestUpdate {
    pointcut throwMessageCut(String url, String response) :
        this(UpdateRequester) &&
        args(url, response) &&
        execution(void throwMessage(String, String));

    before(String url, String response) :
        /*testUpdateFlow() &&*/
        throwMessageCut(url,  response)
    {
        System.out.println(thisJoinPoint);
        new Exception().printStackTrace(System.out);
    }
}

You see a stack trace like this:

execution(void de.scrum_master.app.UpdateRequester.throwMessage(String, String))
java.lang.Exception
    at de.scrum_master.app.TestUpdate.ajc$before$de_scrum_master_app_TestUpdate$1$33fbc0c(TestUpdate.aj:16)
    at de.scrum_master.app.UpdateRequester.throwMessage(UpdateRequester.java:9)
    at de.scrum_master.app.UpdateRequester.doSomething(UpdateRequester.java:5)
    at de.scrum_master.app.Test$1.run(Test.java:13)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Throwing message for request http://my.url.org/foo on page 'my page'

This shows that this() is not Test but Test$1, the anonymous inner class implementing Runnable that you define in your code. You also see that Test.testUpdate() is not really in the control flow, as it is nowhere to be seen in the stack trace. You can modify the pointcut like this:

pointcut testUpdateFlow() :
    cflow(
        this(Runnable) &&
        withincode(public void Test..*.run(..)) &&
        call(* UpdateRequester.*(..))
    );

It means: In the control flow of

  • an instance of Runnable,
  • somewhere within the code of a public void run(..) method defined below Test (inner class),
  • calling any method of UpdateRequester.

I.e. now the aspect looks like this (the console output stays the same):

package de.scrum_master.app;

import de.scrum_master.app.UpdateRequester;
import java.lang.Runnable;

public aspect TestUpdate {
    pointcut testUpdateFlow() :
        cflow(
            this(Runnable) &&
            withincode(public void Test..*.run(..)) &&
            call(* UpdateRequester.*(..))
        );

    pointcut throwMessageCut(String url, String response) :
        this(UpdateRequester) &&
        args(url, response) &&
        execution(void throwMessage(String, String));

    before(String url, String response) :
        testUpdateFlow() &&
        throwMessageCut(url,  response)
    {
        System.out.println(thisJoinPoint);
        new Exception().printStackTrace(System.out);
    }
}

You could also use nested cflow() statements like this:

pointcut testUpdateFlow() :
    cflow(
        this(Runnable) &&
        cflow(execution(public void Test..*.run(..))) &&
        call(* UpdateRequester.*(..))
    );

Alternatively, in order to avoid the anonymous inner class you can create a named inner class:

package de.scrum_master.app;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test {
    public static class UpdateRequesterStarter implements Runnable {
        public void run() {
            new UpdateRequester().doSomething();
        }
    }

    public void testUpdate() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
        executorService.scheduleWithFixedDelay(
            new UpdateRequesterStarter(),
            500L,
            1000L,
            TimeUnit.MILLISECONDS
        );
    }

    public static void main(String[] args) {
        new Test().testUpdate();
    }
}

Now the output changes, please note the difference in the call stack:

execution(void de.scrum_master.app.UpdateRequester.throwMessage(String, String))
java.lang.Exception
    at de.scrum_master.app.TestUpdate.ajc$before$de_scrum_master_app_TestUpdate$1$9c6f966b(TestUpdate.aj:24)
    at de.scrum_master.app.UpdateRequester.throwMessage(UpdateRequester.java:9)
    at de.scrum_master.app.UpdateRequester.doSomething(UpdateRequester.java:5)
    at de.scrum_master.app.Test$UpdateRequesterStarter.run(Test.java:10)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Throwing message for request http://my.url.org/foo on page 'my page'

Now you can refine/simplify the testUpdateFlow() pointcut to:

package de.scrum_master.app;

import de.scrum_master.app.UpdateRequester;
import de.scrum_master.app.Test.UpdateRequesterStarter;

public aspect TestUpdate {
    pointcut testUpdateFlow() :
        cflow(execution(public void UpdateRequesterStarter.run(..)));

    pointcut throwMessageCut(String url, String response) :
        this(UpdateRequester) &&
        args(url, response) &&
        execution(void throwMessage(String, String));

    before(String url, String response) :
        testUpdateFlow() &&
        throwMessageCut(url,  response)
    {
        System.out.println(thisJoinPoint);
        new Exception().printStackTrace(System.out);
    }
}

Please also note the changed import statements.
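
The two observations this answer is built on (the task's this is an anonymous class named like Test$1, and the scheduling method is not on the worker thread's stack) can be reproduced in plain Java. The sketch below uses a hypothetical class SchedulerProbe whose testUpdate() schedules a single task and returns what the task saw. It uses schedule(..) with a Callable instead of scheduleWithFixedDelay(..) so that the result can be handed back.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original sample code
class SchedulerProbe {
    // Returns { class name of the task's "this", "true"/"false" for testUpdate being on the stack }
    static String[] testUpdate() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            Callable<String[]> task = new Callable<String[]>() {
                @Override
                public String[] call() {
                    boolean inFlow = false;
                    for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
                        if (frame.getMethodName().equals("testUpdate")) {
                            inFlow = true;
                        }
                    }
                    return new String[] { getClass().getName(), String.valueOf(inFlow) };
                }
            };
            // Runs on a pool thread; get() waits for the result
            return pool.schedule(task, 10, TimeUnit.MILLISECONDS).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The first element ends in $1 (the compiler-generated name of the anonymous class), and the second is "false" because the worker thread's stack starts at Thread.run(), not at testUpdate().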

Question:

I am trying to obtain the parent thread ID of each new thread created in a program using AspectJ. Since a new thread starts execution using the start() method, I thought the following technique should work:

aspect getParentThread {
    pointcut threadStarting(): call(public void start());
    Object around(): threadStarting() {
         long parentThread = Thread.currentThread().getId();
         Object ret = proceed();
         long newThread = Thread.currentThread().getId();
         if (parentThread != newThread) {
              /*Store parentThread id in data structure */
         }
         return ret;
     }
}

But this simply does not work. Although the advice executes, even after proceed() completes there is just a single thread id. So what am I getting wrong here?


Answer:

Warren Dew is right, but I want to add some sample code in order to show how you can easily do it with AspectJ. You do not even need an around() advice, a simple before() is enough.

Driver application:

package de.scrum_master.app;

public class Application {
    public static void main(String[] args) {
        new Thread(
            new Runnable() {
                @Override
                public void run() {}
            },
            "first thread"
        ).start();
        new Thread(
            new Runnable() {
                @Override
                public void run() {}
            },
            "second thread"
        ).start();
    }
}

Aspect:

package de.scrum_master.aspect;

public aspect ThreadStartInterceptor {
    before(Thread childThread) :
        call(public void Thread+.start()) &&
        target(childThread)
    {
        System.out.printf(
            "%s%n  Parent thread: %3d -> %s%n  Child thread:  %3d -> %s%n",
            thisJoinPoint,
            Thread.currentThread().getId(),
            Thread.currentThread().getName(),
            childThread.getId(),
            childThread.getName()
        );
    }
}

  • As you can see, I am limiting method interception to Thread+, i.e. to Thread and subclass instances. I am doing this explicitly even though it is not strictly necessary because the next point already does it implicitly:
  • I am also binding the child thread to a variable which can be used neatly from within the aspect.

Console log:

call(void java.lang.Thread.start())
  Parent thread:   1 -> main
  Child thread:   11 -> first thread
call(void java.lang.Thread.start())
  Parent thread:   1 -> main
  Child thread:   12 -> second thread
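
The reason the around() advice from the question only ever sees one ID is that start() returns in the thread that called it, while the new thread executes run() elsewhere. So Thread.currentThread() is the parent both before and after proceed(). A tiny plain-Java check, with StartReturnsInParent being a made-up class name:

```java
// Hypothetical demo class: shows that the caller of start() stays in its own thread
class StartReturnsInParent {
    // Returns { caller's ID before start(), caller's ID after start(), child's ID }
    static long[] ids() {
        long before = Thread.currentThread().getId();
        Thread child = new Thread(() -> { });
        child.start();
        // Still executing in the parent thread here
        long after = Thread.currentThread().getId();
        return new long[] { before, after, child.getId() };
    }
}
```

The first two values are equal and the third differs, which is why the child has to be taken from the call target (as the aspect does with target(childThread)) rather than from Thread.currentThread().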

Question:

Original issue

Populate the Java MDC from a thread to all its spawned inner threads (parent to children relation)

WIP solution using AspectJ

I'm able to write an aspect intercepting all Runnable creation. However, I want a different aspect instance for each use (marked with a custom annotation), because I have to store the MDC somewhere while executing code from the parent thread. With that setup I'm unable to write a pointcut that intercepts the newly created Runnable instance so that I can set the MDC from the previous context map.

Here's the aspect

@Aspect("percflow(@annotation(com.bell.cts.commons.cron.framework.scheduler.domain.MDCTrace))")
public class MDCTraceAspect {

  private final Logger logger = LoggerFactory.getLogger(MDCTraceAspect.class);
  private int i;
  private final Map<String, String> contextMap;

  public MDCTraceAspect() {
    i = new Random().nextInt();
    MDC.clear();
    MDC.put("IP", String.valueOf(i));
    contextMap = MDC.getCopyOfContextMap();
    logger.debug(String.format("[%d] New Aspect", Thread.currentThread().getId()));
  }

  @Before("execution(Runnable+.new(..))")
  public void beforeNewRunnable(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.debug(String.format("[%d] New Runnable", Thread.currentThread().getId()));
  }

  @Before("execution(* Runnable+.*(..))")
  public void before(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.info(String.format("[%d] RUNNABLE WORKS!", Thread.currentThread().getId()));
  }

  @Before("execution(void Child.run())")
  public void beforeChildRun(JoinPoint joinPoint) {
    MDC.setContextMap(contextMap);
    logger.info(String.format("[%d] CHILD WORKS!", Thread.currentThread().getId()));
  }
}

And here's a Parent, Child and custom annotation

public class Parent {

  private final Logger logger = LoggerFactory.getLogger(Parent.class);
  private ExecutorService executorService;

  @MDCTrace
  public void runMultiThreadByExecutor() throws InterruptedException {
    executorService = Executors.newCachedThreadPool();
    logger.info(String.format("[%d] Before start child thread", Thread.currentThread().getId()));

    executorService.submit(new Child());
    logger.info(String.format("[%d] After start child thread", Thread.currentThread().getId()));

    List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
      logger.info(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
    });

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.SECONDS);
    logger.info(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
  }

  public static void main(String[] args) throws InterruptedException {
    Parent parent = new Parent();
    parent.runMultiThreadByExecutor();
  }
}

public class Child implements Runnable {

  private final Logger logger = LoggerFactory.getLogger(Child.class);

  @Override
  public void run() {
    logger.info(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
  }
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {
}

Goal

Final goal is to simply have to annotate the entry point of a MDC context so any threads/runnables/futures created while executing the annotated method (even other objects) are intercepted so MDC is properly set using the original/parent thread MDC info stored in the aspect instance of current context flow.

Neither of my two attempts, before and beforeChildRun, works, and I can't find out how to make either of them work.

Thank you

Bonus points if someone can guide me on how to make this work for parallelStream too.


Answer:

First you need to understand that a new thread is not within the control flow of its parent thread. See my other answers for an explanation incl. sample code and console log:

  • https://stackoverflow.com/a/41742907/1082681
  • https://stackoverflow.com/a/29184209/1082681

Thus, anything related to cflow() or aspect instantiation percflow() will not work in this case, as you already noticed.

The only way to get a part of what you need - at least for your own classes if you use compile-time weaving and also for third-party JARs/classes (except JRE classes) if you use load-time weaving - is manual bookkeeping.

Look at this example, I modified your own code a bit in order to show a workaround and its limits. I also wanted to avoid using any logging framework and am printing to System.out instead. Thus I had to replace MDC by a dummy class in order to make the code compile.

package de.scrum_master.app;

import java.util.HashMap;
import java.util.Map;

public class MDC {
  private static ThreadLocal<Map<String, String>> contextMap = new InheritableThreadLocal<>();

  static {
    clear();
  }

  public static void clear() {
    contextMap.set(new HashMap<>());
  }

  public static void put(String key, String value) {
    contextMap.get().put(key, value);
  }

  public static Map<String, String> getCopyOfContextMap() {
    return new HashMap<>(contextMap.get());
  }

  public static void setContextMap(Map<String, String> contextMap) {
    MDC.contextMap.set(contextMap);
  }
}

package de.scrum_master.app;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MDCTrace {}

package de.scrum_master.app;

public class Child implements Runnable {
  @Override
  public void run() {
    System.out.println(String.format("[%d] Running in the child thread", Thread.currentThread().getId()));
  }
}

package de.scrum_master.app;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Parent {
  private ExecutorService executorService;

  @MDCTrace
  public Runnable runMultiThreadByExecutorAnnotated(Runnable runnable) throws InterruptedException {
    return doStuff(runnable);
  }

  @MDCTrace
  public Runnable runMultiThreadByExecutorAnnotated() throws InterruptedException {
    return doStuff();
  }

  public Runnable runMultiThreadByExecutorPlain() throws InterruptedException {
    return doStuff();
  }

  public Runnable runMultiThreadByExecutorPlain(Runnable runnable) throws InterruptedException {
    return doStuff(runnable);
  }

  private Runnable doStuff() throws InterruptedException {
    return doStuff(new Child());
  }

  private Runnable doStuff(Runnable runnable) throws InterruptedException {
    executorService = Executors.newCachedThreadPool();
    System.out.println(String.format("[%d] Before start child thread", Thread.currentThread().getId()));

    executorService.submit(runnable);
    System.out.println(String.format("[%d] After start child thread", Thread.currentThread().getId()));

    List.of(10, 11, 12, 13, 14).parallelStream().forEach(i -> {
      //System.out.println(String.format("[%d] Loop iteration #%d", Thread.currentThread().getId(), i));
    });

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.SECONDS);
    System.out.println(String.format("[%d] ExecutorService is over", Thread.currentThread().getId()));
    System.out.println("\n----------------------------------------\n");
    return runnable;
  }

  public static void main(String[] args) throws InterruptedException {
    Parent parent = new Parent();
    System.out.println("MDCTrace annotation");
    parent.runMultiThreadByExecutorAnnotated();
    System.out.println("No annotation");
    parent.runMultiThreadByExecutorPlain();

    Runnable runnable = new Child();
    System.out.println("MDCTrace annotation (runnable created outside of control flow)");
    parent.runMultiThreadByExecutorAnnotated(runnable);
    System.out.println("No annotation (re-use runnable created outside of control flow)");
    parent.runMultiThreadByExecutorPlain(runnable);

    System.out.println("MDCTrace annotation (save returned runnable)");
    runnable = parent.runMultiThreadByExecutorAnnotated();
    System.out.println("No annotation (re-use returned runnable)");
    parent.runMultiThreadByExecutorPlain(runnable);
  }
}

As you can see I have a positive and a negative test example (with and without @MDCTrace annotation) and three cases for each of these:

  1. Creating runnables inside the control flow of the annotated (or non-annotated) method like you do in your own example.
  2. Creating runnables outside the control flow of the annotated (or non-annotated) method, passing them by reference into the control flow.
  3. Creating the first runnable inside the control flow of the annotated method, returning it and passing it into the control flow of the non-annotated method.

Numbers 2 and 3 are there to demonstrate the limits of the subsequent aspect approach which mainly consists in doing manual bookkeeping of all Runnable instances created within the control flow of an annotated method.

package de.scrum_master.aspect;

import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

import de.scrum_master.app.MDC;

@Aspect
public class MDCTraceAspect {
  private static final Random RANDOM = new Random(); 
  private Map<String, String> contextMap;
  private Set<Runnable> runnables = new HashSet<>();

  @Pointcut("@annotation(de.scrum_master.app.MDCTrace) && execution(* *(..))")
  private static void entryPoint() {}

  @Before("entryPoint()")
  public void executeEntryPoint() {
    MDC.clear();
    MDC.put("IP", String.valueOf(RANDOM.nextInt()));
    contextMap = MDC.getCopyOfContextMap();
    System.out.println(String.format("[%d] * Entry point", Thread.currentThread().getId()));
  }

  @Before("execution(Runnable+.new(..)) && cflow(entryPoint()) && target(runnable)")
  public void beforeNewRunnable(JoinPoint joinPoint, Runnable runnable) {
    runnables.add(runnable);
    MDC.setContextMap(contextMap);
    System.out.println(String.format("[%d] * New Runnable", Thread.currentThread().getId()));
  }

  @Before("execution(public void Runnable+.run(..)) && target(runnable)")
  public void beforeRunnableExecution(JoinPoint joinPoint, Runnable runnable) {
    if (!runnables.contains(runnable))
      return;
    MDC.setContextMap(contextMap);
    System.out.println(String.format("[%d] * Runnable started", Thread.currentThread().getId()));
  }
}

This yields the following console log (broken down into 3 parts):


  1. Creating runnables inside the control flow of the annotated (or non-annotated) method like you do in your own example:
MDCTrace annotation
[1] * Entry point
[1] * New Runnable
[1] Before start child thread
[1] After start child thread
[12] * Runnable started
[12] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation
[1] Before start child thread
[1] After start child thread
[13] Running in the child thread
[1] ExecutorService is over

----------------------------------------

This works as you might expect it. No surprises here.


  2. Creating runnables outside the control flow of the annotated (or non-annotated) method, passing them by reference into the control flow:
MDCTrace annotation (runnable created outside of control flow)
[1] * Entry point
[1] Before start child thread
[1] After start child thread
[14] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation (re-use runnable created outside of control flow)
[1] Before start child thread
[1] After start child thread
[15] Running in the child thread
[1] ExecutorService is over

----------------------------------------

As you can see, no log output here after the entry point has been reached. This is not what you might want, but the runnable has been created outside the control flow and passed in, so the aspect does not get triggered here.


  3. Creating the first runnable inside the control flow of the annotated method, returning it and passing it into the control flow of the non-annotated method:
MDCTrace annotation (save returned runnable)
[1] * Entry point
[1] * New Runnable
[1] Before start child thread
[1] After start child thread
[16] * Runnable started
[16] Running in the child thread
[1] ExecutorService is over

----------------------------------------

No annotation (re-use returned runnable)
[1] Before start child thread
[1] After start child thread
[17] * Runnable started
[17] Running in the child thread
[1] ExecutorService is over

----------------------------------------

Here part A is like in case no. 1, but part B also prints a log line for the non-annotated method because the Runnable instance has been registered in the aspect's bookkeeping during the control flow of the annotated method. So here you see a log line you probably rather want to avoid.

So what is the conclusion here? There is no perfect solution, you need to check your code and what cases you have there, then design the aspect to accommodate those cases. If you don't have cases like the ones I made up in no. 2 and 3, my approach works.

Some other things to note:

  • Beware the difference between Runnables and Threads. They are not the same: you can re-use the same runnable in multiple threads. Furthermore, you can re-use threads too, e.g. by using thread pools. So this can get arbitrarily complex. Each runnable or thread marked as a target for your aspect could be re-used later in a context you don't want to log.
  • For parallel streams or other cases in which runnables are being created by the JRE itself this will never work because the runnables and threads created by internal JRE classes are not subject to aspect weaving, neither in the compile-time nor in the load-time weaving case. In theory you could weave aspect code into the JRE or JDK, creating new JARs from the woven classes and replacing the originals or prepending them to the boot classpath. But this is a bit complicated and you really need to control your application's execution environment in order to start the JVM with the right parameters. I did that before and it works, but this is not for beginners and probably a bad idea to begin with.
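
The bookkeeping idea behind the aspect (capture the context when the Runnable is created, install it when run() executes) can also be written out by hand where you control the code that creates the runnables. The following is only a sketch under my own assumptions: ContextDemo, CONTEXT and withCapturedContext are hypothetical names, and CONTEXT is a plain ThreadLocal standing in for MDC, similar to the dummy class above but without inheritance. Like the aspect, it does nothing for runnables created inside the JRE, such as those behind parallel streams.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: manual context propagation without weaving
class ContextDemo {
    // Stand-in for MDC: one context map per thread, not inherited by child threads
    static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    // Captures the creating thread's context now and installs it in whichever thread runs the task
    static Runnable withCapturedContext(Runnable task) {
        Map<String, String> captured = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(new HashMap<>(captured));
            try {
                task.run();
            } finally {
                // Pooled threads are re-used, so put the old context back
                CONTEXT.set(previous);
            }
        };
    }

    // Returns what a plain task and a wrapped task see under the key "IP"
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String[] seen = new String[2];
        try {
            CONTEXT.get().put("IP", "42");
            pool.submit(() -> { seen[0] = CONTEXT.get().get("IP"); }).get();
            pool.submit(withCapturedContext(() -> { seen[1] = CONTEXT.get().get("IP"); })).get();
            return seen;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

The plain task sees no value because the pool thread has its own empty context, while the wrapped task sees the value captured in the submitting thread. Restoring the previous context in the finally block addresses the thread re-use caveat from the first bullet point above.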