Hot questions for Using RabbitMQ in axon

Question:

I am trying to send messages via rabbitmq to an axon4 spring boot based system. The message is received but no events are triggered. I am very sure I am missing an essential part, but up to now I wasn't able to figure it out.

Here the relevant part of my application.yml

axon:
    amqp:
        exchange: axon.fanout
        transaction-mode: publisher_ack
    # adding the following lines changed nothing
    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing
spring:
    rabbitmq:
        username: rabbit
        password: rabbit

From the docs I found that I am supposed to create a SpringAMQPMessageSource bean:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class AxonConfig {

    @Bean
    SpringAMQPMessageSource inputMessageSource(final AMQPMessageConverter messageConverter) {
        return new SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = "in.queue")
            @Override
            public void onMessage(final Message message, final Channel channel) {
                log.debug("received external message: {}, channel: {}", message, channel);
                super.onMessage(message, channel);
            }
        };
    }

}

If I send a message to the queue from the rabbitmq admin panel I see the log:

AxonConfig : received external message: (Body:'[B@13f7aeef(byte[167])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=in.queue, deliveryTag=2, consumerTag=amq.ctag-xi34jwHHA__xjENSteX5Dw, consumerQueue=in.queue]), channel: Cached Rabbit Channel: AMQChannel(amqp://rabbit@127.0.0.1:5672/,1), conn: Proxy@11703cc8 Shared Rabbit Connection: SimpleConnection@581cb879 [delegate=amqp://rabbit@127.0.0.1:5672/, localPort= 58614]

Here the Aggregate that should receive the events:

import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import pm.mbo.easyway.api.app.order.commands.ConfirmOrderCommand;
import pm.mbo.easyway.api.app.order.commands.PlaceOrderCommand;
import pm.mbo.easyway.api.app.order.commands.ShipOrderCommand;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import static org.axonframework.modelling.command.AggregateLifecycle.apply;

@ProcessingGroup("amqpEvents")
@Slf4j
@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;
    private boolean orderConfirmed;

    @CommandHandler
    public OrderAggregate(final PlaceOrderCommand command) {
        log.debug("command: {}", command);
        apply(new OrderPlacedEvent(command.getOrderId(), command.getProduct()));
    }

    @CommandHandler
    public void handle(final ConfirmOrderCommand command) {
        log.debug("command: {}", command);
        apply(new OrderConfirmedEvent(orderId));
    }

    @CommandHandler
    public void handle(final ShipOrderCommand command) {
        log.debug("command: {}", command);
        if (!orderConfirmed) {
            throw new IllegalStateException("Cannot ship an order which has not been confirmed yet.");
        }
        apply(new OrderShippedEvent(orderId));
    }

    @EventSourcingHandler
    public void on(final OrderPlacedEvent event) {
        log.debug("event: {}", event);
        this.orderId = event.getOrderId();
        orderConfirmed = false;
    }

    @EventSourcingHandler
    public void on(final OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderConfirmed = true;
    }

    @EventSourcingHandler
    public void on(final OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderConfirmed = true;
    }

    protected OrderAggregate() {
    }

}

So the problem is that the messages are received by the system but no events are triggered. The content of the messages seem to be irrelevant. Whatever I send to the queue I only get a log message from my onMessage method.

JavaDoc of SpringAMQPMessageSource says this:

/**
 * MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
 * <p>
 * The SpringAMQPMessageSource must be registered with a Spring MessageListenerContainer and forwards each message
 * to all subscribed processors.
 * <p>
 * Note that the Processors must be subscribed before the MessageListenerContainer is started. Otherwise, messages will
 * be consumed from the AMQP Queue without any processor processing them.
 *
 * @author Allard Buijze
 * @since 3.0
 */

But up to now I couldn't find out where or how to register it.

The axon.eventhandling entries in my config and @ProcessingGroup("amqpEvents") in my Aggregate are already from testing. But having those entries in or not made no difference at all. Also tried without the mode=subscribing.

Exact versions: Spring Boot 2.1.4, Axon 4.1.1, axon-amqp-spring-boot-autoconfigure 4.1

Any help or hints highly appreciated.


Update 23.04.19:

I tried to write my own class like this:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

@Slf4j
@Component
public class RabbitMQSpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {

    private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
    private final AMQPMessageConverter messageConverter;

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    @Override
    public Registration subscribe(final Consumer<List<? extends EventMessage<?>>> messageProcessor) {
        eventProcessors.add(messageProcessor);
        log.debug("subscribe to: {}", messageProcessor);
        return () -> eventProcessors.remove(messageProcessor);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received external message: {}, channel: {}", message, channel);
        log.debug("eventProcessors: {}", eventProcessors);
        if (!eventProcessors.isEmpty()) {
            messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders())
                            .ifPresent(event -> eventProcessors.forEach(
                                ep -> ep.accept(Collections.singletonList(event))
                            ));
        }
    }

}

