Hot questions for Using RabbitMQ in consumer

Question:

Using the RabbitMQ web UI, I've created a topic exchange TX and bound two queues, TX.Q1 and TX.Q2, to the exchange with routing keys rk1 and rk2 respectively, and produced a few messages to the exchange.

Now I want to create a consumer using Spring Cloud Stream that will take messages from Q1 only. I tried using configuration:

spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1

and the annotation @StreamListener(Sink.INPUT) for the method that consumes messages.

As a result I can see that the consumer has created a queue (or binding) with the same name TX.Q1, but the routing key of the new queue/binding is #. How can I configure, via Spring Cloud Stream, a consumer that will consume messages from the predefined queue (only those routed with rk1)?


Answer:

So for now, the work-around that Gary Russell suggested has solved the issue for me.

I've used @RabbitListener instead of @StreamListener this way: @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1")).

As a result, the predefined queue TX.Q1 is bound with binding key rk1 to the exchange TX.
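
For reference, a minimal sketch of the complete listener method using that binding (the method name and payload type are placeholders, not part of the original answer):

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "TX.Q1", durable = "true"),
        exchange = @Exchange(value = "TX", type = "topic", durable = "true"),
        key = "rk1"))
public void handleFromQ1(String payload) {
    // only messages published to TX with routing key rk1 arrive here
}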

Waiting for updates on the Spring Cloud Stream issue.

Question:

Is it legal and safe to send a message to another queue from a consumer of current queue?

public void onMessage() {
    // save to db
    Order o = myservice.create(order);

    Object reply = rabbitTemplate.convertSendAndReceive(queue2, orderId);
}

I think in this case the consumer of the second queue may not see the saved order, because the transaction will be committed only after the onMessage method exits.

And is it safe and legal to send messages from consumers?


Answer:

Yes, it's perfectly legal and safe from the RabbitMQ/AMQP side.

But whether it is safe for the consistency of the data that your business application is handling is another story. If consumer A receives message M, extracts some data S from it for writing to the DB (for example) and at the same time forwards message M to consumer B, which upon receiving it needs to write data to the DB that depends on S, then what happens? The outcome is effectively random; this is known as a race condition - in this case, A is racing the broker: it needs to write S to the DB before B receives message M. The best way to avoid the race is, well, to avoid the race: have A forward the message only after it has finished "setting up the play" for the other consumers.
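
If the listener runs in a Spring-managed transaction (as the question suggests), one way to "avoid the race" is to defer the forwarding until after the commit. A rough sketch, assuming transaction synchronization is active and Spring 5.3+ (where TransactionSynchronization has default methods); myservice, rabbitTemplate, queue2 and orderId are taken from the question:

public void onMessage() {
    // save to db inside the current transaction
    Order o = myservice.create(order);

    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
        @Override
        public void afterCommit() {
            // forward only after the commit, so the other consumer can see the saved order
            rabbitTemplate.convertSendAndReceive(queue2, orderId);
        }
    });
}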

Question:

Let me first make sure I explain the problem:

I have to process messages from multiple queues in the order in which they were originally created. This means that if the queues contain:

q1: m1, m2, m5, m7

q2: m3, m6, m9

q3: m4, m8

I would like to process them so that m4 is not processed before m1, m2, or m3. m3 can execute at any time (it does not have to wait for m1 and m2, but it is OK for it to wait, since that will most likely be simpler/safer to implement), and m8 is not processed until m7 has been processed.

I know this would serialize the effort - but I am using multiple threads, and I already have locking on another value in the payload, which helps ensure the messages won't step on each other while still gaining some amount of parallel processing.

We just ran into issues where q3 was processed before the records in q1 and q2, so it couldn't actually do what it was supposed to do. q1 and q2 do take longer to process, and we expect more records to be put into those queues as well.

I have requested that the sender change to a single queue, but I'm not sure they will be making this change (different team, different priorities), so I am trying to have a realistic backup plan.

Now here is my actual question: I've seen that you can have one listener for multiple queues - is there any documentation on the order in which I would receive the messages? Is it just round robin, always taking the oldest record from each queue? Or is the oldest record across all queues it is listening to always delivered to my listener?


Answer:

It depends on the prefetch; by default, the prefetch is 1, which means the broker will deliver one message and wait for an ack. The prefetch applies to the channel (across all the queues).

If the container concurrentConsumers is 1 (default), they will be processed serially, but the order is indeterminate - it depends on how the broker delivers them. I don't know the internal algorithm used by rabbitmq when a single channel has consumers on multiple queues; it's best to assume it's indeterminate.

I have requested that the sender change to a single queue,

A producer publishes to an exchange with a routing key - it shouldn't care about the queue topology downstream. The consumer decides the queue topology by binding to that exchange - if you change the exchange to a fanout, you can bind a single queue to it and you'll get messages in order, regardless of the routing key the producer uses.

If the producer "owns" the exchange and won't change it, you can bind a fanout exchange to his exchange and bind your single queue to that.

Of course, if he adds queues to his exchanges, messages will accumulate there.

But, as I said, producers need not be involved in the queue topology.
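
For illustration, a rough sketch of that exchange-to-exchange binding with the plain Java client (all names here are placeholders; the "#" routing key assumes the source is a topic exchange - for a direct exchange you would bind once per routing key):

// your own fanout exchange, bound to the producer's exchange
channel.exchangeDeclare("my.fanout", "fanout", true);
channel.exchangeBind("my.fanout", "producer.exchange", "#");   // destination, source, routing key

// a single queue bound to the fanout receives everything, in publish order
channel.queueDeclare("my.single.queue", true, false, false, null);
channel.queueBind("my.single.queue", "my.fanout", "");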

Question:

I have a service that sends messages to RabbitMQ, and the consumer does some manipulation of the messages and re-queues them.

I can successfully send the initial message to RabbitMQ, but the problem is that I cannot resend a consumed message to RabbitMQ if the message requires modifications.

@Service
public class MyService {

    /**
     * The template
     */
    @Autowired
    private AmqpTemplate amqpTemplate;
    private final RabbitMQConfig config;

    public void send(String message) {
        try {
            amqpTemplate.convertAndSend("ex", "r", message);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Then in my config I have set up: @Bean public ConnectionFactory connectionFactory() { /* working code */ }

@Bean
public Queue myQueue() { return new Queue("my-queue"); }
// etc...

@Bean
MessageListenerAdapter myListenerAdapter(MyListener listener) {
    return new MessageListenerAdapter(listener, "listener");
}

@Bean
MyListener myListener() {
    return new MyListener();
}

then...

public class MyListener {
    public void receiveMessage(String message) { 
        // ... some code
        // if message requires modification, then repush
        new Repush().push(message);
    }
}

I tried to create a new class instance with new, but myService is always null:

@Component
public class Repush {
    @Autowired
    private MyService myService;

    public void push(String message) {
        // myService is null at this point
    }
}

Answer:

Don't use new for bean creation; Spring injects fields only into beans, and your MyListener is a bean. Just add a Repush field with the @Autowired annotation to this class.

public class MyListener {
    @Autowired
    private Repush repush;

    public void receiveMessage(String message) { 
        // ... some code
        // if message requires modification, then repush
        repush.push(message);
    }
}

Question:

I am reading the documentation about the Channel.basicCancel operation in RabbitMQ: https://www.rabbitmq.com/consumer-cancel.html . The docs say that one possible cancellation case is when the consumer sends a cancel signal on the same channel on which it is listening.

Is this the only possibility? Can you cancel a remote consumer running on a different channel/connection/process?

I am trying to send the cancel request from another process. When I do, it ends with the exception java.io.IOException: Unknown consumerTag, just as if the operation were restricted to cancelling local consumers (on your own channel or connection).

UPDATE:

I noticed that this "Unknown consumerTag" exception is the result of initial validation inside com.rabbitmq.client.impl.ChannelN.basicCancel(String):

 Consumer originalConsumer = (Consumer)this._consumers.get(consumerTag);
 if (originalConsumer == null) {
   throw new IOException("Unknown consumerTag");
 }
 ...

But still, there might be some RPC call which does the trick...


Answer:

The RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.


The documentation is correct, you must cancel a consumer from its own channel/connection.

Other options include making your consumers aware of "cancellation messages" that will cause them to stop themselves, or using the API to close an entire connection, which will close all channels associated with it.

Question:

Here is a situation wherein a message fails to be delivered to the consumer due to improper handling by the client, and therefore the message bounces infinitely between the server and the client, resulting in a continuous stream of log messages which fills up the disk space.

How to avoid this situation? Or in other words how to restrict the retry to a limited number of times?

I tried a retry template with the rabbit template, but with no success. Please find the configuration below:

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" reply-timeout="10" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
    <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
        <property name="initialInterval" value="500" />
        <property name="multiplier" value="10.0" />
        <property name="maxInterval" value="10000" />
    </bean>
</property>
</bean>

I referred to this article for my problem: Handling AMQP messages gracefully with spring-amqp


Answer:

Using a retry template with a rabbit template is for retrying publishing messages and has nothing to do with consuming (unless you are using template.receive()).

To add retry and abort to the listener container, see the documentation. You need to add a retry interceptor to the listener container's advice chain; if your messages don't have a message id, you have to use stateless retry, because the framework needs to know how many times a message has been retried and AMQP doesn't provide that information.

See the discussion about what to do when the retries are exhausted. By default the message is just logged and dropped, but you can configure an appropriate recoverer. A RejectAndDontRequeueRecoverer will reject it, and RabbitMQ can be configured to send the message to a dead letter exchange/queue. Or you can use a RepublishMessageRecoverer to write the message to another queue, including information about the failure.

See this question/answer for an example. But note that using stateful recovery with a random message ID won't work.
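
As a rough sketch of what such an advice chain can look like in Java config (the bean name and the retry values are mine, not from the question; the XML listener container has an equivalent advice-chain setting):

@Bean
public SimpleRabbitListenerContainerFactory retryingContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(RetryInterceptorBuilder.stateless()
            .maxAttempts(3)
            .backOffOptions(500, 10.0, 10000)              // initial interval, multiplier, max interval
            .recoverer(new RejectAndDontRequeueRecoverer()) // reject after retries so DLX routing applies
            .build());
    return factory;
}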

Question:

I want a queue to be consumed by only one subscriber at a time, so that if one subscriber drops, another one will have the chance to subscribe.

I am looking for the correct way of doing this in Spring AMQP. I did it in pure Java, based on the example on RabbitMQ's website: I passively declare the queue, check its consumer count, and if it is 0, start to consume from it.

Here's the code.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

int count = channel.queueDeclarePassive(QUEUE_NAME).getConsumerCount();

System.out.println("count is "+count);
if (count == 0) {
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} else{
    System.out.println("subscribed by some other processor(s)");
}

I can also check the subscriber count in Spring AMQP this way, but it is too late, because the listener is already attached to the queue.

@RabbitListener(queues = "q1")
public void receivedMessageQ1(String message, Channel channel){
    try {
        int q1 = channel.queueDeclarePassive("q1").getConsumerCount();
        // do something.
    } catch (IOException e) {
        System.out.println("exception occurred");
    }
}

In a nutshell, I want to consume a queue based on its consumer count. I hope I am clear.


Answer:

Set the exclusive flag on the @RabbitListener; RabbitMQ will only allow one instance to consume. The other instance(s) will attempt to listen every 5 seconds (by default). To increase the interval, set the container factory's recoveryBackOff.

@SpringBootApplication
public class So56319999Application {

    public static void main(String[] args) {
        SpringApplication.run(So56319999Application.class, args);
    }

    @RabbitListener(queues = "so56319999", exclusive = true)
    public void listen (String in) {

    }

    @Bean
    public Queue queue() {
        return new Queue("so56319999");
    }

}
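
If you need a longer interval than the default 5 seconds, a hedged sketch of setting the recoveryBackOff mentioned above (the 30-second value is just an example):

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // retry acquiring the exclusive consumer every 30 seconds instead of every 5
    factory.setRecoveryBackOff(new FixedBackOff(30000, FixedBackOff.UNLIMITED_ATTEMPTS));
    return factory;
}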

Question:

I'm implementing a daily job which gets data from MongoDB (around 300K documents) and, for each document, publishes a message on a RabbitMQ queue. On the other side I have some consumers on the same queue, which ideally should work in parallel.

Everything is working, but not as well as I would like, especially regarding consumer performance.

This is how I declare the queue:

rabbitMQ.getChannel().queueDeclare(QUEUE_NAME, true, false, false, null);

This is how the publishing is done:

rabbitMQ.getChannel().basicPublish("", QUEUE_NAME, null, body.getBytes());

So the channel used to declare the queue is used to publish all the messages.

And this is how the consumers are instantiated in a for loop (10 in total, but it can be any number):

Channel channel = rabbitMQ.getConnection().createChannel();
MyConsumer consumer = new MyConsumer(customMapper, channel, subscriptionUpdater);
channel.basicQos(1);    // also tried with 0, 10, 100, ...
channel.basicConsume(QUEUE_NAME, false, consumer);

So for each consumer I create a new channel, and this is confirmed by the logs:

...
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@bdd2027
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@5d1b9c3d
com.rabbitmq.client.impl.recovery.AutorecoveringChannel@49a26d19
...

As far as I've understood from my very short RabbitMQ experience, this should guarantee that all the consumers are used. By the way, consumers need between 0.5 and 1.2 seconds to complete their task; I have spotted only a very few taking around 3 seconds.

I have two separate queues, and I repeat what I said above twice (using the same RabbitMQ connection).

So, I tested by publishing 100 messages to each queue. Both of them have 10 consumers with qos=1.

I didn't expect a delivery/consume rate of exactly 10/s, but instead I noticed:

  • actual rates are around 0.4 and 1.0 messages/s.
  • at least all the consumers bound to the queue have received a message, but it doesn't look like "fair dispatching".
  • it took about 3 mins 30 secs to consume all the messages on both queues.

Am I missing the main concept of threading within RabbitMQ? Or some specific configuration which might still be at its default value? I've only been working with it for a few days, so this is quite possible.

Please notice that I'm in the fortunate position where I can control both publishing and consuming parts :)

I'm using RabbitMQ 3.7.3 locally, so it cannot be any network latency issue.

Thanks for your help!


Answer:

The setup of RabbitMQ channels and consumers was correct in the end: one channel for each consumer.

The problem was having the consumers calling a synchronized method to find and update a MongoDB document.

This was delaying the execution of some consumers: even worse, the more consumers I added (thinking it would speed up processing), the lower the message rate/s I was getting.

I have moved the MongoDB part to the publishing side, where I don't have to care about synchronization because it's done sequentially by a single publisher. I have a slightly decreased delivery rate/s, but now with just 5 consumers I easily reach an ack rate of 50-60/s.

Lessons learnt:

  • create a separate channel for the publisher.
  • create a separate channel for each consumer.
  • let RabbitMQ manage threading for the consumers (--> you can instantiate them on the main thread).
  • (if possible) back off publishing to give the queues 100% of the time to deal with consumers.
  • set a qos > 1 for each consumer channel. But this really depends on your scenario and architecture: you must do some performance test.

As a general rule:

  • (1) calculate/estimate delivery time.
  • (2) calculate/estimate ack time.
  • (3) calculate/estimate consumer time.
  • qos = ((1) + (2) + (3)) / (3)

This will give you an initial qos value to test and tweak based on your scenario. The final goal is to have 100% utilization for all the available consumers.
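
For example, with an assumed delivery time of 5 ms, an ack time of 5 ms and a consumer processing time of 100 ms, qos = (5 + 5 + 100) / 100 = 1.1, so a prefetch of 2 would be a reasonable starting value to keep each consumer busy while its acks are in flight.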

Question:

I am using the Spring AMQP @RabbitListener annotation from artifact spring-rabbit-1.7.1.RELEASE. I wonder if there is a way to configure the number of consumers for each queue? I have been digging in the documentation and found nothing yet. Is there a way to configure, in the related container, the number of consumers for each queue? Thanks in advance.


Answer:

Configure the concurrency via the container factory bean as shown in the documentation.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}

If you are using Spring Boot, which creates the factory bean for you, you can configure them using properties.

If you want a fixed number of consumers, just omit the max.

If you want different settings for each listener, you need a different factory for each set of settings. You would then reference the particular container factory for a @RabbitListener in its containerFactory property.
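
For illustration, a sketch of a second, dedicated factory referenced from a listener (the bean name, queue name and concurrency value are placeholders):

@Bean
public SimpleRabbitListenerContainerFactory slowListenerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(5);   // fixed number of consumers: no max set
    return factory;
}

@RabbitListener(queues = "slow-queue", containerFactory = "slowListenerFactory")
public void handleSlow(String payload) {
    // this listener runs with 5 consumers; other listeners keep the default factory
}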

Question:

The RabbitMQ Java client's Consumer interface doesn't seem to have anything such as a handleException() method.

So what are the consequences of a RuntimeException being thrown inside Consumer.handleDelivery()?

One could expect the exception to be somehow logged and the consumer to keep working for future deliveries, but I'm not sure.


Answer:

If you don't handle the exception your channel will be closed.

With autoAck = false the messages will be re-queued.

It is always good practice to handle errors during consumption.

By the way, there is an ExceptionHandler you can use:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
final ExceptionHandler eh = new DefaultExceptionHandler() {
    @Override
    public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
        System.out.println(" - Error raised by: " + channel.getChannelNumber());
    }
};
factory.setExceptionHandler(eh);

final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();


channel.queueDeclare("my_queue",true,false,false,null);
channel.basicConsume("my_queue", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        System.out.println("Received...");
        System.out.println("error:"+ Integer.parseInt("RAISE_AN_ERROR"));

The errors will be redirected to the handler and the channel won't be closed.

For my part, I think you should always handle errors inside the event handler.

Question:

I'm using RabbitMQ in order to send tasks to workers (consumers) which are created on the fly. Currently, each time a new task is created, a new worker is created. The problem goes like this:

-A user creates a task

-A worker is created then the task is sent on the queue for the workers to process

-The worker starts processing the queue (the worker basically sleeps for a time)

-Another user creates a task

-New worker is created and task sent on the queue

-The new worker doesn't process the new task and does absolutely nothing; meanwhile, the new task is processed by the first worker once it's done with the first task

I've checked the admin part of RabbitMQ and there are two consumers bound to the queue, but one of them seems to do all the work while the other just waits.

Here's the code for the worker:

public class Worker extends Thread {

private final static String QUEUE_NAME = "Tasks";
private final static String QUEUE_COMPL = "Completed";
public static int id = 0;
private static final String EXCHANGE_NAME = "logs";
public int compteur;
String identifier;

public Worker() {
    Worker.id++;
    compteur = id;
}
public void run() {
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("WORKER " + compteur + " : Message received :" + message);

                String taskName = message.split(" ")[0];
                String length = message.split(" ")[1];

                try {
                    System.out.println("WORKER " + compteur + " : Commencing job :" + message);
                    doWork(length);
                    System.out.println("WORKER " + compteur + " : Job's finished :" + message);
                    taskName += " done by " + compteur;
                   // confirm(taskName);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } finally {
                    System.out.println("WORKER " + compteur + " : Waiting for a new task...");
                }

            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);
    } catch (IOException ex) {
            Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
    } catch (TimeoutException ex) {
            Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
    }
}

private static void doWork(String taskLength) throws InterruptedException {
    int temps = Integer.parseInt(taskLength);
    Thread.sleep(temps);
}
}

and the code for the part which puts the messages into the queue:

public class serveurSD {

private final static String QUEUE_NAME = "Tasks";
private  Manager MANAGER = new Manager();

@WebMethod(operationName = "processTask")
public String processTask(@WebParam(name = "message") String txt, @WebParam(name = "duree") int duree) throws IOException, TimeoutException {
    if (MANAGER == null){
        MANAGER= new Manager();
        MANAGER.listen();
    }
    System.out.println("SERVER : Message received : " + txt + " " + duree);
    MANAGER.giveTask();
    ConnectionFactory factory = new ConnectionFactory();
    String message = txt + " " + duree;
    System.out.println("SERVER : Sending message to workers : " + message);
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    channel.close();
    connection.close();       
    return "Your task is being processed";
}
}

(Manager is the class creating the workers on the go.)

I'm sorry if a similar question has already been asked but I couldn't find it. Thanks for any possible help :)


Answer:

The second parameter of the basicConsume method is "auto acknowledge". Setting this parameter to true means the consumer will tell RabbitMQ that the message has been acknowledged as soon as it receives the message.

When the consumer is set to autoAck = true, it is highly likely that it will immediately receive the next available message from the queue, even when basicQos is set to 1. This happens because the limit of 1 is immediately decremented by the consumer, to say it no longer has any message in flight and can accept the next one.

Changing the auto ack parameter to false prevents this problem, when combined with the basic QoS setting of 1, because it forces your consumer to say "hey, i've got a message and i'm currently working on it. don't send me anything else until i'm done."

This allows the other consumer to say "hey, I have a spot open - go ahead and send me the message."
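
Applied to the worker code from the question, the change is roughly this (a sketch, keeping the original doWork and QUEUE_NAME):

channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            doWork(message.split(" ")[1]);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // ack only when the work is done, so the broker can dispatch the next task fairly
            getChannel().basicAck(envelope.getDeliveryTag(), false);
        }
    }
};
channel.basicConsume(QUEUE_NAME, false, consumer);   // autoAck = false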

Question:

I have a problem with a RabbitMQ consumer. I have a single consumer getting messages from three queues. The problem is that I need to get multiple messages from each of them, but my consumer gets only one per queue and then stops. I would be grateful if someone could help me solve this problem.

Consumer code below

        for (int i = 0; i < queueNames.size(); i++) {

        Channel channel = connection.createChannel();
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer);

        flag = true;
        while (flag) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(routingKey);
            String message = new String(delivery.getBody(), "UTF-8");

                flag = false;
        }
    }

where queueNames is a list containing names of my queues (in number of 3).


Answer:

You need to subscribe to the queue; the way you defined it, the consumer will only consume one message.

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
 new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties properties,
                                byte[] body)
         throws IOException
     {
         String routingKey = envelope.getRoutingKey();
         String contentType = properties.getContentType();
         long deliveryTag = envelope.getDeliveryTag();
         // (process the message components here ...)
         channel.basicAck(deliveryTag, false);
     }
 });

More info here: https://www.rabbitmq.com/api-guide.html

Question:

Is there any way to find out the subscriber count of an already declared queue in Spring AMQP? I found the com.rabbitmq.client.Channel class, using which I am able to do this:

int consumerCount = channel.queueDeclare().getConsumerCount();

However, this declares a new queue, with a random name, and since it has no consumer, it returns 0.

Is there any way to do it for an already declared queue?


Answer:

You can use passive declaration.

A passive declare simply checks that the entity with the provided name exists. If it does, the operation is a no-op. For queues successful passive declares will return the same information as non-passive ones, namely the number of consumers and messages in ready state in the queue.

Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.getMessageCount();
// returns the number of consumers the queue has
response.getConsumerCount();

Question:

I'm working on designing a solution for messaging where at least two servers consume messages from the same queue, but it should work for N servers.

The workflow for the simplest solution I've been working is the following:

Server A: Process message -> Publish to exchange
Server B: Consume message -> Process message -> Publish response to return exchange
Server A: Consume message -> Process message and finish

But what I'm trying to do is the same with two "servers A". The problem is, I need to make this work in synchronous mode, so that the servers don't keep listening forever, but only on demand.

It would be something like this:

Server A's Load balancer: "I'll send this message to server A1 or A2"
Server A1: Read request -> Process message -> Publish to exchange
Server A2: It's not doing anything because there are no requests.
Server B: Consume message -> Process message -> Publish response to return exchange
Server A1: Consume message -> Process message and finish
Server A2: Hasn't done anything because there were no requests.

If a message goes from Server A1, its reply must return to that server, but I'm having trouble because RabbitMQ doesn't know where to send the response, due to the consumers' non-exclusive mode. Also, I think I read that keeping the connection alive makes the RabbitMQ server balance between the open connections, so if I had 5 servers A, it would only succeed about 20% of the time; the other 80% would be lost messages.

I found a solution, which is: publish to a single queue, consume from different queues (A1->Q1, A2->Q2...), but I'm not sure if that's the right way to do it.

Maybe there is an alternative option that I'm not aware of, but I must use RabbitMQ.


Answer:

This sounds like RPC-style work. Each server "A" can create a temporary private queue and specify it as the "reply-to" property of the message. When server "B" replies, it publishes the message directly to the "reply-to" queue, which is private to the "A" server that originally sent the request.

This is well depicted here.
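
A rough sketch of that reply-to pattern with the plain Java client (the queue and variable names are placeholders):

// server "A" (the requester): a server-named, exclusive, auto-delete reply queue
String replyQueue = channel.queueDeclare().getQueue();
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .correlationId(correlationId)
        .replyTo(replyQueue)
        .build();
channel.basicPublish("", "request-queue", props, requestBody.getBytes("UTF-8"));

// server "B" (the responder): echo the correlation id and publish to the reply-to queue
channel.basicPublish("", requestProps.getReplyTo(),
        new AMQP.BasicProperties.Builder().correlationId(requestProps.getCorrelationId()).build(),
        responseBody.getBytes("UTF-8"));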

Question:

I have the configuration below for RabbitMQ:

prefetchCount:1 ack-mode:auto.

I have one exchange, one queue attached to that exchange, and one consumer attached to that queue. As per my understanding, the steps below happen when the queue has multiple messages.

  1. The queue writes data on a channel.
  2. As the ack-mode is auto, as soon as the queue writes a message on the channel, the message is removed from the queue.
  3. The message reaches the consumer, which starts working on that data.
  4. As the queue has received the acknowledgement for the previous message, the queue writes the next data on the channel.

Now, my doubt is: suppose the consumer is not finished with the previous data yet. What will happen to the next data the queue has written on the channel?

Also, suppose prefetchCount is 10 and I have just one consumer attached to the queue; where will these 10 messages reside?


Answer:

The scenario you have described is one that is mentioned in the documentation for RabbitMQ, and elaborated in this blog post. Specifically, if you set a sufficiently large prefetch count, and have a relatively small publish rate, your RabbitMQ server turns into a fancy network switch. When acknowledgement mode is set to automatic, prefetch limiting is effectively disabled, as there are never unacknowledged messages. With automatic acknowledgement, the message is acknowledged as soon as it is delivered. This is the same as having an arbitrarily large prefetch count.

With prefetch >1, the messages are stored within a buffer in the client library. The exact data structure will depend upon the client library used, but to my knowledge, all implementations store the messages in RAM. Further, with automatic acknowledgements, you have no way of knowing when a specific consumer actually read and processed a message.

So, there are a few takeaways here:

  1. Prefetch limit is irrelevant with automatic acknowledgements, as there are never any unacknowledged messages, thus
  2. Automatic acknowledgements don't make much sense when using a consumer
  3. A sufficiently large prefetch when auto-ack is off, or any use of auto-ack = on, will result in the message broker not doing any queuing, and instead doing routing only.

Now, here's a little bit of expert opinion. I find the whole notion of a message broker that "pushes" messages out to be a little backwards, and for this very reason- it's difficult to configure properly, and it is unclear what the benefit is. A queue system is a natural fit for a pull-based system. The processor can ask the broker for the next message when it is done processing the current message. This approach will ensure that load is balanced naturally and the messages don't get lost when processors disconnect or get knocked out.

Therefore, my recommendation is to drop the use of consumers altogether and switch over to using basic.get.
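
For what it's worth, a minimal sketch of such a pull-based loop with basic.get and manual acks ("my_queue" is a placeholder name):

GetResponse response = channel.basicGet("my_queue", false);   // autoAck = false
if (response == null) {
    // the queue was empty; back off for a moment before polling again
} else {
    String body = new String(response.getBody(), "UTF-8");
    // ... process the message ...
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}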

Question:

I have a Producer as follows:

public class MyProducer {

private static final String EXCHANGE_NAME = "messages";

public static void main(String[] argv)
              throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    String color1 = "red"
    String message1 = "message1"

    String color2 = "blue"
    String message2 = "message2"

    channel.basicPublish(EXCHANGE_NAME, color1, null, message1);
    channel.basicPublish(EXCHANGE_NAME, color2, null, message2);

    channel.close();
    connection.close();
}
}

and also a consumer:

public class MyConsumer {

private static final String EXCHANGE_NAME = "messages";

public static void main(String[] argv)
              throws java.io.IOException,
              java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();


    channel.queueBind(queueName, EXCHANGE_NAME, "color1");
    channel.queueBind(queueName, EXCHANGE_NAME, "color2");


    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

}

}

My questions are now:

  1. Do I now have only one queue named queueName, or do I have two queues named "color1" and "color2"?
  2. I don't want to consume the messages immediately. So what I want is to set a delay for each queue "color1" and "color2". How can I achieve this?

Answer:

Question 1) Do I now have only one queue named "queueName" or do I have two queues named "color1" and "color2"?

Answer: You should first go through the tutorial:

https://www.rabbitmq.com/getstarted.html

Based on that, decide how you want to create your queues and which exchange type (direct, topic, headers, or fanout) matches your requirement; sometimes it turns out that no exchange is needed at all. So first see the tutorial and then decide based on your requirement.

Question 2) I don't want to consume the messages immediately. So what I want is to set a delay for each queue "color1" and "color2". How can I achieve this?

Answer: For that you have to write your own logic which delays the consumer from fetching messages from RabbitMQ; you could also do it with a thread.

Enjoy Rabbit programming :)

Question:

Is it possible to force a requeue to another consumer when a message is requeued? For example, my service wants to process a request from a queue (where there are multiple consumers) and in the middle of processing finds that it has low local disk space or similar. It does not make sense to retry processing on this consumer, but another consumer could still process the message.


Answer:

Yes, the low-space consumer should simply nack the message (a negative acknowledgement). The message will get requeued and eventually delivered to another consumer. More info here: https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue
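
Inside a Java client handleDelivery callback (with manual acks, i.e. autoAck = false), that looks roughly like this; the diskSpaceLow check is a placeholder for whatever condition applies:

if (diskSpaceLow) {
    // multiple = false, requeue = true: put the message back so another consumer can take it
    channel.basicNack(envelope.getDeliveryTag(), false, true);
    return;
}
// otherwise process normally and ack
channel.basicAck(envelope.getDeliveryTag(), false);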

Question:

Is it possible to send a message to a queue using a topic exchange and have 2 consumers who will both receive and process the same message? Currently I have created 2 consumers observing a queue bound to a topic exchange, but the first consumer consumes the message and removes it from the queue, and the second consumer does not receive it.


Answer:

If you need pub-sub semantics, you should consider using a queue per consumer. That is exactly how the AMQP protocol works.

You can borrow some ideas from the official tutorial: https://www.rabbitmq.com/tutorials/tutorial-five-spring-amqp.html

Question:

I am trying to implement exponential backoff for consumer failures. To that end I have three queues with DLX thus: RETRY -> MAIN -> FAILED.

Anything rejected from MAIN goes to FAILED, and anything added to RETRY goes into MAIN after a per-message TTL. The consumer receives from MAIN.

I've implemented an ErrorHandler and set it on the SimpleRabbitListenerContainerFactory. This handler either computes a new TTL and sends the message to the RETRY queue, or throws AmqpRejectAndDontRequeueException if that's not possible or retries are exceeded in order to DLX it to FAILED. The problem is, I cannot work out how to get rid of the original message.

As far as I can see I have to ack it, but the Channel is not available in the error handler, and there are no other exceptions to throw that would trigger an ack.

If instead I remove the MAIN -> FAILED DLX and switch to manually adding messages to FAILED, then if that doesn't work I've lost the message.

@Override
public void handleError(Throwable t) {
  log.warn("Execution of Rabbit message listener failed.", t);

  try {
    queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
    // what to do here?
  } catch (RuntimeException ex) {
    t.addSuppressed(ex);
    log.error("Not requeueing after failure", t);
    throw new AmqpRejectAndDontRequeueException(t);
  }
  // or here?
}

Answer:

I think I immediately found the answer. I missed it before because I was throwing from the wrong place.

@Override
public void handleError(Throwable t) {
  log.warn("Execution of Rabbit message listener failed.", t);

  try {
    queueForExponentialRetry(((ListenerExecutionFailedException) t).getFailedMessage());
  } catch (RuntimeException ex) {
    t.addSuppressed(ex);
    log.error("Not requeueing after failure", t);
    throw new AmqpRejectAndDontRequeueException(t);
  }

  throw new ImmediateAcknowledgeAmqpException("Queued for retry");
}

ImmediateAcknowledgeAmqpException

Special exception for listener implementations that want to signal that the current batch of messages should be acknowledged immediately (i.e. as soon as possible) without rollback, and without consuming any more messages within the current transaction.

This should be safe as I'm not using batches or transactions, only publisher returns.


Side note: I should also be aware that exponential backoff isn't going to actually work properly:

While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired.

Question:

Recently we came across a situation where messages were taken from the queue by the consumer but did not reach the listener bound to that queue. We were able to make this assertion because we have an interceptor on our listeners (which prints a log message) that was not triggered for those messages. As soon as we restarted the server (Tomcat), the messages were consumed and acked.

We would like to know the reason for such a behavior. We analyzed the thread dumps but with no success.


Answer:

Such issues are invariably caused by one of two problems:

  • container thread(s) "stuck" in user code
  • some network component (e.g. a router) silently closing a connection that it thinks is idle, such that the client and/or server is not aware that the connection is closed.

You seem to have eliminated the first (assuming your analysis is correct), so it's most likely the second.

You can enable heartbeats on the connection to avoid the network thinking a connection is idle. Refer to the RabbitMQ documentation.
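
With the plain Java client this is a single setting on the connection factory; Spring's connection factory exposes an equivalent property. A sketch (the 30-second value is an example):

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// send/expect heartbeat frames every 30 seconds so idle connections are not silently dropped
factory.setRequestedHeartbeat(30);
Connection connection = factory.newConnection();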

Question:

Can spring-rabbit be made to support multiple concurrent consumers on a single topic?

Here are the details

My system uses manual ack mode with a topic exchange via spring-rabbit (Spring 4.0.6). The pattern is as follows:

  • Message comes into ChannelAwareMessageListener
  • A factory method generates an appropriate worker and passes in a reference to the channel
  • If the worker successfully processes the message, the message is Ack'd
  • If the worker is unsuccessful or an exception happens, the message is Nack'd and sent into a dead letter queue for later processing

Since some of these workers can take a fair bit of time to complete their IO-bound processing, I need to be able to set a higher number of concurrent consumers.

After some testing, however, I've noticed that there are times when several consumers receive the same message. Sure enough, a look at the documentation (http://docs.spring.io/spring-framework/docs/4.0.6.RELEASE/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrentConsumers-int-) confirms my finding:

Do not raise the number of concurrent consumers for a topic, unless vendor-specific setup measures clearly allow for it. With regular setup, this would lead to concurrent consumption of the same message, which is hardly ever desirable.

My questions are as follows:

  1. What the heck does "unless vendor-specific setup measures clearly allow for it" mean? Is there a patch/version/configuration or Rabbit that supports this?
  2. I can easily write code on the client that prevents a message from being processed if it's already being processed by another worker. What, then, do I do with this message? Send nack? Ignore it? What happens if I nack and then the worker which is actually processing the message sends an ack some time later? Will an exception be thrown?

Thanks in advance...


Answer:

The warning you mentioned is about JMS, not RabbitMQ. Have a look at the Spring RabbitMQ documentation; it does not contain this warning.

Once a message is delivered to a queue (whatever the exchange type), it can only be taken by one consumer/worker at a time (assuming no issues).

If you receive the same message twice, there is an issue somewhere:

  • the message is nacked and requeued
  • the channel/connection is closed on the client side
  • there is a network issue and RabbitMQ automatically requeues the message (the channel/connection is closed on the server and on the client side)

For the last two points you should see some error messages.

Note that this point is, in my opinion, unnecessary and may explain the issue:

  • A factory method generates an appropriate worker and passes in a reference to the channel

The SimpleMessageListenerContainer already uses an Executor. As you are using your own executor, there may be an issue between the spring-amqp channel pool (if you use one) and your executors, e.g. the channel is closed because spring-amqp believes it is no longer in use.

Instead of spawning your own thread, process the message on the thread that invokes ChannelAwareMessageListener#onMessage.

Question:

I have a Java application which creates consumers that listen to RabbitMQ. I need to know whether a started consumer is still working fine, and if not, I need to restart it.

Is there any way I can do that? Currently my main application creates an executor thread pool and passes this executor while creating a new connection.

ExecutorService executor = Executors.newFixedThreadPool(30);
Connection connection = factory.newConnection(executor);

The main method then creates 30 ConsumerApp objects by calling the constructor with a new channel as argument, and calls the listen() method:

for (int i = 0; i < 30; i++) {
    ConsumerApp consumer = new ConsumerApp(i, connection.createChannel());
    consumer.listen();
}

The listen method in ConsumerApp listens to a queue and starts a DefaultConsumer object which simply prints the received message:

listen() {
    try {
        channel.queueDeclare("test-queue-name", false, false, false, null);
    }
    catch (IOException e) {
        System.out.println("Exception on creating queue");
    }
    Consumer consumer = new DefaultConsumer(this.channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received Message in consumer '" + consumerId + " " + message + "'");
        }
    };
    // Now starting the consumer
    try {
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
    catch (ShutdownSignalException | IOException ex) {
        ex.printStackTrace();
    }
}

I want to know whether there is any way I can check that the consumer is active. My idea is to catch the ShutdownSignalException, recreate the consumer object and call the listen method again. Is this necessary, given that RabbitMQ auto-recovers and reconnects? But how can I ensure this?

Is this achievable in any way using the thread pool passed to the RabbitMQ connection?

I am using the latest version of the RabbitMQ client, 5.3.0.


Answer:

The Consumer interface has different methods that can help you track the state of your consumer. You're likely to be interested in handleConsumeOk and handleCancel. Automatic connection recovery will indeed re-register consumers after a connection failure, but that doesn't prevent you from tracking their state manually to, e.g., expose some information on JMX.
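
A rough sketch of overriding those callbacks on a DefaultConsumer (the log messages are placeholders):

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleConsumeOk(String consumerTag) {
        // the broker has registered this consumer; it is now active
        System.out.println("Consumer " + consumerTag + " registered");
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // the consumer was cancelled unexpectedly (e.g. its queue was deleted)
        System.out.println("Consumer " + consumerTag + " cancelled");
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // the channel or connection was closed
        System.out.println("Consumer " + consumerTag + " shut down: " + sig.getMessage());
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(" [x] Received '" + new String(body, "UTF-8") + "'");
    }
};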

Question:

We are using Docker, Spring and RabbitMq in our system. I'm new to AMQP and I'm trying to figure out how the exchanges, queues, bindings and connections work in our system. We have multiple dockerized applications.

In short, when the applications start, they each start to listen to the same queue, and I don't understand how they can all expect to receive the same messages. There were some similar questions on Stack Overflow that gave me the impression that our current system is possibly flawed.

In more detail:

When the docker containers start, multiple applications in different containers use RabbitAdmin to

declare the same exchange: rabbitAdmin.declareExchange(exchange)
declare the same queue: rabbitAdmin.declareQueue(queue)
bind those together: rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("theSameKey"));

They do this because they want to listen to the same messages. As far as I understand, only one of the dockerized programs manages to create the exchange and the queue, and the rest of them try but that has no effect.

After that, each of these applications create and start SimpleMessageContainers for the queue:

simpleMessageContainer.setMessageListener(messageListener)
simpleMessageContainer.addQueueNames(queue.getName())
simpleMessageContainer.start()

Using rabbitmqctl and rabbitmq's web interface, I can see that a single queue has multiple consumers on different channels, corresponding to different docker containers.

Isn't it so that the messageListener resides within the application but RabbitMq creates a Consumer for the queue at the broker when addQueueNames is called, and this Consumer then forwards messages through the connection to the application local messageListener?

Since multiple applications within different docker containers do the same, there are several consumers for the same queue, just like I'm seeing with rabbitmqctl.

What I don't understand is this: doesn't RabbitMQ pass messages that end up in the queue to the channels/consumers in a round-robin fashion, so that only one of the dockerized applications will receive each message? The exchanges are of direct and topic type, with no fan-out exchanges. If all the dockerized applications wanted to receive the same message, shouldn't each of them create its own queue on the same exchange, with its own queue name but the same routing key?

I fail to see how the current implementation can possibly work properly.


Answer:

  • Every container (actually, the apps running inside) tries to declare the exchanges and queues. Once declared, all other declare commands have no effect (even if the parameters are different).

Doesn't RabbitMQ pass messages that end up in the queue to the channels/consumers in a round-robin fashion?

That is correct, though you can influence it with prefetch_count and message acks.

If all the dockerized applications wanted to receive the same message, shouldn't each of them create its own queue on the same exchange, with its own queue name but the same routing key?

Yes, that is the only way, because as per the AMQP protocol, messages are load-balanced between consumers. So if the same message is to be processed by all the containers (consumers), each should have a different queue bound to the same exchange (direct routing will do).

This is true whether the consumers are running in Docker or not.
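
In Spring AMQP, a per-instance queue can be declared roughly like this (a sketch; the exchange and routing key follow the question, and AnonymousQueue gives each application instance its own exclusive, auto-delete queue):

@Bean
public Queue instanceQueue() {
    return new AnonymousQueue();   // unique, exclusive, auto-delete queue per application instance
}

@Bean
public Binding instanceBinding(DirectExchange exchange, Queue instanceQueue) {
    // every instance binds its own queue with the same routing key, so each gets a copy of the message
    return BindingBuilder.bind(instanceQueue).to(exchange).with("theSameKey");
}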

Question:

I have frequent Channel shutdown: connection error issues (under the 24.133.241:5671 thread; the name is truncated) in the RabbitMQ Java client (my producer and consumer are far apart). Most of the time the consumer is automatically restarted, as I have enabled heartbeats (15 seconds). However, there were some instances with only Channel shutdown: connection error but no Consumer raised exception and no Restarting Consumer (under the cTaskExecutor-4 thread).

My current workaround is to restart my application. Can anyone shed some light on this matter?

2017-03-20 12:42:38.856 ERROR 24245 --- [24.133.241:5671] o.s.a.r.c.CachingConnectionFactory
      : Channel shutdown: connection error
2017-03-20 12:42:39.642  WARN 24245 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerCont
ainer : Consumer raised exception, processing can restart if the connection factory supports
it
...
2017-03-20 12:42:39.642  INFO 24245 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerCont
ainer : Restarting Consumer: tags=[{amq.ctag-4CqrRsUP8plDpLQdNcOjDw=21-05060179}], channel=Ca
ched Rabbit Channel: AMQChannel(amqp://21-05060179@10.24.133.241:5671/,1), conn: Proxy@7ec317
54 Shared Rabbit Connection: SimpleConnection@44bac9ec [delegate=amqp://21-05060179@10.24.133
.241:5671/], acknowledgeMode=NONE local queue size=0

Answer:

Generally, this is due to the consumer thread being "stuck" in user code somewhere, so it can't react to the broken connection.

If you have network issues, perhaps it's stuck reading or writing to a socket; make sure you have timeouts set for any I/O operations.

Next time it happens take a thread dump to see what the consumer threads are doing.

Question:

I've developed a Java 7 application with RabbitMQ 3.6.6 (AMQP client 4.0.2). My consumer class extends DefaultConsumer. Every request that I send via the RabbitMQ management console starts a new thread (pool-2-thread-n), according to the log:

info: pool-2-thread-10 - NumberConsumer - Number request
info: pool-2-thread-11 - NumberConsumer - Number request
info: pool-2-thread-12 - NumberConsumer - Number request
info: pool-2-thread-13 - NumberConsumer - Number request
info: pool-2-thread-14 - NumberConsumer - Number request
info: pool-2-thread-15 - NumberConsumer - Number request
info: pool-2-thread-16 - NumberConsumer - Number request
info: pool-2-thread-17 - NumberConsumer - Number request
info: pool-2-thread-18 - NumberConsumer - Number request
info: pool-2-thread-19 - NumberConsumer - Number request

My consumer class:

public class NumberConsumer extends DefaultConsumer
{
   private static final Logger LOG = Logger.getLogger( NumberConsumer.class );

   public NumberConsumer(final Channel channel)
   {
      super( channel );
   }


   @Override
   public void handleDelivery(final String consumerTag, final Envelope envelope,
         final AMQP.BasicProperties properties, final byte[] body)
      throws IOException
   {
      final String request = new String( body, "UTF-8" );
      NumberConsumer.LOG.info( "Number request" );

      // ... do work and publish response ...
   }
}

Is this behavior expected? I'm quite sure that every thread is closed when it's done handling its request but why isn't their number limited? Can this be configured?


Answer:

The AMQP Java client has its own thread pool for consumers in the ConnectionFactory. If you don't want it to be used, you have to supply your own ExecutorService as a parameter to the newConnection(..) method.
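
A minimal sketch of supplying your own bounded pool (the pool size is an arbitrary example):

ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);   // bounds the consumer callback threads
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection(consumerExecutor);       // deliveries are dispatched on this pool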

Question:

I'm aiming to achieve the following: PHP code sends a request to a queue - Java code reads from the queue - Java code sends a reply to a fixed reply queue - PHP code reads the reply. I have set up the following test (the producer is for now in Java):

POJO:

public class PojoListener {

public String handleMessage(String foo) {
    System.out.println("IN MESSAGE RECEIVER!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    return foo.toUpperCase();
}
}

Configuration:

@Configuration
public class FixedReplyQueueConfig {

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setUsername("urbanbuz");
    connectionFactory.setPassword("ub");
    connectionFactory.setVirtualHost("urbanbuzvhost");

    return connectionFactory;
}  

/**
 * @return Rabbit template with fixed reply queue.
 */
@Bean
public RabbitTemplate fixedReplyQRabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
    template.setExchange(ex().getName());
    template.setRoutingKey("test");
    template.setReplyQueue(replyQueue());
    return template;
}

/**
 * @return The reply listener container - the rabbit template is the listener.
 */
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(fixedReplyQRabbitTemplate());
    return container;
}

/**
 * @return The listener container that handles the request and returns the reply.
 */
@Bean
public SimpleMessageListenerContainer serviceListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory());
    container.setQueues(requestQueue());
    container.setMessageListener(new MessageListenerAdapter(new PojoListener()));
    return container;
}

/**
 * @return a non-durable auto-delete exchange.
 */
@Bean
public DirectExchange ex() {
    return new DirectExchange("ub.exchange", false, true);
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(requestQueue()).to(ex()).with("test");
}

/**
 * @return an anonymous (auto-delete) queue.
 */
@Bean
public Queue requestQueue() {
    return new Queue("ub.request");
}

/**
 * @return an anonymous (auto-delete) queue.
 */
@Bean
public Queue replyQueue() {
    return new Queue("ub.reply");
}

/**
 * @return an admin to handle the declarations.
 */
@Bean
public RabbitAdmin admin() {
   return new RabbitAdmin(rabbitConnectionFactory());
}
}

Call in main method:

public class App {  
public static void main(String[] args) {        
    ApplicationContext context = new AnnotationConfigApplicationContext(FixedReplyQueueConfig.class);
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

    String response = (String) rabbitTemplate.convertSendAndReceive("yalla");
    System.out.println("response" + response);
}
}

I have two questions:

When I run this I get the following error: RabbitTemplate [ERROR] No correlation header in reply, though I see that both queues got the message.

The second question is: how do I run the consumer code (the listener) only, without sending a message (since eventually the caller will not be my Java code)?


Answer:

That looks like it's based on the framework test case, which clearly works.

Are you sending any other messages to ub.reply? Is it empty?

The only way you can get that log message is if the template receives a reply that does not have a properly populated correlation id property.

You can just run the application and remove all the client side code, the container will listen for inbound requests.

Question:

I'm creating an application that sends messages for time-expensive processing to a consumer using RabbitMQ. However, I need to prioritize messages. When a message with high priority arrives, it must be processed even if all consumer instances are processing other messages.

AFAIK there is no possibility to preempt processing low-priority messages and switch to processing high-priority messages in Spring Boot and RabbitMQ.

Is it possible to create consumers that accept only high-priority messages or to run additional set of consumers on the fly when all other are busy and high-priority messages arrive?

I tried to add queues with x-max-priority=10 flag and to increase number of consumers but it doesn't solve my problem.

Imagine that we run 50 consumers and send 50 messages with low priority. While time-expensive processing is being performed, a new message arrives with high priority but it cannot be processed at once because all 50 consumers are busy.

There is a part of configuration that sets number of consumers

@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                               @Qualifier("rabbitConnectionFactory") ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  configurer.configure(factory, connectionFactory);
  factory.setConcurrentConsumers(50);
  factory.setMaxConcurrentConsumers(100);
  return factory;
}

Is there a way to create a set of consumers that accept only high-priority messages (e.g. higher than 0), or to create consumers on the fly for high-priority messages?


Answer:

I don't know of a way to implement the preemptive strategy you describe, but there are a number of alternatives you could consider.

Priority Setting

The first thing to take into account is the priority support in RabbitMQ itself.

Consider this excerpt from RabbitMQ in Depth by Gavin M. Roy:

"As of RabbitMQ 3.5.0, the priority field has been implemented as per the AMQP specification. It’s defined as an integer with possible values of 0 through 9 to be used for message prioritization in queues. As specified, if a message with a priority of 9 is published, and subsequently a message with a priority of 0 is published, a newly connected consumer would receive the message with the priority of 0 before the message with a priority of 9".

e.g.

rabbitTemplate.convertAndSend("Hello World!", message -> {
  MessageProperties properties = MessagePropertiesBuilder.newInstance()
                                                         .setPriority(0)
                                                         .build();
  return MessageBuilder.fromMessage(message)
                       .andProperties(properties)
                       .build();
});

Priority-based Exchange

A second alternative is to define a topic exchange and define a routing key that considers your priority.

For example, consider an exchange of events using a routing key of pattern EventName.Priority e.g. OrderPlaced.High, OrderPlaced.Normal or OrderPlaced.Low.

Based on that you could have a queue bound to just orders of high priority i.e. OrderPlaced.High and a number of dedicated consumers just for that queue.

e.g.

String routingKey = String.format("%s.%s", event.name(), event.priority());
rabbit.convertAndSend(routingKey, event);

With a listener like the one below where the queue high-priority-orders is bound to the events exchange for event OrderPlaced and priority High using routing key OrderPlaced.High.

@Component
@RabbitListener(queues = "high-priority-orders", containerFactory="orders")
public class HighPriorityOrdersListener {

 @RabbitHandler
 public void onOrderPlaced(OrderPlacedEvent orderPlaced) {
   //...
 }
}

Obviously, you will need a dedicated thread pool (the orders container factory above) to attend to the high-priority requests.
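A minimal sketch of such a dedicated container factory (the bean name orders matches the listener above; the concurrency numbers are only illustrative):

@Bean(name = "orders")
public SimpleRabbitListenerContainerFactory ordersContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // These consumers only read from high-priority-orders, so they are never
    // tied up by low-priority work.
    factory.setConcurrentConsumers(5);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}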

Question:

I have a RabbitMQ client application that listens to a specific queue. The client creates an instance of DefaultConsumer and implements the handleDelivery method. Here is the code

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
//            channel.basicQos(pollCount);
            Message message = new Message();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String response = new String(body, "UTF-8");
                    if (response != null) {
                        message.setId(NUID.nextGlobal());
                        message.setPayload(response);
                        message.setDeliveryTag(deliveryTag);
                        messages.add(message);
                        logger.info("Message received: ", message.getPayload());
                    }
                }
            };
            logger.debug("**********Channel status: " + channel.isOpen());
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);

        }
    }

The method receiveMessages() is called from a thread every 500 ms, and the messages are drained into a different List for consumption. Because of this polling of receiveMessages(), I observed that consumer tags are continuously being created and keep growing when viewed through the RabbitMQ console, as in the picture. Is it normal to see those increasing consumer tags?


Answer:

Is it normal to see those increasing consumer tags?

No, your code has an error. You need either to use a single long-running consumer, or to cancel each consumer when you are done with it.

I can't see any need to "poll" receiveMessages - just let it run on its own and it will add messages to your synchronized queue as you expect.
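In other words, something along these lines (a sketch based on the fields in the question; the drain call is only an example of where the messages could be picked up):

// Register the consumer once, e.g. at application startup.
receiveMessages();

// The DefaultConsumer keeps running on the client library's own threads and
// feeds the LinkedBlockingQueue; drain it wherever the messages are needed.
List<Message> batch = new ArrayList<>();
messages.drainTo(batch);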


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

Question:

I have the following code: I read messages from one queue and re-send them to two other queues.

I am interested in the setConcurrentConsumers(3) method: does it mean that three listener threads will be created?

If so, as I understand it, the order of re-sending to queue1 and queue2 will not be preserved. It is important for me to keep the same message order as when the messages were received.

@RabbitListener(queues = "queue",containerFactory="rabbitListenerContainerFactory") 
public void processQueue(String message) {
    rabbitTemplate.convertAndSend("queue1", message);
    rabbitTemplate.convertAndSend("queue2", message);
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}

Answer:

The whole point of enabling concurrentConsumers is to allow messages to be processed in parallel, in order to speed up the overall execution. By doing this you are automatically signing a contract in which you accept that everything is asynchronous and you can no longer assume ordering.

If you make each thread wait for the previous one to finish so that you can respect the order, then you are back to sequential processing and there is no benefit to having enabled concurrent consumers; instead you just add the overhead of managing multiple threads.
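If strict ordering matters more than throughput, a sketch of the alternative is simply to keep a single consumer (same factory bean as in the question):

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    // A single consumer thread processes (and re-sends) messages strictly in
    // arrival order, at the cost of losing all parallelism.
    factory.setConcurrentConsumers(1);
    factory.setMaxConcurrentConsumers(1);
    return factory;
}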

Question:

I took the example from here http://www.rabbitmq.com/tutorials/tutorial-six-java.html, added one more RPC call from RPCClient, and added some logging to stdout. As a result, when the second call is executed, RabbitMQ uses the consumer with the wrong correlation id, which is not the expected behavior. Is it a bug, or am I getting something wrong?

RPCServer:

package com.foo.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

  private static final String RPC_QUEUE_NAME = "sap-consume";

  private static int fib(int n) {
    if (n ==0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
  }

  public static void main(String[] argv) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);

    Connection connection = null;
    try {
      connection      = factory.newConnection();
      final Channel channel = connection.createChannel();

      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      System.out.println(" [x] Awaiting RPC requests");

      Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          AMQP.BasicProperties replyProps = new AMQP.BasicProperties
            .Builder()
            .correlationId(properties.getCorrelationId())
            .build();

          String response = "";

          try {
            String message = new String(body,"UTF-8");
            int n = Integer.parseInt(message);

            System.out.println(" [.] fib(" + message + ")");
            response += fib(n);
          }
          catch (RuntimeException e){
            System.out.println(" [.] " + e.toString());
          }
          finally {
            channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
            channel.basicAck(envelope.getDeliveryTag(), false);
        // RabbitMq consumer worker thread notifies the RPC server owner thread
            synchronized(this) {
              this.notify();
            }
          }
        }
      };

      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
      // Wait and be prepared to consume the message from RPC client.
      while (true) {
        synchronized(consumer) {
          try {
            consumer.wait();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    } catch (IOException | TimeoutException e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null)
        try {
          connection.close();
        } catch (IOException _ignore) {}
    }
  }
}

RPCCLient:

package com.bar.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

  private Connection connection;
  private Channel channel;
  private String requestQueueName = "sap-consume";
  private String replyQueueName;

  public RPCClient() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);

    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();
  }

  public String call(String message) throws IOException, InterruptedException {
    final String corrId = UUID.randomUUID().toString();

    AMQP.BasicProperties props = new AMQP.BasicProperties
      .Builder()
      .correlationId(corrId)
      .replyTo(replyQueueName)
      .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

    channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        if (properties.getCorrelationId().equals(corrId)) {
          System.out.println("Correlation Id" + properties.getCorrelationId() + " corresponds to expected one.");
          response.offer(new String(body, "UTF-8"));
        } else {
          System.out.println("Correlation Id" + properties.getCorrelationId() + " doesn't correspond to expected one " + corrId);
        }
      }
    });

    return response.take();
  }

  public void close() throws IOException {
    connection.close();
  }

  public static void main(String[] argv) {
    RPCClient rpc = null;
    String response = null;
    try {
      rpc = new RPCClient();

      System.out.println(" [x] Requesting fib(30)");
      response = rpc.call("30");
      System.out.println(" [.] Got '" + response + "'");
      System.out.println(" [x] Requesting fib(40)");
      response = rpc.call("40");
      System.out.println(" [.] Got '" + response + "'");
    } catch (IOException | TimeoutException | InterruptedException e) {
      e.printStackTrace();
    } finally {
      if (rpc != null) {
        try {
          rpc.close();
        } catch (IOException _ignore) {
        }
      }
    }
  }
}

Answer:

Yes you found a bug in the tutorial code. I have opened a pull request to fix it here and you can find the explanation of what's happening as well:

https://github.com/rabbitmq/rabbitmq-tutorials/pull/174


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

Question:

I'm implementing two services: A and B. I'm trying to implement synchronous communication (Remote Procedure Call, or RPC) between A and B.

Scenario

Very simple: A needs information from B, so A sends a message and waits for a reply from B. A can't continue without this information.

The question

I'm using the method rabbitTemplate.convertSendAndReceive from Spring RabbitMQ. This works as expected if B is running. My code is very similar to this link.

If B is not running, A waits for a little while (a few seconds) and receives null as the reply. In this case, I was expecting some exception saying that there is no consumer available.

The documentation says:

By default, the send and receive methods will timeout after 5 seconds and return null. This can be modified by setting the replyTimeout property. Starting with version 1.5, if you set the mandatory property to true (or the mandatory-expression evaluates to true for a particular message), if the message cannot be delivered to a queue an AmqpMessageReturnedException will be thrown. This exception has returnedMessage, replyCode, replyText properties, as well as the exchange and routingKey used for the send.

I tried to set:

rabbitTemplate.setMandatory(true);

But no exception is thrown. I think this is because the queue is still alive in RabbitMQ: there are messages sent from A while B was down, and they are waiting to be processed by B.

So, is the null return how I know that there is no consumer?

Another problem in this case: the message sent by A will wait in the queue until B consumes it. But since I'm implementing synchronous communication, this behaviour doesn't make sense if B is not running, because when B starts again it will consume and process the message without returning the information to A. This will be "lost" processing. Is this normal with RabbitMQ RPC communication?

I'm using Spring Boot 1.5.9 and the dependency spring-cloud-starter-stream-rabbit


Answer:

Mandatory has nothing to with consumers; it's to ensure the message was delivered to a queue.

Yes, getting null is an indication that the operation timed out (5 seconds by default).

You can configure the queue with a time to live (ttl) so stale messages will be removed from the queue if they are not processed within that time.
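For example, a sketch of declaring the request queue with a per-queue message TTL (the queue name and the 10-second value are only illustrative):

@Bean
public Queue requestQueue() {
    Map<String, Object> args = new HashMap<>();
    // Messages not consumed within 10 seconds are expired by the broker
    // (and dead-lettered if a DLX is configured), so B never processes a
    // request whose caller has already timed out.
    args.put("x-message-ttl", 10000);
    return new Queue("service.b.requests", true, false, false, args);
}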

Question:

I have a RabbitMQ consumer, and inside it I have a thread pool. I decided to use a thread pool because I need to wait for calculations to complete. But I noticed that the thread pool usage causes weird effects like freezing and so on. So I want to ask: is it correct to use a thread pool inside a RabbitMQ consumer? Is it possible to achieve the same functionality using Spring Rabbit tools?

...
ThreadPoolExecutor pool = new ThreadPoolExecutor(cores, 50, 30L,  TimeUnit.SECONDS, new ArrayBlockingQueue<>(3000));

public void onMessage(){

   pool.execute(()->{
     //do something
     handleMessage(...);//return to some output queue
   });

}

or

    public void onMessage(){
         //do something
         handleMessage(...);//return to some output queue
    }

Answer:

It is generally better to simply increase the concurrentConsumers in the listener container than to hand off to your own thread-pool.

Your code needs to be thread-safe either way.

With your current solution, you risk message loss since the message is acknowledged when the listener exits.
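A sketch of letting the container do the threading instead (the concurrency numbers are only illustrative):

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // The container owns the threads, and each message is acknowledged only
    // after the listener returns, so a crash does not silently lose messages.
    factory.setConcurrentConsumers(Runtime.getRuntime().availableProcessors());
    factory.setMaxConcurrentConsumers(50);
    return factory;
}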

Question:

I have a Java consumer application that connects to a RabbitMQ (3.2.4) non-deletable fanout exchange called "my_exhange_foo":

Connection connection = connectionFactory.newConnection(consumerPool);
Channel channel = connection.createChannel();
channel.exchangeDeclare("my_exhange_foo", "fanout"); // is this necessary?

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "my_exhange_foo", "");

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

The client consumer application receives the messages regardless of whether the exchange is declared or not.

I followed the example ReceiveLogsDirect.java in this tutorial https://www.rabbitmq.com/tutorials/tutorial-four-java.html

and read the api but cannot figure out what the purpose of declaring the exchange is on the consumer side. I would appreciate if someone can shed some light on it.


Answer:

what is the purpose of declaring the exchange on the consumer side?

It lets one start the consumer process before the producer process has been started. Without it, if the consumer were started first, it would error. Having the flexibility to start the consumer first is useful when working with a production system; it reduces possible problems caused by the inherent timing of restarting systems.

Question:

Documentation states that the default recovery interval is 5000ms for RabbitMQ binder consumers.

RabbitMQ binder configuration properties are prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.consumer.

recoveryInterval is a property with default value of 5000ms. I would like to be able to change it for all consumers in the configuration.

I tried spring.cloud.stream.rabbit.bindings.default.consumer.recoveryInterval=3000. It didn't work. It's still ~5000ms.

How can I change it?

Thanks


Answer:

It's

spring.cloud.stream.rabbit.default.consumer.recovery-interval=3000

(no bindings.).

What you have is configuration for a binding called default.

Question:

I want to develop an application where Python code sends messages using RabbitMQ and the consumer is Spring Boot RabbitMQ code.

sender.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                     exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                  routing_key=routing_key,
                  body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

How do I configure a RabbitMQ receiver using Spring Boot? What configuration is required on the receiver side? Please help.


Answer:

@SpringBootApplication
public class So49512910Application {

    public static void main(String[] args) {
        SpringApplication.run(So49512910Application.class, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("someQueue");
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("topic_logs");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("whatever.topic.pattern.you.want.to.match");
    }

    @RabbitListener(queues = "someQueue")
    public void listener(String in) {
        System.out.println(in);
    }

}

Or, if the exchange already exists...

@SpringBootApplication
public class So49512910Application {

    public static void main(String[] args) {
        SpringApplication.run(So49512910Application.class, args);
    }

    @Bean
    public Queue queue() {
        return new Queue("someQueue");
    }

    @Bean
    public Binding binding() {
        return new Binding("someQueue", DestinationType.QUEUE, "topic_logs", "rk.pattern", null);
    }

    @RabbitListener(queues = "someQueue")
    public void listener(String in) {
        System.out.println(in);
    }

}

Question:

I'm new to the RabbitMQ Java client. My problem: I created 10 consumers and added them to the queue. Every consumer takes 10 seconds to handle my processing. I checked the RabbitMQ management page and saw my queue had 4000 messages not yet sent to clients. I checked the client log, and the result was that one consumer gets one message, then after 10 seconds another consumer gets one message, and so on. I want 10 messages delivered to the 10 consumers at the same time (10 messages, 10 consumers processing at the same time). Please help me, I couldn't find a solution for this problem. Thanks a lot.

        while (!isRetry) {
        try {
            isRetry = true;
            connection = mConnectionFactory.newConnection(addresses.toArray(new Address[addresses.size()]));
            String queueName = "webhook_customer";
            String exchangeName = "webhook_exchange";
            String routingKey = "customer";
            System.out.println("step2");

            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "topic", true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            channel.basicQos(1);
            for (int i = 0; i < numberWorker; i++) {
                Consumer consumer = new QueueingConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        long startProcess = System.nanoTime();
                        JSONObject profile = null;
                        try {

                        } catch (IOException ioe) {
                            handleLogError(profile, ioe.getMessage().toString());
                        } catch (Exception e) {
                            handleLogError(profile, e.getMessage());
                        } finally {
                            channel.basicAck(envelope.getDeliveryTag(), false);
                            long endProcess = System.nanoTime();
                            _logger.info("===========######### TIME PROCESS  + " + (endProcess - startProcess) + " Nano Seconds  ========#### " + (endProcess - startProcess) / 1000000 + " Milli Seconds");
                        }
                    }
                };

                channel.basicConsume(queueName, false, consumer);
            }
            System.out.printf("Start Listening message ...");
        } catch (Exception e) {
            System.out.println("exception " + e.getMessage());
            isRetry = closeConnection(connection);
            e.printStackTrace();
        } finally {
        }
        if (!isRetry) {
            try {
                System.out.println("sleep waiting retry ...");
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //END
    }

Answer:

I found a solution for my case. I spawn a new thread in the consumer when a message comes in and do the processing there, and I create multiple channels so that multiple messages can be handled at the same time. I use a thread pool to control the threads.
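A rough sketch of that idea, based on the code in the question (process(...) is a hypothetical placeholder for the real work):

ExecutorService workers = Executors.newFixedThreadPool(numberWorker);

// Allow several unacknowledged messages in flight; basicQos(1) on a shared
// channel is what limited delivery to one message at a time.
channel.basicQos(numberWorker);

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        workers.submit(() -> {
            try {
                process(new String(body, "UTF-8")); // hypothetical business logic
            } catch (Exception e) {
                // log and decide whether to requeue or dead-letter
            } finally {
                try {
                    // A Channel is not thread-safe, so synchronize the ack.
                    synchronized (channel) {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                } catch (IOException ignored) {
                }
            }
        });
    }
};
channel.basicConsume(queueName, false, consumer);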

Question:

We are using Java RabbitMQ with Spring Boot in a distributed service architecture. One service gets an HTTP request and forwards it to an unknown queue for processing. At the same time it has to wait for a response on another queue before it can terminate the HTTP request. (It's a preview request that gets its work done by a renderer.)

There can be more than one instance of ServiceA (the HTTP Interface) and ServiceB (the renderer) so with every preview message we also send a unique ID to be used as routing key.

I'm having trouble with the BlockingQueueConsumer. Whenever I call nextMessage() I get the same message over and over again. This is doubly weird: for one, the message should have been acked and removed from the queue, and for another, the consumer shouldn't even see it, as the unique ID we used is no longer bound to the queue. nextMessage even returns before the renderer service is done and has sent its done message back.

Here's the simplified setup:

general

All services use a global DirectExchange for all messages

@Bean
  public DirectExchange globalDirectExchange() {
    return new DirectExchange(EXCHANGE_NAME, false, true);
  }

ServiceA (handles the HTTP request):

 private Content requestPreviewByKey(RenderMessage renderMessage, String previewKey) {
    String renderDoneRoutingKey= UUID.randomUUID().toString();
    renderMessage.setPreviewDoneKey(renderDoneRoutingKey);
    Binding binding = BindingBuilder.bind(previewDoneQueue).to(globalDirectExchange)
        .with(renderDoneRoutingKey);
    try {
      amqpAdmin.declareBinding(binding);
      rabbitProducer.sendPreviewRequestToKey(renderMessage, previewKey);
      return getContentBlocking();
    } catch (Exception e) {
      logErrorIfDebug(type, e);
      throw new ApiException(BaseErrorCode.COMMUNICATION_ERROR, "Could not render preview");
    } finally {
      amqpAdmin.removeBinding(binding);
    }
  }


  private Content getContentBlocking() {
    BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(rabbitMqConfig.connectionFactory(), new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<>(), AcknowledgeMode.AUTO, true, 1, PREVIEW_DONE_QUEUE);
    try {
      blockingQueueConsumer.start();
      Message message = blockingQueueConsumer.nextMessage(waitForPreviewMs);
      if (!StringUtils.isEmpty(message)) {
        String result = new String(message.getBody());
        return JsonUtils.stringToObject(result, Content.class);
      }    
      throw new ApiException("Could not render preview");
    } catch (Exception e) {
      logError(e);
      throw new ApiException("Could not render preview");
    } finally {
      blockingQueueConsumer.stop();
    }

}

Service B

I'll spare you most of the code. My log says everything is going well and as soon as its done the service sends the correct message to the UUID key that was sent with the initial render request.

public void sendPreviewDoneMessage(Content content, String previewDoneKey) {
    String message = JsonUtils.objectToString(content);
    rabbitTemplate.convertAndSend(globalDirectExchange, previewDoneKey, message);
}

The whole thing works... once. The real issue seems to be the consumer setup. Why do I keep getting the same (first) message from the queue when I use nextMessage()? Doesn't creating and removing a Binding ensure that only messages bound to that routingKey are received in that instance? And doesn't nextMessage() acknowledge the message and remove it from the queue?!

Thanks a lot for bearing with me, and even more for any helpful answer!


Answer:

BlockingQueueConsumer is not designed to be used directly; it is a component of the SimpleMessageListenerContainer, which will take care of acking the message after it has been consumed by a listener (the container calls commitIfNecessary).

There may be other unexpected side effects of using this consumer directly.

I strongly advise using the listener container to consume messages.

If you just want to receive messages on demand, use a RabbitTemplate receive() or receiveAndConvert() method instead.
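For example, getContentBlocking() could be reduced to something like this sketch (it assumes a RabbitTemplate is available and reuses PREVIEW_DONE_QUEUE, waitForPreviewMs, JsonUtils and ApiException from the question):

private Content getContentBlocking() {
    // receiveAndConvert(queue, timeoutMillis) blocks for up to the timeout and
    // returns null if no message arrives in time.
    Object reply = rabbitTemplate.receiveAndConvert(PREVIEW_DONE_QUEUE, waitForPreviewMs);
    if (reply == null) {
        throw new ApiException("Could not render preview");
    }
    return JsonUtils.stringToObject((String) reply, Content.class);
}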

Question:

I'm using Spring Boot with spring-amqp and RabbitMQ to send messages between two JVMs that I am running locally. Depending on the order that I start each app, I'll sometimes get a ClassNotFoundException. I have a multiproject setup like so:

- Project root
   - common (contains all events / messages that are sent)
   - server
   - client

When the server is started first, it waits for a message from the client. When the client is then started, it implements an ApplicationListener<ApplicationReadyEvent> and sends a message to the server to signal that it is ready.

Server listener:

@Component
@RabbitListener(queues =  "server.${server.id}")
public class ServerListener {
    private static final Logger logger = LoggerFactory.getLogger(ServerListener.class);

    @RabbitHandler
    public void onMessageReceived(@Payload ClientAvailableEvent event) {
        logger.info("Server: Received request from client ID = {}", event.getClientId());
    }
}

Client Producer:

@Component
public class ClientReadyProducer implements ApplicationListener<ApplicationReadyEvent> {
    private static final Logger logger = LoggerFactory.getLogger(ClientReadyProducer.class);

    @Value("${client.id}")
    private String id;

    private final RabbitTemplate template;

    @Autowired
    public ClientReadyProducer(RabbitTemplate template) {
        this.template = template;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {    
        logger.info("Client initialized.");
        ClientAvailableEvent clientAvailableEvent = ClientAvailableEvent.from(id);
        template.convertSendAndReceive("server.exchange.all", "", clientAvailableEvent);
    }
}

When the server gets this message, the log blows up with an infinite number of stack traces, complaining that it can't find ClientAvailableEvent:

2017-01-30 09:30:22.610  WARN 63573 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler :  [][] Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Could not deserialize object type
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:82)
    at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:110)
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:185)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:173)
    at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:118)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:102)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:88)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757)
    ... 10 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.example.event.ClientAvailableEvent
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.springframework.util.ClassUtils.forName(ClassUtils.java:250)
    at org.springframework.core.ConfigurableObjectInputStream.resolveClass(ConfigurableObjectInputStream.java:74)
    at org.springframework.amqp.support.converter.SimpleMessageConverter$1.resolveClass(SimpleMessageConverter.java:179)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at java.util.ArrayList.readObject(ArrayList.java:791)
    at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:76)
    ... 17 common frames omitted

However, I can get this exception to go away. With the client still running, if I restart the server, everything is fine and continues to work without an issue. I can restart the client, it will send another ClientAvailableEvent, and the server will happily deserialize it.

Here are my Spring classes:

ServerConfiguration:

@Configuration
@EnableRabbit
public class ServerConfiguration {
    @Value("${server.id}")
    public String id;

    @Bean
    public Queue serverQueue() {
        return new Queue("server." + id, false, true, true);
    }

    @Bean
    public TopicExchange serverExchange() {
        return new TopicExchange("server.exchange");
    }

    @Bean
    public Binding bindingById() {
        return BindingBuilder.bind(serverQueue()).to(serverExchange()).with(id);
    }

    @Bean
    public FanoutExchange allServersExchange() {
        return new FanoutExchange("server.exchange.all");
    }

    @Bean
    public Binding bindingToAll() {
        return BindingBuilder.bind(serverQueue()).to(allServersExchange());
    }

    @Bean
    public TopicExchange clientExchange() {
        return new TopicExchange("client.exchange");
    }

    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }
}

Client Configuration:

@Configuration
@EnableRabbit
public class ClientConfiguration {
    @Value("${client.id}")
    private String id;

    @Bean
    public Queue clientQueue() {
        return new Queue("client." + id, false, true, true);
    }

    @Bean
    public TopicExchange clientExchange() {
        return new TopicExchange("client.exchange");
    }

    @Bean
    public Binding bindingById() {
        return BindingBuilder.bind(clientQueue()).to(clientExchange()).with(id);
    }

    @Bean
    public FanoutExchange allClientsExchange() {
        return new FanoutExchange("client.exchange.all");
    }

    @Bean
    public Binding bindingToAll() {
        return BindingBuilder.bind(clientQueue()).to(allClientsExchange());
    }

    @Bean
    public RabbitAdmin amqpAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }
}

I originally found this question that has a near-identical stacktrace, but the solution in that case was to put all of the common events / models in one project and include that project into both the server and client projects. However, I'm already doing that. I also tried using JSON to send the messages (by adding the following to both configurations) instead of the standard serialization:

@Bean
public MessageConverter producerJsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

@Bean
public MappingJackson2MessageConverter consumerJsonMessageConverter(){
    return new MappingJackson2MessageConverter();
}

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(consumerJsonMessageConverter());
    return factory;
}

@Bean
public RabbitTemplate configureRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(producerJsonMessageConverter());
    return template;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(producerJsonMessageConverter());
    return factory;
}

Using JSON resulted in a similar stacktrace complaining about the ClassNotFoundException.

Here are the relevant dependencies I'm using:

  • Spring Boot v1.3.8.RELEASE
  • Spring AMQP v1.5.6.RELEASE
  • Spring Rabbit v1.5.6.RELEASE

Answer:

This is most likely some kind of Classloader problem - perhaps you somehow have two versions of that class on the classpath.

The simplest way I have found to debug issues like this is to run the JVM with -verbose:class and monitor where the class is loaded from.

Compare the log between a run that works and one that does not.

I am not surprised you get the same issue with JSON because the fully qualified class name is passed in a header.

Also, do you have unique packages across your jars? You can get issues like this if you serve up the same package from different jars.

Question:

Do we need to create individual channels for each thread or use the same channel for all threads? Also the same question about connection. Do we need to use different connections for each thread or a single connection? What is the difference when we use one channel across all threads and individual channels for each thread?


Answer:

Connection:

According to the java doc (https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.6.5/rabbitmq-java-client-javadoc-3.6.5/):

Current implementations are thread-safe for code at the client API level, and in fact thread-safe internally except for code within RPC calls.

Channel:

According to the doc (https://www.rabbitmq.com/api-guide.html):

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. Sharing channels between threads will also interfere with Publisher Confirms.
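A minimal sketch of the recommended pattern, one shared Connection and one Channel per worker thread (the queue name and thread count are only illustrative):

public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection(); // thread-safe: share it across threads

    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 4; i++) {
        pool.submit(() -> {
            // Each worker opens (and closes) its own Channel; channels are never shared.
            Channel channel = connection.createChannel();
            channel.basicPublish("", "work", null, "hello".getBytes("UTF-8"));
            channel.close();
            return null;
        });
    }
    pool.shutdown();
}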

Question:

I am trying to use RabbitMQ messaging. The message is sent to the queue from the producer, but the consumer doesn't receive it. I checked the server and it's running properly.

ProducerSender

    //the messageToSend is set in another class.

        private static final String TASK_QUEUE_NAME = "hello";    
        public void writeMessage(Message messageToSend) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = messageToSend.getTitle()+" "+messageToSend.getYear()+" "+messageToSend.getPrice();
            channel.basicPublish("", TASK_QUEUE_NAME, null,
                    message.getBytes());

            channel.close();
            connection.close();
    }

ConsumerReceiver

public void readMessage() throws IOException, TimeoutException {
    Socket clientSocket = new Socket(host, port);
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8"); //message is null
            System.out.println(" [x] Received '" + message + "'");
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, true, consumer);
}

What am I doing wrong?


Answer:

Was this code based on some example? Because it is different from the form shown in the RabbitMQ Java guide. I'll show you the way I use it; maybe you can figure out what is missing from yours.

QueueingConsumer.Delivery queueMessage = consumer.nextDelivery();
String message = new String(queueMessage.getBody());
// if auto-ack is not set
channel.basicAck(queueMessage.getEnvelope().getDeliveryTag(), false);

this was based on the examples at https://www.rabbitmq.com/tutorials/tutorial-two-java.html
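Put together, a blocking receive loop in that style looks roughly like this (a sketch assuming the same TASK_QUEUE_NAME and an already-open channel; note that nextDelivery() throws InterruptedException, and QueueingConsumer is deprecated in newer client versions):

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer); // manual ack

while (true) {
    // Blocks until a message arrives.
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}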

Question:

I'm trying to use a RabbitMQ queue for producing and consuming messages of more than one data type. For example, I am able to push an Employee object (in JSON format, using the Jackson serializer) and a String object onto the queue, but when I try to consume the messages I get a parse error. (Ultimately I'm able to push anything, but when I try to consume I'm not able to.)

Producer code

package com.poc.springbootrabbitmq.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.poc.springbootrabbitmq.model.Employee;

@Service
public class RabbitMQSender 
{
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${exchangeName}")
    private String exchange;

    public void send(Employee company) 
    {
        amqpTemplate.convertAndSend(exchange,"error", company);
        amqpTemplate.convertAndSend(exchange,"error", "some String");
    }

}


Consumer code



package com.rabbitmq.consumer.service;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.consumer.model.Employee;

@Component
public class RabbitMQConsumer 
{
    @RabbitListener(queues = "${queueName}")
    public void recievedMessage(Employee employee) {
        System.out.println("Recieved Message From Queue " + employee);
    }

    @RabbitListener(queues = "${queueName}")
    public void recievedMessage1(String string) {
        System.out.println("Recieved Message From Queue " + string);
    }

}


Employee.java

package com.rabbitmq.consumer.model;

public class Employee {

    private String empName;
    private String empId;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }

    @Override
    public String toString() {
        return "Employee [empName=" + empName + ", empId=" + empId + "]";
    }

}

My defined beans/configurations

package com.rabbitmq.consumer.configuration;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@EnableRabbit
@Configuration
public class ConsumerConfiguration implements RabbitListenerConfigurer
{

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() 
    {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) 
    {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

}

Error log I'm getting

2019-04-21 20:27:31.585  WARN 5436 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.rabbitmq.consumer.service.RabbitMQConsumer.recievedMessage3(com.rabbitmq.consumer.model.Employee)]
Bean [com.rabbitmq.consumer.service.RabbitMQConsumer@1457fde]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:193) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:127) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `com.rabbitmq.consumer.model.Employee` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('some String')
 at [Source: (byte[])""some String""; line: 1, column: 1]; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.rabbitmq.consumer.model.Employee` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('some String')
 at [Source: (byte[])""some String""; line: 1, column: 1]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:234) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:181) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:137) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    ... 12 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.rabbitmq.consumer.model.Employee` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('some String')
 at [Source: (byte[])""some String""; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1343) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1032) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1373) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:171) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3121) ~[jackson-databind-2.9.8.jar:2.9.8]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:221) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    ... 19 common frames omitted

2019-04-21 20:27:31.680  WARN 5436 --- [ntContainer#0-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'"some String"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=sampleTopicExchange, receivedRoutingKey=x.y.error, deliveryTag=6, consumerTag=amq.ctag-ZlMlqxVR24vqzgAGGWE4-Q, consumerQueue=sampleTopicQueue1])
2019-04-21 20:27:31.728 ERROR 5436 --- [ntContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Execution of Rabbit message listener failed, and the error handler threw an exception

org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:105) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1378) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1631) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1424) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.rabbitmq.consumer.service.RabbitMQConsumer.recievedMessage3(com.rabbitmq.consumer.model.Employee)]
Bean [com.rabbitmq.consumer.service.RabbitMQConsumer@1457fde]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:193) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:127) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    ... 6 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `com.rabbitmq.consumer.model.Employee` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('some String')
 at [Source: (byte[])""some String""; line: 1, column: 1]; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.rabbitmq.consumer.model.Employee` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('some String')
 at [Source: (byte[])""some String""; line: 1, column: 1]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:234) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:181) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:137) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    ... 12 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.rabbitmq.consumer.model.Employee` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('some String')
 at [Source: (byte[])""some String""; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1343) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1032) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1373) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:171) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013) ~[jackson-databind-2.9.8.jar:2.9.8]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3121) ~[jackson-databind-2.9.8.jar:2.9.8]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:221) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    ... 19 common frames omitted
```

Answer:

@Component
public class RabbitMQConsumer 
{
    @RabbitListener(queues = "${queueName}")
    public void recievedMessage(Employee employee) {
        System.out.println("Recieved Message From Queue " + employee);
    }

    @RabbitListener(queues = "${queueName}")
    public void recievedMessage1(String string) {
        System.out.println("Recieved Message From Queue " + string);
    }

}

With that configuration you will get two listener containers that compete for all messages on the queue. Instead, put @RabbitListener at the class level and mark each handler method with @RabbitHandler.

However, there is a catch-22: the payload has to be deserialized before the framework can determine which method to call, so the type can't be inferred from the method parameter. You have to configure a Jackson2JsonMessageConverter on the listener container factory so the conversion happens first, based on the type information in the message headers.

See the documentation.
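For illustration only, a minimal sketch of that arrangement, assuming the same `${queueName}` property and the Employee model from the question (the class and bean names below are made up, not the original poster's code):

```java
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import com.rabbitmq.consumer.model.Employee;

// One listener container on one queue; the converter deserializes the body
// first (using the __TypeId__ header), then the matching @RabbitHandler runs.
@Component
@RabbitListener(queues = "${queueName}")
class MultiTypeConsumer {

    @RabbitHandler
    public void receive(Employee employee) {
        System.out.println("Received Employee from queue: " + employee);
    }

    @RabbitHandler
    public void receive(String text) {
        System.out.println("Received String from queue: " + text);
    }
}

@Configuration
class RabbitJsonConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Do the JSON conversion in the container, before handler selection
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
```

With the JSON converter on the factory, the container converts the body before invoking the listener, so the @RabbitHandler whose parameter type matches the converted payload is selected instead of failing with the MismatchedInputException shown above.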

Question:

My RabbitMQ server is running perfectly. Check below for ports and ip.

C:\Users\parmarc>netstat -ano | find "5672"
  TCP    0.0.0.0:5672           0.0.0.0:0              LISTENING       2704
  TCP    0.0.0.0:15672          0.0.0.0:0              LISTENING       2704
  TCP    0.0.0.0:55672          0.0.0.0:0              LISTENING       2704
  TCP    127.0.0.1:5672         127.0.0.1:61775        ESTABLISHED     2704
  TCP    127.0.0.1:15672        127.0.0.1:57671        ESTABLISHED     2704
  TCP    127.0.0.1:57671        127.0.0.1:15672        ESTABLISHED     8408
  TCP    127.0.0.1:61775        127.0.0.1:5672         ESTABLISHED     10312
  TCP    [::]:5672              [::]:0                 LISTENING       2704

I keep getting the following error regarding the consumer. I am able to push messages into RabbitMQ but not able to consume them because of this error.

WARN : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - 
       Consumer raised exception, processing can restart if the connection factory supports it. 
       Exception summary: org.springframework.amqp.AmqpIOException: java.net.UnknownHostException: 127.0.0.1
INFO : org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - 
       Restarting Consumer: tag=[null], channel=null, acknowledgeMode=AUTO 
       local queue size=0

Below is my mq-Config.properties file:

server.host=127.0.0.1   
server.port=5672
search.service.vmhost=/
search.service.username=guest
search.service.password=guest


search.service.indexwriter.queue.name=search.service.indexwriter.queue.test
search.service.indexwriter.exchange.name=search.service.indexwriter.exchange.test
search.service.indexwriter.routing.key=search.service.indexwriter.routing.test
numberof.concurrentconsumer=10
max.failure.retry.attempts=3

Below is my mq-Config-consumer.properties file:

#######Consumer Properties######
retailer.syncservice.consumer.server.host=127.0.0.1
retailer.syncservice.consumer.server.port=5672
retailer.syncservice.consumer.service.vmhost=/
retailer.syncservice.consumer.service.username=guest
retailer.syncservice.consumer.service.password=guest
retailer.syncservice.consumer.queue.name=retailer.syncservice.queue.fanoutqueue.test
retailer.syncservice.consumer.exchange.name=retailer.consumer.direct.exchange.test
retailer.syncservice.consumer.routing.key=retailer.consumer.routingkey.test
numberof.concurrentconsumer=10

Can anybody suggest what is wrong with the consumer setup? I tried googling it but did not find a satisfactory answer that solves my issue, so I am asking here.


Answer:

I solved it with the help of my colleague. It was really a silly mistake.

Tab Character after value in properties file

There was a tab character after 127.0.0.1 in the mq-Config.properties file:

server.host = 127.0.0.1    <-- tab character was here

Because of that it was not able to connect. It seems the values are not trimmed when the properties file is loaded, so even a trailing space after a value will cause unexpected behavior.

I removed tab character.

server.host = 127.0.0.1

After that it worked.
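As a side note, here is a minimal defensive sketch, assuming the file is read with plain java.util.Properties (the key names are taken from the question, the class name is made up): trimming each value before building the connection factory makes a stray tab or trailing space harmless.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

// java.util.Properties keeps trailing whitespace in values, so a host like
// "127.0.0.1<TAB>" reaches the resolver verbatim and fails with
// UnknownHostException. Trimming each value avoids that.
public class MqConnectionFactoryBuilder {

    public static CachingConnectionFactory fromProperties(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        String host = props.getProperty("server.host", "localhost").trim();
        int port = Integer.parseInt(props.getProperty("server.port", "5672").trim());

        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
        factory.setVirtualHost(props.getProperty("search.service.vmhost", "/").trim());
        factory.setUsername(props.getProperty("search.service.username", "guest").trim());
        factory.setPassword(props.getProperty("search.service.password", "guest").trim());
        return factory;
    }
}
```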