Hot questions for Using AspectJ in concurrency


Question:

I'm new to AspectJ and I'm trying to figure out how to keep track of a context across multiple async method calls. Imagine the following code:

@TimerStart
public void doSomething() throws InterruptedException {
    Thread.sleep(1000);
    MyCallable callable = new MyCallable();
    Future future = executorService.submit(callable);
}

private class MyCallable implements Callable {
    @Override
    public Object call() throws Exception {
        someOtherMethod();
        return null;
    }

    @TimerEnd
    private void someOtherMethod() throws InterruptedException {
        Thread.sleep(1000);
    }
}

I'd like to measure the time passed between @TimerStart and @TimerEnd. I'm struggling with two problems right now:

  • How do I share an object between two advices? Fields in an aspect seem to be effectively static, so what about concurrency issues?
  • How do I get two advices, one executed before @TimerStart and one after @TimerEnd?

Currently I have something along the lines of this:

public aspect TimerAspect {

    pointcut timerStart(Object object, TimerStart timed):
        execution(@TimerStart * *(..)) && this(object) && @annotation(timed);

    pointcut timerStop(Object object, TimerEnd timed):
        cflow(execution(@TimerEnd * *(..)) && this(object) && @annotation(timed) && !within(FlowTimerAspect));


    before(Object object, TimerStart timed): timerStart(object, timed)  {
        System.out.println("##### Flow timer START");
    }

    after(Object object, TimerEnd timed): timerStop(object, timed)  {
        System.out.println("##### Flow timer STOP");
    }
}

However, the only thing I get right now is a StackOverflowError (yeah I know - that's why I'm asking here).

EDIT: I stumbled upon percflow, which seems to do the trick, BUT only when @TimerStart and @TimerEnd appear in the same thread. Suggestions are highly appreciated!

public aspect TimerAspect percflow(timerStart(Object, TimerStart)) {

    private long context;

    pointcut timerStart(Object object, TimerStart timed):
            execution(@TimerStart * *(..)) && this(object) && @annotation(timed);

    pointcut timerStop(Object object, TimerEnd timed):
            execution(@TimerEnd * *(..)) && this(object) && @annotation(timed);


    before(Object object, TimerStart timed): timerStart(object, timed)  {
        context = System.currentTimeMillis();
    }

    after(Object object, TimerEnd timed): timerStop(object, timed)  {
        long passed = System.currentTimeMillis() - context;
        System.out.println("passed time: " + passed);
    }
}

Answer:

Since you're planning to switch threads while measuring, the percflow instantiation model is not going to help you. You'll have to stick with the default singleton aspect and keep the timing values for the objects of interest in a WeakHashMap. That way, you keep the timings for as long as the objects/threads associated with them are alive.

We'll need another annotation to mark the event of associating a new object (a Callable in this example) with your timing. Let's call it @TimerJoin. It is analogous to your existing @TimerStart and @TimerEnd annotations. Your measuring aspect will look like this:

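import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;

public aspect TimerAspect {

    // The singleton aspect is shared by all threads, so guard the map against concurrent access.
    private final Map<Object, Timer> objectTiming =
            Collections.synchronizedMap(new WeakHashMap<>());
    // Timer started on the current thread, used to hand it over to objects that join the flow.
    private final ThreadLocal<Timer> currentThreadTimer = new ThreadLocal<>();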

    pointcut timerStart(Object object):
            execution(@TimerStart * *(..)) && this(object);

    pointcut timerStop(Object object):
            execution(@TimerEnd * *(..)) && this(object);

    pointcut timerJoin(Object object):
        (execution(@TimerJoin * *(..)) || execution(@TimerJoin *.new(..)) ) 
        && this(object);

    before(Object object): timerStart(object) {
        Timer timer = new Timer();
        timer.start();
        objectTiming.put(object, timer);
        currentThreadTimer.set(timer);
        System.out.println("##### Flow timer START");
    }

    before(Object object): timerJoin(object) {
        Timer timing = currentThreadTimer.get();
        objectTiming.put(object, timing);
        System.out.println("##### Flow timer JOIN");
    }

    after(Object object): timerStop(object) {
        Timer timing = objectTiming.get(object);
        timing.stop();
        System.out.println("##### Flow timer STOP");
        System.out.println("Elapsed: " + timing.getElapsed());
    }

}

And the simple Timer.java class:

public class Timer {

    private long start;
    private long stop;

    public long getStart() {
        return start;
    }

    public long getStop() {
        return stop;
    }

    public void start() {
        start = System.currentTimeMillis();
    }

    public void stop() {
        stop = System.currentTimeMillis();
    }

    public long getElapsed() {
        return stop-start;
    }
}

Modify your callable to mark it to join the timer on the current thread:

private class MyCallable implements Callable {

    @TimerJoin
    public MyCallable() {
    }

    @Override
    public Object call() throws Exception {
        someOtherMethod();
        return null;
    }

    @TimerEnd
    private void someOtherMethod() throws InterruptedException {
        Thread.sleep(1000);
    }
}

The rest of your code will be the same.

You may notice that the aspect is using a ThreadLocal as a means of storage for the current timer to be able to associate it with new objects. You may choose another kind of storage for this, but for the sake of the example, I tried to keep it simple. Also, again for the sake of simplicity, I left out any safety checks for nulls in the aspect. You'll need to handle the corner cases yourself.
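
To make the hand-off easier to follow without a weaver, here is a minimal plain-Java sketch of the same bookkeeping. It is not the aspect itself: the class TimerHandoffSketch and the methods start, join, stopMillis and measure are hypothetical stand-ins for what the three advices do, and the timer is reduced to a one-element long array. It also includes the null checks that the aspect above leaves out.

```java
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TimerHandoffSketch {

    // Hypothetical stand-in for the aspect's state: start times keyed by owning object.
    static final Map<Object, long[]> objectTiming =
            Collections.synchronizedMap(new WeakHashMap<>());
    static final ThreadLocal<long[]> currentThreadTimer = new ThreadLocal<>();

    // Mirrors the @TimerStart advice: create a timer and remember it for this thread.
    public static void start(Object owner) {
        long[] timer = { System.nanoTime() };
        objectTiming.put(owner, timer);
        currentThreadTimer.set(timer);
    }

    // Mirrors the @TimerJoin advice: attach the current thread's timer to a new object.
    public static void join(Object joiner) {
        long[] timer = currentThreadTimer.get();
        if (timer != null) {
            objectTiming.put(joiner, timer);
        }
    }

    // Mirrors the @TimerEnd advice: find the timer via the object, on whatever thread runs.
    public static long stopMillis(Object owner) {
        long[] timer = objectTiming.get(owner);
        if (timer == null) {
            return -1; // no timer was ever associated with this object
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - timer[0]);
    }

    static class Task implements Callable<Long> {
        Task() {
            join(this); // runs on the submitting thread, like the annotated constructor
        }

        @Override
        public Long call() throws Exception {
            Thread.sleep(100);
            return stopMillis(this); // runs on the pool thread
        }
    }

    // Starts a timer on the calling thread and stops it on a pool thread.
    public static long measure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Object caller = new Object();
            start(caller);
            Thread.sleep(100);
            return pool.submit(new Task()).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Elapsed ms: " + measure());
    }
}
```

The important detail is that join runs in the constructor, so it still executes on the thread that called start; the pool thread later finds the same timer through the map, not through the thread-local.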

Question:

I am struggling to create a ReentrantReadWriteLock with AspectJ for every constructed object of type MyStructure. Here is my source code.

The aspect class

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@Aspect
public class LocksAspect {
    private ReentrantReadWriteLock rwLock;
    private Lock acquireReadLock;
    private Lock acquireWriteLock;

    @Before("!within(LocksAspect)&&execution(*.new(..))")
    public void LookupBefores() {
        rwLock = new ReentrantReadWriteLock();
        acquireReadLock = rwLock.readLock();
        acquireWriteLock = rwLock.writeLock();
    }

    @Pointcut("call(void MyStructure.Insert(String))")
    public void InsertPointcut() {
    }

    @Pointcut("call(void MyStructure.Read(int))")
    public void ReadPointcut() {
    }

    @Before("InsertPointcut()")
    public void InsertPointcutBefore(JoinPoint pointcut) throws InterruptedException {
        acquireWriteLock.lock();
        String thrdName = Thread.currentThread().getName();
        System.out.println(thrdName + "  is entering in critical Section {} ");
        Thread.sleep(10000);
    }


    @After("InsertPointcut()")
    public void InsertPointcutAfter(JoinPoint pointcut) {
        String thrdName = Thread.currentThread().getName();
        System.out.println(thrdName + " received notification and is exiting critical Section {} ");
        acquireWriteLock.unlock();
    }

    @Before("ReadPointcut()")
    public void ReadPointcutBefore(JoinPoint pointcut) throws InterruptedException {
        acquireReadLock.lock();
        String thrdName = Thread.currentThread().getName();
        System.out.println(thrdName + "  is entering in critical Section {} ");
        Thread.sleep(1000);
    }


    @After("ReadPointcut()")
    public void ReadPointcutAfter(JoinPoint pointcut) {
        String thrdName = Thread.currentThread().getName();
        System.out.println(thrdName + " received notification and is exiting critical Section {} ");
        acquireReadLock.unlock();
    }
}

The writer thread class (the reader thread class is omitted because it is not relevant to my problem)

public class Writer extends Thread{
   private MyStructure myStructure;
    public Writer(MyStructure myStructure) {
        this.myStructure=myStructure;
    }

    @Override
    public void run() {
        this.myStructure.Insert("example");
    }
}

My structure class

import java.util.ArrayList;

public class MyStructure {
    ArrayList<String> examplelist;

    public MyStructure() {
        examplelist = new ArrayList<String>();
    }

    public void Insert(String value) {
        examplelist.add(value);
    }

    public void Read(int pos) {
        examplelist.get(pos);
    }
}

The main

MyStructure structure = new MyStructure();
MyStructure structure1 = new MyStructure();
new Thread(new Writer(structure), "Thread1").start();
new Thread(new Writer(structure1), "Thread2").start();

The output

Thread2  is entering in critical Section {} 
Thread2 received notification and is exiting critical Section {} 
Thread1  is entering in critical Section {} //Thread1 will wait for Thread2 to release the lock in critical section   which is wrong
Thread1 received notification and is exiting critical Section {} 

Now my problem is how to get a new ReentrantReadWriteLock for each MyStructure object that is created. In the example above, both Thread1 and Thread2 should be able to enter the critical section at the same time because they work on different objects. Instead, one thread blocks and waits for the other to finish, which is wrong. How can I solve this construction problem with AspectJ?


Answer:

The key to your problem's solution is that you need one set of locks per MyStructure instance. Your aspect is a singleton, though. So either you need to use another aspect instantiation scheme (which is what I will use in my answer) or do manual bookkeeping within the singleton aspect by keeping a set of locks and adding a new element to that set whenever a MyStructure object is created.

In order to better understand my answer, please refer to the AspectJ manual for information about aspect instantiation.

Before we start, a few comments concerning your code and why I have changed it a bit:

  • Your Writer is already a Thread subclass, no need to wrap it into another thread instance. (I know you probably just did it for being able to name the thread, but that could have been achieved by adding a constructor in your class taking the name argument and passing it through to the super class constructor.)
  • You should not call a variable of type JoinPoint pointcut because a joinpoint is not a pointcut AOP-wise.
  • I factored out the logging into its own helper method and improved it a bit so we can see more clearly what happens when.
  • I decided to replace each pair of before and after advice by an around advice. This is optional, of course, but I prefer to see the control flow in one place in this case. BTW, be careful to change the around advice's return type to Object and actually return something if you want to target non-void methods. Here it is not necessary because in both cases we have void methods.
  • I also decided to inline the pointcuts, which is also optional, but makes the sample code somewhat more concise for demonstration purposes here.
  • I added a Reader class and use it in order to show the difference between reentrant read vs write locks.
  • I also took care of making MyStructure instances nameable and printable in order to identify the target objects more easily in the log.
  • I randomised the execution order of reader/writer threads so as to mix them in a more real-world fashion. In order to avoid exceptions polluting the log when reading from a newly created MyStructure before writing into it, I made sure that MyStructure gets a default element right in the constructor. I did not want to catch exceptions here in order to keep the sample code simple.
  • I put the aspect in another package than the application code in order to demonstrate that generally you need to use fully qualified class names when using annotation-style AspectJ (in native syntax imports would be enough).

Now what is the solution? Basically just this, because the changes mentioned above just make the code better or the test program closer to real-life situations:

@Aspect("pertarget(execution(de.scrum_master.app.MyStructure.new(..)))")
public class LocksAspect { // (...)

This creates one aspect instance per MyStructure object. This is also why we can assign the values of readWriteLock, readLock and writeLock directly instead of using a special pointcut + advice pair like in your singleton aspect.

Here is the full, refactored sample code:

Application code + driver application:

package de.scrum_master.app;

import java.util.ArrayList;
import java.util.List;

public class MyStructure {
  private String name;
  private List<String> myList;

  public MyStructure(String name) {
    this.name = name;
    myList = new ArrayList<String>();
    myList.add("dummy element to permit reading");
  }

  public void insert(String value) {
    myList.add(value);
  }

  public void read(int pos) {
    myList.get(pos);
  }

  @Override
  public String toString() {
    return "MyStructure[" + name + "]";
  }
}

package de.scrum_master.app;

public class Writer extends Thread {
  private MyStructure myStructure;

  public Writer(MyStructure myStructure) {
    this.myStructure = myStructure;
  }

  @Override
  public void run() {
    myStructure.insert("example");
  }
}

package de.scrum_master.app;

public class Reader extends Thread {
  private MyStructure myStructure;

  public Reader(MyStructure myStructure) {
    this.myStructure = myStructure;
  }

  @Override
  public void run() {
    myStructure.read(0);
  }
}

package de.scrum_master.app;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class Application {
  public static void main(String[] args) {
    MyStructure structureA = new MyStructure("One");
    MyStructure structureB = new MyStructure("Two");
    List<Thread> threads = Arrays.asList(
      new Writer(structureA), new Writer(structureB), new Writer(structureA), new Writer(structureB),
      new Reader(structureA), new Reader(structureB), new Reader(structureA), new Reader(structureB),
      new Reader(structureA), new Reader(structureB), new Reader(structureA), new Reader(structureB)
    );
    Collections.shuffle(threads);
    for (Thread thread : threads)
      thread.start();
  }
}

Aspect:

package de.scrum_master.aspect;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

import de.scrum_master.app.MyStructure;

@Aspect("pertarget(execution(de.scrum_master.app.MyStructure.new(..)))")
public class LocksAspect {
  private static final long startTime = System.currentTimeMillis();

  private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  private Lock readLock = readWriteLock.readLock();
  private Lock writeLock = readWriteLock.writeLock();

  @Around("target(myStructure) && execution(void insert(String))")
  public void InsertPointcutBefore(ProceedingJoinPoint thisJoinPoint, MyStructure myStructure) throws Throwable {
    writeLock.lock();
    log("entering write section", myStructure);
    try {
      Thread.sleep(1000);
      thisJoinPoint.proceed();
    } finally {
      log("exiting write section", myStructure);
      writeLock.unlock();
    }
  }

  @Around("target(myStructure) && execution(void read(int))")
  public void ReadPointcutBefore(ProceedingJoinPoint thisJoinPoint, MyStructure myStructure) throws Throwable {
    readLock.lock();
    log("entering read section", myStructure);
    try {
      Thread.sleep(1000);
      thisJoinPoint.proceed();
    } finally {
      log("exiting read section", myStructure);
      readLock.unlock();
    }
  }

  private static void log(String message, Object targetObject)  {
    System.out.printf(
      "%8d ms | %-25s | %-17s | %s%n",
      System.currentTimeMillis() - startTime,
      Thread.currentThread(),
      targetObject,
      message
    );
  }
}

Sample log output:

       4 ms | Thread[Thread-3,5,main]   | MyStructure[Two]  | entering write section
       4 ms | Thread[Thread-6,5,main]   | MyStructure[One]  | entering read section
       4 ms | Thread[Thread-8,5,main]   | MyStructure[One]  | entering read section
       4 ms | Thread[Thread-4,5,main]   | MyStructure[One]  | entering read section
       4 ms | Thread[Thread-10,5,main]  | MyStructure[One]  | entering read section
    1019 ms | Thread[Thread-3,5,main]   | MyStructure[Two]  | exiting write section
    1020 ms | Thread[Thread-8,5,main]   | MyStructure[One]  | exiting read section
    1020 ms | Thread[Thread-4,5,main]   | MyStructure[One]  | exiting read section
    1020 ms | Thread[Thread-11,5,main]  | MyStructure[Two]  | entering read section
    1020 ms | Thread[Thread-5,5,main]   | MyStructure[Two]  | entering read section
    1020 ms | Thread[Thread-6,5,main]   | MyStructure[One]  | exiting read section
    1020 ms | Thread[Thread-10,5,main]  | MyStructure[One]  | exiting read section
    1025 ms | Thread[Thread-2,5,main]   | MyStructure[One]  | entering write section
    2023 ms | Thread[Thread-11,5,main]  | MyStructure[Two]  | exiting read section
    2024 ms | Thread[Thread-5,5,main]   | MyStructure[Two]  | exiting read section
    2025 ms | Thread[Thread-1,5,main]   | MyStructure[Two]  | entering write section
    2026 ms | Thread[Thread-2,5,main]   | MyStructure[One]  | exiting write section
    2026 ms | Thread[Thread-0,5,main]   | MyStructure[One]  | entering write section
    3026 ms | Thread[Thread-1,5,main]   | MyStructure[Two]  | exiting write section
    3026 ms | Thread[Thread-7,5,main]   | MyStructure[Two]  | entering read section
    3026 ms | Thread[Thread-9,5,main]   | MyStructure[Two]  | entering read section
    3028 ms | Thread[Thread-0,5,main]   | MyStructure[One]  | exiting write section
    4028 ms | Thread[Thread-7,5,main]   | MyStructure[Two]  | exiting read section
    4029 ms | Thread[Thread-9,5,main]   | MyStructure[Two]  | exiting read section
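
The answer only names the alternative of manual bookkeeping inside a singleton aspect and does not implement it. As a rough plain-Java sketch of that idea, under the assumption that the guarded class does not override equals/hashCode, a registry could hand out one lock per object. LockRegistrySketch, lockFor and otherThreadCanWrite are hypothetical names; in an aspect, the advice would call something like lockFor(target) before locking.

```java
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockRegistrySketch {

    // Hypothetical registry: one lock per guarded object, dropped once the object is collected.
    // WeakHashMap compares keys with equals/hashCode, so this assumes identity semantics.
    private static final Map<Object, ReentrantReadWriteLock> LOCKS =
            Collections.synchronizedMap(new WeakHashMap<>());

    // Returns the lock belonging to the given object, creating it on first use.
    public static ReentrantReadWriteLock lockFor(Object target) {
        return LOCKS.computeIfAbsent(target, key -> new ReentrantReadWriteLock());
    }

    // Probes from a second thread whether the write lock of the given object is free.
    public static boolean otherThreadCanWrite(Object target) {
        boolean[] acquired = new boolean[1];
        Thread probe = new Thread(() -> {
            ReentrantReadWriteLock.WriteLock lock = lockFor(target).writeLock();
            acquired[0] = lock.tryLock();
            if (acquired[0]) {
                lock.unlock();
            }
        });
        probe.start();
        try {
            probe.join(); // join makes the probe's write to acquired[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        Object one = new Object();
        Object two = new Object();
        lockFor(one).writeLock().lock();
        try {
            System.out.println(otherThreadCanWrite(one)); // prints false: same object, same lock
            System.out.println(otherThreadCanWrite(two)); // prints true: different object, own lock
        } finally {
            lockFor(one).writeLock().unlock();
        }
    }
}
```

Compared with pertarget, this keeps the aspect a singleton at the cost of a map lookup per advised call.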

Question:

We are developing a web application using Spring with a layered architecture (REST service layer, business layer and repository). For each REST service we are returning a generic RestResponse object, which has a data field and a list for messages and errors.

When we need to validate the data we get in the REST layer, we can do it either in that layer or in the business layer (or both). I thought of an idea to only do validation in the business layer to avoid duplicating code.

My thinking was that when we create a RestResponse object in the REST layer, we then set data to it by calling the business layer method. In that business method we would do validation and call Logger to log some messages (warnings or errors). Using aspectj these logger calls would be intercepted and their arguments (the message) would be put directly in our RestResponse message list.

To make it clearer here are some code samples:

The object returned to the client from the REST layer

public class RestResponse<T> {
    private T data;
    List<String> messages = new ArrayList<>();

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public final List<String> getMessages() {
        return messages;
    }

    public void setMessages(List<String> messages) {
        this.messages = messages;
    }
}

The Rest controller

@RestController
public class TestRestController {
    @Autowired
    private ServiceImpl service;

    @RequestMapping("")
    public RestResponse<?> callService() {

        RestResponse<String> response = new RestResponse<>();
        // We would only set data, message list would be populated in the aspect from logger call arguments
        response.setData(service.returnData());
        return response;
    }
}

Service

@Service
public class ServiceImpl {
    private static Logger log = Logger.getLogger(ServiceImpl.class.getName());

    public String returnData() {
        log.log(Level.WARNING, "Some warning message");
        return "Some data...";
    }
}

Aspect class that gets the message list from RestResponse and adds messages to it

@Aspect
public class RestAspect {

    @Pointcut("call(* java.util.logging.Logger.log(..)) " +
            "&& cflow(execution(* com.gg.spring.tests.services.ServiceImpl.returnData(..))) " +
            "&& cflow(execution(* com.gg.spring.tests.rest.TestRestController.callService(..)))" +
            "&& !within(RestAspect)")
    private void logPointcut() {
    }

    @Pointcut("call(com.gg.spring.tests.rest.RestResponse.new()) " +
            "&& cflow(execution(* com.gg.spring.tests.rest.TestRestController.callService(..)))" +
            "&& !within(RestAspect)")
    private void afterConstructingResponsePointcut() {
    }

    private List<String> messages;

    @Around(value = "logPointcut()")
    public void loggerCall(ProceedingJoinPoint joinPoint) {
        Object[] response = joinPoint.getArgs();
        synchronized (this.messages) {
            this.messages.add((String) response[1]);
        }
    }

    @AfterReturning(returning = "response", pointcut = "afterConstructingResponsePointcut()")
    public void firstCall(JoinPoint jp, RestResponse response) {
        synchronized (response.getMessages()) {
            this.messages = response.getMessages();
        }
    }
}

I seem to have got it working with this code, however, when I test with many threads I get some RestResponse objects with 0 messages, or some with 3 messages, but each one of them should have only 1 message since that's how many times I call log method in my service method.

With the test I wrote, I create 128 threads, each of them calling the REST layer method 1000 times, and out of 128,000 calls 200 have a different number of messages than expected. That does not seem like a high number, but it can be very significant in the right scenario.

Has anyone done something like this in the past and could share their experience? There are other ways to do this, but if this was working it would be a nice way to have less code. I would greatly appreciate your help.


Answer:

There are several ways to fix this. The simplest one is to just use a thread-local variable in your aspect. Then you do not need synchronized blocks anymore either. They do not solve your problem anyway: the aspect is a singleton, so all threads overwrite the same messages member, and a log call made in one thread can end up in the list that another thread has just registered.

@Aspect
public class RestAspect {

  @Pointcut(
    "call(* java.util.logging.Logger.log(..)) "
    + "&& args(*, logMessage)"
    + "&& cflow(execution(* com.gg.spring.tests.services.ServiceImpl.returnData(..))) "
    + "&& cflow(execution(* com.gg.spring.tests.rest.TestRestController.callService(..)))"
    + "&& !within(RestAspect)")
  private void logPointcut(String logMessage) { }

  @Pointcut(
    "call(com.gg.spring.tests.rest.RestResponse.new()) "
    + "&& cflow(execution(* com.gg.spring.tests.rest.TestRestController.callService(..)))"
    + "&& !within(RestAspect)")
  private void afterConstructingResponsePointcut() {}

  private ThreadLocal<List<String>> messages = new ThreadLocal<>();

  @Around("logPointcut(logMessage)")
  public void loggerCall(ProceedingJoinPoint joinPoint, String logMessage) {
    this.messages.get().add(logMessage);
  }

  @AfterReturning(returning = "response", pointcut = "afterConstructingResponsePointcut()")
  public void firstCall(JoinPoint jp, RestResponse<String> response) {
    this.messages.set(response.getMessages());
  }
}

I wrote a test program like yours (128 threads, 1000 calls per thread) and could reproduce your problem as well as confirm that my solution works.
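
That test program is not shown here. As a minimal plain-Java sketch of the idea behind such a check (not the actual test), the following uses a hypothetical Collector class in place of the singleton aspect; bind corresponds to firstCall and record to loggerCall. The unbind call is an addition of this sketch, assumed useful when threads are pooled and reused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadLocalMessagesSketch {

    // Hypothetical stand-in for the singleton aspect: one instance shared by all threads.
    static class Collector {
        private final ThreadLocal<List<String>> messages = new ThreadLocal<>();

        void bind(List<String> target) {
            messages.set(target); // each thread registers its own response list
        }

        void record(String message) {
            List<String> target = messages.get();
            if (target != null) {
                target.add(message);
            }
        }

        void unbind() {
            messages.remove(); // avoid leaking the list into later work on the same thread
        }
    }

    // Returns how many simulated responses did not end up with exactly one message.
    public static int countWrongResponses(int threadCount, int callsPerThread) {
        Collector collector = new Collector();
        AtomicInteger wrong = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    List<String> responseMessages = new ArrayList<>();
                    collector.bind(responseMessages);
                    collector.record("Some warning message");
                    collector.unbind();
                    if (responseMessages.size() != 1) {
                        wrong.incrementAndGet();
                    }
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return wrong.get();
    }

    public static void main(String[] args) {
        System.out.println("Wrong responses: " + countWrongResponses(128, 1000));
    }
}
```

Because every thread only ever sees the list it bound itself, no locking is needed around record.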

BTW, I also changed the aspect a little bit so as to use args() parameter binding rather than jp.getArgs().


Another way to solve this would be to use non-singleton instantiation for your aspect, such as percflow. But then you need to use native AspectJ syntax.