The result is the same and the log now proofs that the eventProcessors are just empty.

eventProcessors: []
So the question is, how to register the event processors correctly. Is there a way how to do that properly with spring?

Update2:

Also no luck with this:

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {

        try {
            final var eventProcessorsField = this.getClass().getSuperclass().getDeclaredField("eventProcessors");
            eventProcessorsField.setAccessible(true);
            final var eventProcessors = (List<Consumer<List<? extends EventMessage<?>>>>) eventProcessorsField.get(this);
            log.debug("eventProcessors: {}", eventProcessors);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}
axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING

Registering it programmatically in addition to above also didn't help:

    @Autowired
    void configure(EventProcessingModule epm,
                   RabbitMQSpringAMQPMessageSource rabbitMessageSource) {
        epm.registerSubscribingEventProcessor("rabbitMQSpringAMQPMessageSource", c -> rabbitMessageSource);
        epm.assignProcessingGroup("amqpEvents", "rabbitMQSpringAMQPMessageSource");// this line also made no difference
    }

Of course @ProcessingGroup("amqpEvents") is in place in my class that contains the @EventSourcingHandler annotated methods.


Update 25.4.19:

see accepted answer from Allard. Thanks a lot pointing me at the mistake I made: I missed that EventSourcingHandler don't receive messages from outside. This is for projections. Not for distributing Aggregates! ups Here the config/classes that are receiving events from rabbitmq now:

axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}
import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@ProcessingGroup("amqpEvents")
@Service
public class OrderedProductsEventHandler {

    private final Map<String, OrderedProduct> orderedProducts = new HashMap<>();

    @EventHandler
    public void on(OrderPlacedEvent event) {
        log.debug("event: {}", event);
        String orderId = event.getOrderId();
        orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct()));
    }

    @EventHandler
    public void on(OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderConfirmed();
            return orderedProduct;
        });
    }

    @EventHandler
    public void on(OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderShipped();
            return orderedProduct;
        });
    }

    @QueryHandler
    public List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
        log.debug("query: {}", query);
        return new ArrayList<>(orderedProducts.values());
    }

}

I removed the @ProcessingGroup from my Aggregate of course.

My logs:

RabbitMQSpringAMQPMessageSource : received message: ... 
OrderedProductsEventHandler : event: OrderShippedEvent...

Answer:

In Axon, Aggregates do not receive events from "outside". The Event Handlers inside Aggregates (more specifically, they are EventSourcingHandlers) only handle events that have been published by that same aggregate instance, so that it can reconstruct its prior state.

It is only external event handlers, for example the ones that update projections, that will receive events from external sources.

For that to work, your application.yml should mention the bean name as a processors' source instead of the queue name. So in your first example:

    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing

Should become:

    eventhandling:
        processors:
            amqpEvents:
                source: inputMessageSource
                mode: subscribing

But again, this only works for event handlers defined on components, not on Aggregates.

Question:

I am new to axon and doing migration from Axon 2.4.3 to 3.1.1 but I am not able to find any migration guide which is available for other version? Can you please share your experience on how to do the same. I am facing a lot problem, some classes have been removed, some packages have been changed. For some classes I am even not able to find replacements, so please help me with some suggestion. If there is a guide for same please provide me with link of that.

Thanks in Advance

Acctually I am not able to find replacement for these which were there in axon 2.4.3 ClusteringEventBus- DefaultClusterSelector- EventBusTerminal- SimpleCluster- SpringAMQPTerminal- SpringAMQPConsumerConfiguration- ListenerContainerLifecycleManager-


Answer:

There currently is not an official Axon 2.x to 3.x migration guide, although it is on the backlog to be introduced. I can, however, give you a couple of pointers you should look for whilst migrating:

  1. AbstractAnnotatedAggregateRoot is no longer necessary for your Aggregates, so remove it.
  2. Publishing events in your aggregates is now done with the static AggregateLifecycle.apply() function, so import it.
  3. AbstractAnnotatedSaga is no longer necessary for your Sagas, so remove it.
  4. If in an Spring application, it is suggested to use the @Aggregate annotation on your aggregates to notify Spring to create all the required beans (repository, factory,,,) for an Aggregate.
  5. If in an Spring application, it is suggested to use the @Saga annotation on your sagas to notify Spring to create all the required beans (repository, manager,,,) for a Saga.
  6. The domain_event_entry table has an added globalIndex column, which if you already have quite some events needs correct migration. This post gives some insights how an Axon Framework user is solving this.
  7. In Axon 2.x you had the notion of Clusters, were you could group your Event Handling Components in. This has been replacing by Event Processing Groups, where you've got the possibility to choose between a SubscribingEventProcessor (push events to the Event Handling Components) and a TrackingEventProcessor (pull events being stored and handle them in your Event Handling Components).
  8. In a Spring / Axon 2.x mix, you've possibly used configuration through Spring XML. In 3.x you can use (1) the Configurer API, (2) use the @EnableAxon annotation on a Spring Configuration class or (3 - recommended) use the axon-spring-boot-starter dependency to automatically get all your Axon beans.

