Hot questions for Using RabbitMQ in message listener


Question:

Currently I'm building a custom library on top of the Spring AMQP project. I've come to the point where I want to implement a message listener so that I can receive messages asynchronously. After reading further into the project's documentation, I found that it should be quite easy to implement your own message listener: just implement the MessageListener interface and configure it to fire on incoming messages.

So that is what I did:

public class ReceiveController implements MessageListener
{
    @Override
    public void onMessage(Message message)
    {
        System.out.println("Received a message!");
    }
}

Next I configured it like this:

private SimpleMessageListenerContainer configureMessageListener()
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connection);
    container.setQueueNames("test.queue");
    container.setMessageListener(this);
    return container;
}

Both pieces of code are located in the same class called 'ReceiveController'.

Note that I'm not using any Spring context (annotations or XML). I'm not sure whether one is mandatory for the project to function, since I can just create instances of the classes myself.

When running some code which uses my library:

  • The Consumer client connects to the RabbitMQ broker (and stays connected).
  • The Producer client connects to the RabbitMQ broker, sends its message and disconnects.
  • When I look at the queue using the management plugin, I see that a message has been put on the queue, but the message listener did not get triggered.

For some reason the consumer does not receive any messages through its listener. Could this be related to the fact that the queue was created using the 'amq.direct' exchange and the 'test.route' routing key? Or is it something else?


Answer:

When constructing the container manually (outside of a Spring Application Context), you need to invoke afterPropertiesSet() and start().

Also, if your listener implements MessageListener or ChannelAwareMessageListener, you don't need an adapter.
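A minimal sketch of that manual lifecycle, assuming a broker reachable on localhost with default credentials and an existing queue named "test.queue" (both are assumptions for illustration, as is the class name ManualContainerExample). The listener is passed to the container directly, without an adapter:

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

public class ManualContainerExample {

    public static void main(String[] args) throws Exception {
        // Assumption: RabbitMQ runs on localhost with the default guest account.
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("test.queue");
        // The listener implements MessageListener, so no adapter is needed.
        container.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println("Received a message!");
            }
        });

        // Outside an application context nobody calls these for you.
        container.afterPropertiesSet(); // validates and initializes the configuration
        container.start();              // starts the consumer threads

        Thread.sleep(10_000);           // keep the JVM alive long enough to receive something

        container.stop();
        connectionFactory.destroy();
    }
}
```

Inside a Spring application context the container bean gets both calls from the framework, which is why examples based on a context usually do not show them.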

Question:

I have a data store that is written to by multiple message listeners. Each of these message listeners can also run in hundreds of individual threads.

The data store is a PriorityBlockingQueue, as it needs to order the inserted objects by a timestamp. To make lookups efficient, rather than looping over the queue, a ConcurrentHashMap is used as a form of index.

private Map<String, SLAData> SLADataIndex = new ConcurrentHashMap<String, SLAData>();
private BlockingQueue<SLAData> SLADataQueue;

Question 1: Is this an acceptable design, or should I just use a single PriorityBlockingQueue?

Each message listener performs an operation, and these listeners are scaled up to multiple threads.

Insert Method (inserts into both collections)

this.SLADataIndex.put(dataToWrite.getMessageId(), dataToWrite);
this.SLADataQueue.add(dataToWrite);

Update Method

this.SLADataIndex.get(messageId).setNodeId(
            updatedNodeId);

Delete Method

SLAData data = this.SLADataIndex.get(messageId);
// note: PriorityBlockingQueue.remove(Object) is a linear scan, O(n), not O(log n)
this.SLADataQueue.remove(data);
// remove from index
this.SLADataIndex.remove(messageId);

Question Two: Using these methods, is this the most efficient way? They have wrappers around them via another object for error handling.

Question Three: Does using a ConcurrentHashMap and a BlockingQueue mean these operations are thread-safe? Do I not need to use a lock object?

Question Four: When these methods are called by multiple threads and listeners without any sort of synchronized block, can they be called at the same time by different threads or listeners?


Answer:

Question 1: Is this an acceptable design, or should I just use a single PriorityBlockingQueue?

Certainly you should try to use a single Queue. Keeping the two collections in sync is going to require a lot more synchronization complexity and worry in your code.

Why do you need the Map? If it is just to call setNodeId(...) then I would have the processing thread do that itself when it pulls from the Queue.

// processing thread
while (!Thread.currentThread().isInterrupted()) {
   dataToWrite = queue.take();
   dataToWrite.setNodeId(myNodeId);
   // process data
   ...
}
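As a rough sketch of the single-queue variant, the queue itself can order items by timestamp through a comparator, and the consuming thread sets the node id after taking an item. SlaItem and its fields are hypothetical stand-ins for the question's SLAData class, whose definition is not shown:

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical stand-in for the question's SLAData class.
class SlaItem {
    final String messageId;
    final long timestamp;
    volatile String nodeId;

    SlaItem(String messageId, long timestamp) {
        this.messageId = messageId;
        this.timestamp = timestamp;
    }
}

class SingleQueueSketch {

    // The comparator makes take() return the item with the oldest timestamp first.
    static PriorityBlockingQueue<SlaItem> newQueue() {
        return new PriorityBlockingQueue<>(11, Comparator.comparingLong((SlaItem i) -> i.timestamp));
    }

    // One iteration of the processing loop: take an item and tag it with this node's id.
    static SlaItem takeAndTag(PriorityBlockingQueue<SlaItem> queue, String myNodeId) {
        try {
            SlaItem item = queue.take(); // blocks until an item is available
            item.nodeId = myNodeId;
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the flag set so the caller's loop can exit
            return null;
        }
    }
}
```

With only one collection there is nothing to keep in sync, so no extra lock is needed for insertion or removal.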

Question Two: Using these methods, is this the most efficient way? They have wrappers around them via another object for error handling.

Sure, that seems fine but, again, you will need to do some synchronization locking, otherwise you will suffer from race conditions keeping the two collections in sync.

Question Three: Does using a ConcurrentHashMap and a BlockingQueue mean these operations are thread-safe? Do I not need to use a lock object?

Both of those classes (ConcurrentHashMap and the BlockingQueue implementations) are thread-safe, yes. BUT since there are two of them, you can have race conditions where one collection has been updated but the other one has not. Most likely, you will have to use a lock object to ensure that both collections are properly kept in sync.

Question Four: When these methods are called by multiple threads and listeners without any sort of synchronized block, can they be called at the same time by different threads or listeners?

That's a tough question to answer without seeing the code in question. For example, someone might be calling Insert(...) and have added the item to the Map but not to the queue yet when another thread calls Delete(...). The item would be found in the Map and removed, but queue.remove() would not find it in the queue, since the Insert(...) has not finished in the other thread.
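If both collections are kept, one way to close that window is to guard every compound operation with the same lock object. The following is only a sketch under assumptions: SlaRecord and SlaStore are hypothetical names, and the fields of the question's SLAData class are guessed.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical stand-in for the question's SLAData class.
class SlaRecord {
    final String messageId;
    final long timestamp;
    volatile String nodeId;

    SlaRecord(String messageId, long timestamp) {
        this.messageId = messageId;
        this.timestamp = timestamp;
    }
}

class SlaStore {
    private final Object lock = new Object();
    private final Map<String, SlaRecord> index = new ConcurrentHashMap<>();
    private final PriorityBlockingQueue<SlaRecord> queue =
            new PriorityBlockingQueue<>(11, Comparator.comparingLong((SlaRecord r) -> r.timestamp));

    // Both writes happen under the lock, so no thread sees the record in only one collection.
    void insert(SlaRecord record) {
        synchronized (lock) {
            index.put(record.messageId, record);
            queue.add(record);
        }
    }

    boolean updateNodeId(String messageId, String nodeId) {
        synchronized (lock) {
            SlaRecord record = index.get(messageId);
            if (record == null) {
                return false;
            }
            record.nodeId = nodeId;
            return true;
        }
    }

    boolean delete(String messageId) {
        synchronized (lock) {
            SlaRecord record = index.remove(messageId);
            if (record == null) {
                return false;
            }
            queue.remove(record); // linear scan of the queue
            return true;
        }
    }

    // Removes the oldest record from both collections, or returns null if the store is empty.
    SlaRecord pollOldest() {
        synchronized (lock) {
            SlaRecord record = queue.poll();
            if (record != null) {
                index.remove(record.messageId);
            }
            return record;
        }
    }
}
```

Because every access goes through the lock here, the concurrent collection types no longer add safety by themselves; they are kept only to mirror the question's fields.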

Question:

If I use a JSON object to send and receive over RabbitMQ, is this the cleanest way to send and receive? All that conversion seems inefficient.

Sending Code

JSONObject messageJSON = new JSONObject();
messageJSON.put("messageId", "testId");
messageJSON.put("NodeId", "testNode");

template.convertAndSend("TEST-EXCHANGE",
    "routing.test", messageJSON.toJSONString()
        .getBytes());

Receive Code

public class Listener implements MessageListener {

  @Override
  public void onMessage(Message message) {
    String recmessage = new String(message.getBody());
    JSONObject obj = (JSONObject) JSONValue.parse(recmessage);
    System.out
        .println("Message Received  " + (String) obj.get("messageId"));
  }
}

Solution From Answer given

You need to add the Jackson dependency. Below is the Maven snippet:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1</version>
</dependency>

Add to the Spring Config

<bean id="amqpCorrectionTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="messageConverter">
        <bean
            class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
        </bean>
    </property>
</bean>

Add to the message Listener

Jackson2JsonMessageConverter jmc = new Jackson2JsonMessageConverter();
JSONObject obj  = (JSONObject) jmc.fromMessage(message);

Sending Code: only send the JSON object passed in.

template.convertAndSend("TEST-EXCHANGE",
    "routing.test", messageJSON);

Answer:

This is the wrong question: the AMQP protocol (like any other wire protocol) deals only with bytes, so it's up to your application how to convert to byte[] and back (if it is Java everywhere, of course).
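To illustrate the "bytes in, bytes out" point, here is a small sketch of doing that conversion by hand. BodyCodec is a hypothetical helper, and UTF-8 is an assumption: the question's code calls getBytes() and new String(byte[]) without a charset, which relies on the JVM's default and can differ between producer and consumer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: the application decides how text becomes a message body and back.
class BodyCodec {

    // Producer side: JSON text to the raw bytes that go on the wire.
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: raw message body back to JSON text, using the same charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

On the receiving side this would be applied to message.getBody() before parsing the JSON.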

On the other hand, it would be better to base your configuration on the out-of-the-box converters, for example Jackson2JsonMessageConverter.

See more information in the Documentation.

Question:

I have created a message listener using Spring AMQP, which I am using to receive an Order POJO. The message type is application/json, so I have set up a Jackson message converter. So far things are working fine and the Order POJO is recreated automatically in my listener. However, I want to extend this example and check some message properties in my listener. So instead of using my Order POJO in handleMessage(), I want to use "org.springframework.amqp.core.Message" as the parameter. I can then convert the body later, and this way I will have all the message-related properties available in my listener for use in my application.

I tried using handleMessage() with a Message parameter, but it seems that it still tries to convert the message body using the Jackson converter. I am not sure where to pass the Order POJO class so that Jackson can use it to convert the message body while the listener still receives the Message itself.

Please find below the important snippets from my code. Please help me as I think I have hit a roadblock on this.

POJO

public class Order {

    private int orderid;
    private String itemDescription;

    // remaining members not shown in the original post
}

TEMPLATE AND CONVERTER SETTINGS

@Bean
public RabbitTemplate rubeExchangeTemplate() {
    logger.info("Lets test autowiring " + rabbitConnectionFactory.getHost());
    RabbitTemplate r = new RabbitTemplate(this.rabbitConnectionFactory);
    r.setExchange("rmq-exchange");
    r.setMessageConverter(jsonMessageConverter());
    return r;
}

@Bean
public MessageConverter jsonMessageConverter()
{
    final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setClassMapper(classMapper());
    return converter;
}

@Bean
public DefaultClassMapper classMapper()
{
    DefaultClassMapper typeMapper = new DefaultClassMapper();
    typeMapper.setDefaultType(Order.class);
    return typeMapper;
}

MESSAGE WHICH I AM SENDING (messageText contains the JSON compliant with the Order POJO)

Message message = MessageBuilder.withBody(messageText.getBytes())
    .setMessageId("123")
    .setContentType("application/json")
    .setHeader("bar", "baz")
    .build();

LISTENER

@Bean(value = "rube")
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(cf);
    container.setQueueNames("rmq-rube-queue");
    container.setMessageListener(messageListenerAdapter());
    return container;
}

@Bean
public MessageListenerAdapter messageListenerAdapter() {
    MessageListenerAdapter listener =  new MessageListenerAdapter(pm, converter);
    return listener;
}

Answer:

Don't use a MessageListenerAdapter in that case; simply implement MessageListener to get the raw message.

Alternatively, consider using the newer, annotation-based, POJO listener, where you can get access to headers as well as the converted payload...

@RabbitListener(queues = "foo")
public void listen(MyPojo pojo, @Header("foo") String fooHeader) { ... }

Question:

Hello, I have a RabbitMQ queue created by application A. I want to get messages from that queue, so I have created a message listener in a Spring Boot application using this dependency:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

After that I run this scenario:

  1. Start the Spring Boot app
  2. Send a message to the queue

The problem is that the first two messages are not received by the listener. Because of this I created a message listener in plain Java, but I got the same problem. In RabbitMQ management I can see that the message has been published and not consumed. I have tried many different configurations, but none of them succeeded. When I create a queue manually, everything works correctly. So I suspect that application A is not creating the queue correctly, or is creating it with some arguments (even though in RabbitMQ management no Args or policy marker is shown next to the name of the queue).

These are the message properties of a successfully consumed message:

MessageProperties [headers={}, correlationId= TODO_requestID, replyTo=ME, contentType=application/json, contentLength=0, redelivered=false, receivedExchange=me-exchange, receivedRoutingKey=from-me, deliveryTag=1, consumerTag=amq.ctag-vPDSoiCHmWcb0v0NrINbIg, consumerQueue=from-me])

Answer:

Go to the RabbitMQ Management UI, open the queue and check whether there are other consumers. Maybe some other consumers are collecting the first two messages.

I have faced a similar problem.