This is what I can think of on the top of my mind, but I'm probably forgetting some pointers. You can also find some info on migration in this Axon User Group post, or more generally the Axon User Group might have some things you're looking for.

By the way, feel free to update your question, then I can update my answer to fill in the blanks you're still missing!

Update

This bit is regarding the specific classes you're missing when updating from 2.4.3 to 3.1.1:

Like I've shared in my earlier response, bullet point 7 to be exact, the complete Cluster approach in Axon 2.x has been replaced for the Event Processor approach in Axon 3.x. Conceptually not much has changed here, internally it however behaves differently and intendedly more concise. So the short answer is, all those classes have been replace by Event Processor, for which the documentation is here.

As that's not very helpful at all, I'll give you a specific answer to the classes you're missing to help you out. It's quite long, so be prepared:

  • ClusteringEventBus: This was in place to publish events to a Cluster of Event Handling Components. In Axon 3.x, that's a now behind a Processing Group, handled by either a Subscribing or Tracking implementation. Hence, do not search for the ClusteringEventBus to publish events to groups. All the 3.x EventBus implementations will know how to publish events to a SubscribingEventProcessor, whilst the TrackingEventProcessor will pull the events from the store itself (so no bus involved).
  • DefaultClusterSelector: The Cluster selector was in charge of grouping Event Handling Components / Event Listeners into a cluster. As shared, we no longer regard a set of Event Listeners as a cluster, but as a Processing Group. The behavior in 3.x is such that the package name of your Event Listeners implementation is the name of the Processing Group used. You can however overwrite this, but adding the @ProcessingGroup({processing-group-name}) as a class level annotation to your Event Listener implementation. The Axon 3.x configuration will automatically group the Event Listeners with the same Processing Group Name under the same Event Processor (which in 2.4.x would translate to the same Cluster). By default, the Event Processor implementation used will be Subscribing. This can be adjusted to Tracking in the configuration.
  • SimpleCluster: As follows from my earlier explanation, there no longer is a SimpleCluster. This has been replaced by the EventProcessor interface, which is implemented by the Subscribing and Tracking Event Processors.
  • EventBusTerminal: The EventBusTerminal was in charge of publishing the events to the right Clusters, albeit local or remote. As shared, we no longer have Cluster, but Event Processor groups. How events get to a Event Processor depends on the implementation used. If Subscribing (read: the default Event Processor) is used, the EventBus is in charge of publishing the events to them. The TrackingEventProcessor will however, asynchronously, start it's own thread(s) to pull events from the EventStore and in place send those events to its Event Listeners. Thus, you no longer have to search for the EventBusTerminal, as it's obsolete.
  • SpringAMQPTerminal: As I've shared above, the EventBusTerminal has been removed in favor of the Subscribing or Tracking approach. As of Axon 3.1.1, for Spring AMQP we've got a Subscribing Event Processor implementation to listen to events and publish them on a Queue, which is the SpringAMQPPublisher.
  • SpringAMQPConsumerConfiguration: This configuration class was in place, because when axon-amqp was introduced, Spring did not create the ListenerContainers as it does at introduction point of Axon 3.x. Hence there was decided against keeping our own configuration set up for this consumers and leave that to the competent hands of Spring. Hence you will not find SpringAMQPConsumerConfiguration and should search how Spring creates the consumers for AMQP.
  • ListenerContainerLifecycleManager: This class was the implementation to correctly receive all event incoming from queues and send them over to all your Event Listeners. This class has been replaced by the SpringAMQPMessageSource.

Hope this gives you the answers you're looking for @AS!

Question:

So im working on my first Axon application. So far I got the entire axon project working in the monolith. After that i started to divide the application into pieces using the @Profile from Sprint Boot. Also worked like a charm. So my next step is to go fully distributed and use 2 entirely different intellij projects for my Axon flow. And here i bump into a problem. I use AMQP for the messaging of the events. With the information in the docs i can get a event from one service to the other. But the eventhandler is not triggered. I guess the problems is because of the type of the Event. The first services sends the event like the following: com.departureExample.Departure.coreapi.events.CheckIn.CheckedInEvent. But my eventhandler in my other project listens to: com.checkInService.CheckIn.Service.coreapi.events.CheckIn.CheckedInEvent. My guess is this is why the eventhandler is not triggered, but im not sure?

All the other configuration for amqp is set correctly, i also connect the ampqmessagesource to my processingroup.

So my question is, is it possible the eventhandler is not triggered because the checkedInEvent got 2 different paths? And if so, how could i fix this?


Answer:

Your assumption is indeed correct. The payloadType for your EventHandler does not match with the event being emitted. Your EventHandler method registers that it can handle an event of type com.departureExample.Departure.coreapi.events.CheckIn.CheckedInEvent. However, the event being sent over rabbit is of type com.departureExample.Departure.coreapi.events.CheckIn.CheckedInEvent, for which there is no handler registered.

For any EventHandler it is required that the (super)classes match with what is being emitted, which includes the package name. Some more detailed explanation can be found in the documentation (https://docs.axoniq.io/reference-guide/implementing-domain-logic/event-handling/handling-events):

In all circumstances, at most one event handler method is invoked per listener instance. Axon will search for the most specific method to invoke, using following rules:

  1. On the actual instance level of the class hierarchy (as returned by this.getClass()), all annotated methods are evaluated

  2. If one or more methods are found of which all parameters can be resolved to a value, the method with the most specific type is chosen and invoked

  3. If no methods are found on this level of the class hierarchy, the super type is evaluated the same way

  4. When the top level of the hierarchy is reached, and no suitable event handler is found, the event is ignored.

My suggestion would be to use the fully qualified name of the service emitting the event, which in your case is com.departureExample.Departure.coreapi.events.CheckIn.CheckedInEvent. Even if the service you are handling the event in is in a different Bounded Context, you are still handling an event that belongs to the Bounded Context from which it was emitted. Having your package structure reflect makes your code more readable.

Question:

I'm trying to create a simple microservices project to learn working with the Axon Framework.

I've set up messaging through RabbitMQ with the following code:

@Bean
public Exchange exchange() {
    return ExchangeBuilder.fanoutExchange("Exchange").build();
}

@Bean
public Queue queue() {
    return QueueBuilder.durable("QueueA").build();
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
}

@Autowired
public void configure(AmqpAdmin admin) {
    admin.declareExchange(exchange());
    admin.declareQueue(queue());
    admin.declareBinding(binding());
}

And the folling in my application.properties:

axon.amqp.exchange=Exchange

With this configuration all events published through the Axon Framework will be sent to QueueA. But now I want to make all EventA events go to QueueA and all EventB events go to QueueB. How can I do that?


Answer:

By default, Axon Framework uses the package name of the event as the AMQP Routing Key. This means you can bind queues to topic exchanges using patterns to match against these routing keys. See https://www.rabbitmq.com/tutorials/tutorial-five-java.html for more information.

You can customize Axon's behavior, by providing a custom RoutingKeyResolver (a simple Function that returns a String for a given EventMessage). This is then configured in the AMQPMessageConverter, which is responsible for creating an AMQP Message based on an Axon EventMessage (and vice versa). You can use the DefaultAMQPMessageConverter if you're fine with the default AMQP Message format.

Question:

I have developed Spring Boot + Axon (Bootiful CQRS with Axon) code by taking reference from https://www.youtube.com/watch?v=Jp-rW-XOYzA&list=PLgGXSWYM2FpOa_FTla-x5Wd10dpmgrRC4&index=54

I've placed all by code here: https://github.com/JavaHelper/axon-tutorials/tree/master/axon-complaints-demo. The issue which I am facing is very weird issue.

Scenario-1:

If I start both the applications (one on 8080 and other on 8081) and hit the below end-points, then nothing is appearing in the RabbitMQ queue and thus, nothing is coming when I hit the http://localhost:8081/ . It's blank.

CURL commands

curl -H "Content-Type:application/json" -d '{"company" : "apple", "description" : "My Kep"}' http://localhost:8080

curl -H "Content-Type:application/json" -d '{"company" : "apple", "description" : "Laptop not working"}' http://localhost:8080

curl -H "Content-Type:application/json" -d '{"company" : "Mastercrd", "description" : "Debit and Clearning does not works"}' http://localhost:8080 

Scenario-2:

If I start the demo-complaints first and then hit above curl command then RabbitMQ shows the queues count 3. After that I start the demo-complaints-stats successfully, then nothing shows up on the hit of http://localhost:8081/

Could anyone please guide what's wrong ?

Source code link already provided.


Answer:

The demo you're linking to is already relatively old. Or differently put, things might have changed which aren't clear from that start. Have you check out the manual over here and followed the necessary steps?

My hunch is that you haven't configured your AMQP Message Source to be the source of events for your Event Handler. To do that, you need to specify a Processing Group for your Event Handler and tie the AMQP Message Source to that Processing Group.