Hot questions about using RabbitMQ queues

Question:

I am using RabbitMQ together with Spring's RabbitTemplate.

When sending messages to queues using the template's send methods, I want the queue to be created/declared automatically if it does not already exist.

This is very important since, according to our business logic, queue names are generated at run time and I cannot declare them in advance.

Previously we used JmsTemplate, and any call to send or receive automatically created the queue.


Answer:

You can use a RabbitAdmin to automatically declare the exchange, queue, and binding. Check out this thread for more detail; this forum post is also somewhat related to your scenario. I have not tried Spring AMQP myself, but I believe this would do it.

/**
 * Required for executing administration functions against an AMQP Broker
 */
@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}
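
Once the admin bean is in place, a minimal sketch of declaring a runtime-named queue just before sending (the wiring and names here are illustrative assumptions, not from the question):

@Autowired
private AmqpAdmin amqpAdmin;

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendToRuntimeQueue(String queueName, Object payload) {
    // declareQueue is idempotent: re-declaring an existing queue with the
    // same arguments is a no-op on the broker
    amqpAdmin.declareQueue(new Queue(queueName, true));
    // publish to the default exchange, which routes by queue name
    rabbitTemplate.convertAndSend("", queueName, payload);
}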

Keep coding!

Question:

After Googling for a few days, I believe I am totally lost. I would like to implement a kind of priority queue setup that has about 3 queues:

  1. high priority queue (daily), which needs to be processed first.
  2. mid priority queue (weekly), which is processed only if there are no items in queue #1. (It is OK if messages in this queue are never processed at all.)
  3. low priority queue (monthly), which is processed only if there are no items in queues #1 & #2. (It is OK if messages in this queue are never processed at all.)

Initially my plan was to have a consumer consume messages from all three queues and check whether there are any items in queues #1, #2 and #3, and then I realized that this is wrong because:

  1. I am totally lost with the question: "How do I know which queue the message came from?".
  2. I'm already consuming a message regardless of which queue it came from. So if I get an object from a lower priority queue, am I supposed to put it back on the queue if I discover there is a message on a higher priority queue?

Following is my current configuration, which shows what an idiot I am.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<rabbit:connection-factory id="connectionFactory" host="localhost" />

<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
    exchange="" routing-key="daily_queue"/>

<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
    exchange="" routing-key="weekly_queue"/>

<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
    exchange="" routing-key="monthly_queue"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>    

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>    

<bean id="Consumer" class="com.test.Consumer" />

</beans>

Any idea how should I tackle this with priority queue?

ps: I also wonder, if Apache Camel has something I can depend on?

UPDATE 1: I just saw this from Apache Camel: "https://issues.apache.org/jira/browse/CAMEL-2537". The sequencer on JMSPriority seems to be what I'm looking for; has anyone tried this before?

UPDATE 2: assuming I am to use RabbitMQ's plugin based on @Gary Russell's recommendation, I have the following spring-rabbitmq context XML configuration, which seems to make sense (by guessing...):

<rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
            <entry key="x-max-priority" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>

<bean id="Consumer" class="com.test.Consumer" />

The above XML configuration successfully creates a queue named "ad_google_dfa_reporting_queue", with the arguments x-max-priority: 10 and durable: true.

But when it comes to the code that sends the message with a priority, I am totally lost. How do I define the priority as mentioned in the sample URL: https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java

AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?

UPDATE 3: Based on @Gary's answer, I managed to send messages with the priority set on the message. However, when I send in 1000 messages with a random priority between 1-10, the consumer consumes messages with all kinds of priority. (I was expecting only the high priority messages to be consumed first.) Following is the code for the message producer:

    Random random = new Random();
    for (int i=0; i< 1000; i++){
        final int priority = random.nextInt(10 - 1 + 1) + 1;

        DfaReportingModel model = new DfaReportingModel();
        model.setReportType(DfaReportingModel.ReportType.FACT);
        model.setUserProfileId(0L + priority);
        amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(priority);
                return message;
            }
        });
    }

And following is the code for Message consumer:

    public void consume(DfaReportingModel message) throws InterruptedException {
        System.out.println(message.getUserProfileId());

        Thread.sleep(500);
    }

The result I'm getting:

9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,

UPDATE 4: Problem solved! Knowing that the sample code from https://github.com/rabbitmq/rabbitmq-priority-queue works in my environment, I presumed the problem was somewhere in the Spring context. Hence, after countless rounds of trial and error with different configurations, I pinpointed the exact combination that makes this work, which is the following:

<rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
        <entry key="x-max-priority">
            <value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
        </entry>
    </rabbit:queue-arguments>
</rabbit:queue>

Without specifically defining the value as a java.lang.Integer, the priority queue does not work. Finally, it is solved. Yay!


Answer:

RabbitMQ now has a priority queue plugin where messages are delivered in priority order. It would be better to use that rather than your scheme of requeueing low priority messages, which would be quite expensive at runtime.

EDIT:

When using the rabbitTemplate.convertAndSend(...) methods, if you want to set the priority property on the message, you either need to configure a custom MessagePropertiesConverter on the template (subclass the DefaultMessagePropertiesConverter) or use the convertAndSend variants that take a message post-processor; e.g.:

template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setPriority(5);
        return message;
    }
});

Question:

I've got a project where we are going to have hundreds (potentially thousands) of queues in rabbit and each of these queues will need to be consumed by a pool of consumers.

In rabbit (using spring-amqp), you have the @RabbitListener annotation which allows me to statically assign the queues this particular consumer(s) will handle.

My question is - with rabbit and spring, is there a clean way for me to grab a section of queues (let's say queues that start with a-c) and then also listen for any queues that are created while the consumer is running?

Example (at start):

  • ant-queue
  • apple-queue
  • cat-queue

While consumer is running:

  • Add bat-queue

Here is the (very simple) code I currently have:

    @Component
    public class MessageConsumer {

        public MessageConsumer() {
            // ideally grab a section of queues here, initialize a parameter and give to the rabbitlistener annotation
        }

        @RabbitListener(queues= {"ant-queue", "apple-queue", "cat-queue"})
        public void processQueues(String messageAsJson) {
            < how do I update the queues declared in rabbit listener above ? >
        }
    }

Edit:

I should add - I've gone through the Spring AMQP documentation I found online and I haven't found anything outside of statically declaring the queues (either hardcoded or via properties).


Answer:

  • Inject (@Autowired or otherwise) the RabbitListenerEndpointRegistry.

  • Get a reference to the listener container (use the id attribute on the annotation to give it a known id) (registry.getListenerContainer(id)).

  • Cast the container to an AbstractMessageListenerContainer and call addQueues() or addQueueNames().

Note that it is more efficient to use a DirectMessageListenerContainer when adding queues dynamically; with a SimpleMessageListenerContainer the consumer(s) are stopped and restarted. With the direct container, each queue gets its own consumer(s).

See Choosing a container.
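
A minimal sketch of those three steps, assuming the @RabbitListener in the question is given a known id such as "myListener" (a hypothetical id):

@Autowired
private RabbitListenerEndpointRegistry registry;

public void listenToNewQueue(String newQueueName) {
    // look the container up by the id set on @RabbitListener(id = "myListener", ...)
    AbstractMessageListenerContainer container =
            (AbstractMessageListenerContainer) registry.getListenerContainer("myListener");
    // the container starts consuming from the extra queue without dropping the existing ones
    container.addQueueNames(newQueueName);
}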

Question:

I'm trying to build a persistent message queue with some delay per message. In Java code it looks like this:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("WorkExchange", "direct");
    channel.queueDeclare("WorkQueue", true, false, false, null);
    channel.queueBind("WorkQueue", "WorkExchange", "");

    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "WorkExchange");

    channel.exchangeDeclare("RetryExchange", "direct");
    channel.queueDeclare("RetryQueue", true, false, false, args);
    channel.queueBind("RetryQueue", "RetryExchange", "");

    channel.confirmSelect();
    BasicProperties properties = new BasicProperties();
    properties.setDeliveryMode(2);
    properties.setExpiration("120000");
    channel.basicPublish("RetryExchange", "", properties, "Hello world!".getBytes());
    channel.waitForConfirmsOrDie();
    connection.close();

However, I have a problem with persistence. When I stop the server, wait some time and start it again, the messages which should have moved to WorkQueue just disappear. What am I doing wrong? Or is it by design?


Answer:

However, I have a problem with persistence. When I stop the server, wait some time and start it again, the messages which should have moved to WorkQueue just disappear. What am I doing wrong? Or is it by design?

You should use MessageProperties to make your messages persistent.

channel.basicPublish("", "task_queue", 
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());

Your current code `channel.queueDeclare("RetryQueue", true, false, false, args);` will make the queue durable but not the message.

More here: RabbitMQ Doc
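
If you also want to keep the per-message TTL from the question, a sketch using the Java client's properties builder (same exchange name as in the question; the builder approach is an assumption on my part, not part of the original answer):

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)        // 2 = persistent
        .expiration("120000")   // per-message TTL in milliseconds
        .build();
channel.basicPublish("RetryExchange", "", props, "Hello world!".getBytes());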

Question:

Is it possible to make these settings per queue? I have queues that are important, so I need a larger number of retries, but I have less important queues for which I do not want to configure retry, attempts, etc.

public Queue newQueue(String name) {
    return new Queue(name, durable, exclusive, autoDelete, arguments);
}

I saw that in the Queue class it is possible to pass an argument map as the last parameter, but I do not know if this would be the place to do it, or whether it should be via properties.


Answer:

Such things are not properties of the queue, they are properties of a retry advice added to the listener container. Use a different container/advice for each queue. See the Spring AMQP reference manual.
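
A rough sketch of what "a different container/advice per queue" could look like, assuming Spring AMQP with spring-retry on the classpath; the factory name and retry values are illustrative:

@Bean
public SimpleRabbitListenerContainerFactory importantQueueFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // the retry advice belongs to this factory only; listeners on important
    // queues reference this factory, less important queues use another one
    factory.setAdviceChain(RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initial interval, multiplier, max interval (ms)
            .build());
    return factory;
}

A listener for an important queue would then reference containerFactory = "importantQueueFactory", while listeners for the less important queues keep a factory without the advice.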

Question:

I need some help.

I'm developing a Spring Boot application, and I want to publish messages to RabbitMQ. I want to send each message to a queue that is named in the message itself. This way I want to create queues dynamically. I have only found examples that use a "static" queue.

I have researched some things but didn't find anything. I'm new to RabbitMQ and have learned the basic concepts. I'm also fairly new to Spring.

RabbitMQ Config

@Configuration
public class RabbitMQConfig {

    @Value("amq.direct")
    String exchange;

    @Value("queue-name") // Don't want to do this
    String queueName;

    @Value("routing-key") // Or this
    String routingkey;

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchange);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate template(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

MessageSender

@Service
public class RabbitMQSender {

    @Autowired
    private AmqpTemplate template;

    @Value("amq.direct")
    private String exchange;

    public void send(MessageDTO message) {
        template.convertAndSend(exchange, message);

    }
}

Answer:

I came to a solution:

You need to create an AmqpAdmin in your config:

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory);
}

Then you add it to your service:

@Autowired
private AmqpAdmin admin;

Finally you can use it to create queues and bindings.

Queue queue = new Queue(queueName, durable, false, false);
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, EXCHANGE, routingKey, null);
admin.declareQueue(queue);
admin.declareBinding(binding);

I found the solution here

Question:

I am developing an application with RabbitMQ support. So, I have a consumer and a producer, and I need to decide between two ways to set up communication between them.

The First Way

public void send(){
   //send to consumer and forget
   rabbitTemplate.convertAndSend("","routing-key",my object);
  //waiting for output queue and messages from consumer
  while(true){
     //receive something.
     if(corellationID==what we need){
        //do what we need
        break;
     }
  }
}

The second way

public void send(){
   //send to consumer and wait for result
   Object o=rabbitTemplate.convertSendAndReceive("","routing-key",my object);

}

Which way will work more quickly and be more stable under high load? And maybe there is another, more effective way to do this. Thank you.


Answer:

The second way - with the first way you will have to implement what the second way already does:

  • create a correlation id
  • maintain a map
  • dequeue message from the reply queue
  • correlate reply message with the producer
  • ...

Btw, the most effective way is to not have a thread that waits for the reply, and so to work in an asynchronous way: the thread that sends the message may not be the one that receives the reply. Have a look at the documentation.
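
For the asynchronous style, a sketch using Spring AMQP's AsyncRabbitTemplate, assuming a version where its futures are ListenableFutures (the routing key and payload echo the question; the rest is illustrative):

@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
    // sets up the reply handling for you; uses direct reply-to by default
    return new AsyncRabbitTemplate(rabbitTemplate);
}

public void send(AsyncRabbitTemplate asyncTemplate, Object myObject) {
    ListenableFuture<Object> future =
            asyncTemplate.convertSendAndReceive("", "routing-key", myObject);
    // the sending thread returns immediately; the callback runs when the reply arrives
    future.addCallback(
            reply -> System.out.println("Got reply: " + reply),
            ex -> System.err.println("Request failed: " + ex));
}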

Question:

I am working with the Java RabbitMQ API. This is how my app looks:

Publisher A sends jobs to Q1, then many consumers B do the job, create a new job and send it to Q2, where many consumers C do the job.

I want to make sure that no duplicated jobs are sent to Q2, how can I achieve this?

Keep in mind two scenarios:

  1. B crashes after sending the job to Q2 but before sending the acknowledgment to Q1 that it completed the job
  2. B crashes after sending the acknowledgment to Q1 but before sending the job to Q2

Answer:

I want to make sure that no duplicated jobs are sent to Q2, how can I achieve this?

you can't. not even on Q1.

the nature of distributed systems and the CAP theorem (https://en.wikipedia.org/wiki/CAP_theorem) says this is impossible, even if the goal is accomplished the vast majority of the time.

in light of that, what you need to do is plan for how you will handle the times when a duplicate message is accidentally created.

the most common method is some sort of idempotence (https://en.wikipedia.org/wiki/Idempotence) - a way to guarantee that the same message will only be processed once. or, maybe more accurately, a way to say that the same message can be processed an unlimited number of times, but will only cause change / have an effect on the system once.
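
one hedged sketch of an idempotent consumer, assuming each job carries a unique id (the Job type and the in-memory store are illustrative; in practice the seen-ids store would live in a database or cache shared by all consumers):

private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

public void handle(Job job) {              // Job and its getId() are hypothetical
    // add() returns false if the id was already recorded, so a duplicate
    // delivery is detected and has no further effect on the system
    if (!processedIds.add(job.getId())) {
        return;
    }
    process(job);                          // hypothetical business logic
}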


in general, though, your situation with multiple queues and consumers that need to process things in order is calling for a "Saga" or "Process Manager" - a long running, asynchronous workflow.

You can read up on this idea of a "Process Manager" in the Enterprise Integration Patterns book, and there are a lot of good libraries around that will implement the details for you.

Question:

On an exception while processing a message from RabbitMQ, I just want to unacknowledge the particular message and put it on a different queue, instead of re-queueing it to the same queue or completely discarding it (as per the last boolean flag, @requeue, in basicNack).

The whole idea is that later on I can get the count of unacked messages and check the message format etc., instead of re-queueing again and again on the same channel; I also want to send an unacknowledged signal to the current channel.

FYI, I set the channel ack mode to manual (i.e. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);).

This is what I am doing now:

public class My***Listener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            String s = new String(message.getBody());
            // some logic
            // after success, ack manually
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // currently on exception I nack the message without requeueing it
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

Any help is highly appreciated.


Answer:

You can send them to a dead letter queue. It's a pretty standard pattern.

https://www.rabbitmq.com/dlx.html
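
A minimal sketch of wiring that up with Spring AMQP (the queue and exchange names here are illustrative assumptions):

// the work queue dead-letters rejected messages to "my.dlx"
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "my.dlx");
args.put("x-dead-letter-routing-key", "my.queue.dead"); // optional explicit routing key
Queue workQueue = new Queue("my.queue", true, false, false, args);

// the dead letter queue, bound to the DLX, collects the rejected messages
DirectExchange dlx = new DirectExchange("my.dlx");
Queue deadLetterQueue = new Queue("my.queue.dead", true);
Binding dlqBinding = BindingBuilder.bind(deadLetterQueue).to(dlx).with("my.queue.dead");
// with this in place, channel.basicNack(tag, false, false) routes the message to my.queue.dead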

Question:

I have a simple Route defined in a routeContext in Camel (this Route will be used in multiple routes).

    <route id="sendToRabbitQueue">
        <from uri="direct:sendToQueue" />
        <convertBodyTo type="java.lang.String"/>
        <setHeader headerName="rabbitmq.ROUTING_KEY">
            <constant>my.routing.key</constant>
        </setHeader>
        <to uri="ref:genericRabbitEndpoint"/>
    </route>

And I have an endpoint (defined in an endpoints file)

    <endpoint id="genericRabbitEndpoint" uri="rabbitmq://${rabbitmq.host}:${rabbitmq.port}/${rabbitmq.exchange.name}">
        <camel:property key="autoDelete" value="false" />
        <camel:property key="connectionFactory" value="#rabbitConnectionFactory" />
    </endpoint>

Yes - I have seen the http://camel.apache.org/rabbitmq.html page - that's where I got the idea to set the header on the exchange. However no message is being published on the queue. I'm clearly overlooking something and any help would be appreciated.


Answer:

So this seems like a bit of a gotcha and the answer relates to part of the Route I didn't include in the question because I didn't think it was relevant.

The Route starts at a RabbitMQ endpoint (not included above). As a result the exchange has some rabbit headers set when it arrives: rabbitmq.ROUTING_KEY, rabbitmq.EXCHANGE_NAME, rabbitmq.DELIVERY_TAG.

These headers are used across the life of the Route and appear to override the values when I try to publish at a different Rabbit endpoint. The way I've fixed it is by introducing a bean which strips the headers out. Not ideal behaviour in my opinion...

public void stripRabbitHeaders(@Headers Map headers)
{
    headers.remove("rabbitmq.ROUTING_KEY");
    headers.remove("rabbitmq.DELIVERY_TAG");
    headers.remove("rabbitmq.EXCHANGE_NAME");
}

Question:

I have a remote RabbitMQ server which has some queues I want to listen to. I tried this:

@RabbitListener(queues = "queueName")
public void receive(String message) {
    System.out.println(message);
}

But it tried to create a new queue. Result is predictable - access denied.

o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queueName

I didn't declare any queue in any other way.

How can I listen to an existing queue on a remote server? Also, is there a way to check if this queue exists? And I saw this line

@RabbitListener(queues = "#{autoDeleteQueue2.name}")

in a tutorial. What does #{queueName.name} mean?

Logs and the beginning of the stack trace:

2018-08-30 22:10:21.968  WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queueName
2018-08-30 22:10:21.991  WARN 12124 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[queueName]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:588) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:996) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

Answer:

Even if you don't have configuration permission on the broker, the queueDeclarePassive used by the listener is allowed (it checks for the presence of the queue).

o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: queueName

That just means that the queue doesn't exist.

@RabbitListener(queues = "#{autoDeleteQueue2.name}")

That is used to get the queue name at runtime (when you have permission to create queues).

e.g.

@Bean
public AnonymousQueue autoDeleteQueue2() {
    return new AnonymousQueue();
}

Spring will add that queue to the broker with a random, unique name. The listener is then configured with the actual queue name.
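
For example, the listener side could then look something like this (the method name is illustrative):

@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receiveFromAnonymous(String message) {
    // the SpEL expression resolves to the broker-generated name of the
    // autoDeleteQueue2 bean when the listener container starts
    System.out.println("Received: " + message);
}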

Question:

I tried to install rabbitmq-priority-queue on my ubuntu 12.04 machine which is running rabbitmq 3.4.0.

According to the instructions at http://www.rabbitmq.com/installing-plugins.html, I copied the downloaded file rabbitmq_priority_queue-3.4.x-3431dc1e.ez to /usr/lib/rabbitmq/lib/rabbitmq_server-3.4.0/plugins and enabled the plugin with the command:

$ sudo rabbitmq-plugins enable rabbitmq_priority_queue

when running the example application in https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java I got the following output:

Sent message with priority 0
Sent message with priority 5
Sent message with priority 10
Received message with priority 0
Received message with priority 5
Received message with priority 10

although I expect the output to be:

Sent message with priority 0
Sent message with priority 5
Sent message with priority 10
Received message with priority 10
Received message with priority 5
Received message with priority 0

which means, consume the highest priority message first.

I've tried to do the following (none worked):

  • restarted rabbit
  • disabled-enabled the plugin
  • deleted the queue (so it would be recreated by the app)
  • changed .ez file permissions to -rw-r--r-- (just like all other plugins)

I've noticed that when I'm enabling the plugin I get the following output:

$ sudo rabbitmq-plugins enable rabbitmq_priority_queue
The following plugins have been enabled:
  rabbitmq_priority_queue

Applying plugin configuration to rabbit@<my laptop name>...WARNING: module rabbit_priority_queue not found, so not scanned for boot steps.
WARNING: module rabbit_priority_queue not found, so not scanned for boot steps.
 started 1 plugin.

But when running 'sudo rabbitmq-plugins list' I get:

$ sudo rabbitmq-plugins list
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status:   * = running on rabbit@benny-laptop
 |/
[e*] amqp_client                       3.4.0
[  ] cowboy                            0.5.0-rmq3.4.0-git4b93c2d
[  ] eldap                             3.4.0-gite309de4
[e*] mochiweb                          2.7.0-rmq3.4.0-git680dba8
[  ] rabbitmq_amqp1_0                  3.4.0
[  ] rabbitmq_auth_backend_ldap        3.4.0
[  ] rabbitmq_auth_mechanism_ssl       3.4.0
[  ] rabbitmq_consistent_hash_exchange 3.4.0
[  ] rabbitmq_federation               3.4.0
[  ] rabbitmq_federation_management    3.4.0
[E*] rabbitmq_management               3.4.0
[e*] rabbitmq_management_agent         3.4.0
[  ] rabbitmq_management_visualiser    3.4.0
[  ] rabbitmq_mqtt                     3.4.0
[E*] rabbitmq_priority_queue           3.4.x-3431dc1e
[  ] rabbitmq_shovel                   3.4.0
[  ] rabbitmq_shovel_management        3.4.0
[  ] rabbitmq_stomp                    3.4.0
[  ] rabbitmq_test                     3.4.0
[  ] rabbitmq_tracing                  3.4.0
[e*] rabbitmq_web_dispatch             3.4.0
[  ] rabbitmq_web_stomp                3.4.0
[  ] rabbitmq_web_stomp_examples       3.4.0
[  ] sockjs                            0.3.4-rmq3.4.0-git3132eb9
[e*] webmachine                        1.10.3-rmq3.4.0-gite9359c7

According to the result [E*] it looks like the plugin is enabled. I'm not sure if the plugin was enabled properly, or maybe I don't understand it correctly. Please advise.


Answer:

Ah. Yes. That "module not found" error message is misleading. The binary version of the priority queue plugin that's available on the website is compiled with Erlang R15B - but the version of Erlang that ships with your Ubuntu is R14B04. So I think that's the root of the problem.

You could upgrade Erlang (Erlang Solutions offer backports of R16B for 12.04 from here) or compile the plugin yourself (not for the faint hearted).

Meanwhile I'll look at getting the error message to say something more intelligent.

Question:

This is probably some silly mistake I'm missing, but here is the issue:

I am trying to insert a simple "hello" message into a Rabbit queue, with a predefined exchange and routing key. This is the code that I am using:

    private static void send_equity_task_to_rabbitmq(ConnectionFactory factory) throws IOException,TimeoutException{

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("b", false, false, false, null);
        channel.exchangeDeclare("b", "direct");

        channel.basicPublish("b","b",null, "hello".getBytes());

        channel.close();
        connection.close();
    }

public static void main(String[] argv) throws TimeoutException,IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");

    Date start_time= Calendar.getInstance().getTime();
    Long start_time_timestamp=System.currentTimeMillis();

    System.out.println("[INFO] Starting connection to queue at:"+start_time);
        send_equity_task_to_rabbitmq(factory);

        Long end_time_timestamp=System.currentTimeMillis();

        System.out.println("[INFO] Message sent and processed successfully after:"+ (end_time_timestamp-start_time_timestamp)+" miliseconds");

 }
}

The code runs without any error. However, when I check the number of messages in the "b" queue, I get:

$ rabbitmqctl list_queues
Listing queues ...
b       0
...done.

I don't have consumers for this queue at the moment, so since it has 0 messages I assume that I am using basicPublish badly. What could be wrong?

Thank you.


Answer:

I think you need to bind the queue to the exchange. You've created a queue called "b" and an exchange called "b". The exchange will distribute messages to queues that are bound to it, using the "b" routingKey, but as the "b" queue isn't bound to the "b" exchange, the "b" exchange doesn't publish to that queue.
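
For example, adding one queueBind call to the code from the question (keeping the same names) should be enough:

channel.queueDeclare("b", false, false, false, null);
channel.exchangeDeclare("b", "direct");
channel.queueBind("b", "b", "b"); // bind queue "b" to exchange "b" with routing key "b"
channel.basicPublish("b", "b", null, "hello".getBytes());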

Question:

What is the behavior when using the following construct (latest version of Spring)? I'm unable to find it in the documentation.

@RabbitListener(queues = {"q1", "q2", "q3"})
public class MyListener {

In which order are the messages from the 3 queues processed?


Answer:

It is indeterminate - 3 basicConsume operations are performed on the consumer channel (if you increase concurrentConsumers it's 3 per consumer). The basicConsume operations are normally performed in the order the queues are defined (in all cases unless one or more of the queues is temporarily "missing").

The broker will send messages from each queue up to the prefetchCount (basicQos) for each queue (default 1).

I don't know the actual algorithm used by the broker in this scenario but you should assume it to be indeterminate - Spring AMQP will deliver them to the listener(s) in the order received from the broker.

EDIT

I just ran a test (2 queues each with 2 existing messages) and they were delivered round-robin - q1m1, q2m1, q1m2, q2m2 when the prefetch was 1.

With prefetch set to 4, I see q1m1, q1m2, q2m1, q2m2.

Of course, when the queues are empty, messages will generally arrive in the order they arrive at the broker.

EDIT2

See Consumer Prefetch.

Spring AMQP uses the basicQos variant with no global arg, so the default (false) is used. That means the prefetch is per-consumer.

Question:

Given a queue that has messages, how do I use Spring AMQP to get all the messages stored in that queue? Note, the question is not asking how to listen to a queue.


Answer:

Sorry, I don't see any sense in such a solution. It is really better to listen to the queue for all messages. If you need something like browse - get and requeue (or nack) - then yes, AmqpTemplate.receive() is a good choice. You should wrap that invocation in a transaction (e.g. just with RabbitTransactionManager), call receive() in a while(true) loop until it returns no more messages, and call TransactionAspectSupport.currentTransactionStatus().setRollbackOnly() at the end.
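
A rough sketch of that browse-and-requeue idea, assuming a RabbitTemplate with setChannelTransacted(true) and a RabbitTransactionManager backing the @Transactional boundary (the queue name is illustrative):

@Transactional
public List<Message> browseQueue(RabbitTemplate template) {
    List<Message> seen = new ArrayList<>();
    Message message;
    // receive() returns null when the queue has nothing more to hand out
    while ((message = template.receive("my.queue")) != null) {
        seen.add(message);
    }
    // rolling back the channel transaction requeues everything received above
    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
    return seen;
}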

Question:

I am using a SimpleMessageListenerContainer and had a problem where every hour or so the queue would get stuck and nothing would be processed due to an unack'd message.

I am sure this is due to an error that isn't being caught properly, but I can't trace the issue.

I have set the acknowledge mode to NONE and this "fixed" the issue, but it is really just hiding it. Also, if I want to throw an AmqpException and re-queue the message, this doesn't work with acknowledge mode set to NONE.

My question is: how can I trace the issue with the queue getting stuck - is there a way to see the payload of the unack'd message? Or is there an acknowledgement mode that doesn't require acknowledgements but still re-queues messages if an exception is thrown?

Here is how I am registering a listener:

final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(new MQMessageListenerWrapper(listener));
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.start();

Thanks.


Answer:

My best guess is your consumer thread is hung someplace upstream of the listener. When control is returned to the container, the message is ack'd or rejected; it can't be left in an unack'd state if the thread returned to the container.

Use jstack <pid> to find out where the consumer thread is stuck.

You are correct NONE is just masking the issue.

Question:

I have multiple modules which communicate with each other by means of a message queue (Spring Rabbit). Some modules produce messages and others consume them. However, a single module can listen to different queues; I have the queue names in a list, so I created a SimpleMessageListenerContainer for each queue name as follows.

public void build() {
    for (String queueName: queues) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(listenerAdapter());
    }
}

@Bean
private MessageListenerAdapter listenerAdapter() {
    return new MessageListenerAdapter(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            System.out.println(message.getBody());
        }
    }, "onMessage");
}

This implementation is not working for me; consumers are not registered on the queue and no error or exception is thrown during execution.

Note: I am using Spring and I am limited to not using annotations such as @RabbitListener.


Answer:

When you declare SimpleMessageListenerContainer instances manually, not as beans, you also have to take care of the application context callbacks and lifecycle yourself:

listenerContainer.setApplicationContext()
listenerContainer.setApplicationEventPublisher()
listenerContainer.afterPropertiesSet()
listenerContainer.start()

And don't forget to stop() and destroy() them at the end of the application.
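
Applied to the build() method from the question, a sketch could look like this (the context field and the containers list kept for later stop()/destroy() are assumptions, not in the original):

public void build() {
    for (String queueName : queues) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(listenerAdapter());
        listenerContainer.setApplicationContext(context);   // context callbacks
        listenerContainer.afterPropertiesSet();              // initialize the container
        listenerContainer.start();                           // begin consuming
        containers.add(listenerContainer);                   // keep a reference for stop()/destroy() at shutdown
    }
}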

Question:

Is it possible to bind multiple topics from an exchange to a single queue? For example, I have a producer which declares an exchange and sends with three topics: log.verbose, log.info, log.error.

And I would like to have two consumers which declare their own queues and bind them to the exchange. The first consumer subscribes to the first two topics, log.verbose and log.info, and the other one subscribes to all topics.

Is it possible to define the config for consumer 1 something like this?

spring.cloud.stream.bindings.input.destination=log
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=log.info,log.verbose


Answer:

You can not currently do it through property configuration, but you can add the second binding yourself, either manually in the management UI, or by adding Exchange, Queue and Binding @Beans to the application. You can't simply add the second Binding @Bean on its own, since the queue won't exist yet when the RabbitAdmin runs its declarations (when the application is first started and no queue exists).

See Configuring the broker in the Spring AMQP reference manual.

This technique won't work for anonymous consumers (no ...group property specified for the input) since the queue name is not known.

Question:

I'm having difficulty finding a Spring way to initialize an exchange that sends the incoming message to more than one queue in my Spring Boot application:

I can't find a good way to define a second exchange-queue binding.

I'm using RabbitTemplate as the producer client.

The RabbitMQ tutorial pages don't really help with that since:

  1. they only initialize several temporary queues from the Consumer on demand (while I need the Producer to do the binding, to persistent queues)
  2. The examples are for basic java usage - not using Spring capabilities.

I also failed to find how to implement it via the Spring AMQP pages.

What I have so far is an attempt to inject the basic Java binding into the Spring way of doing it - but it's not working...

@Bean
public ConnectionFactory connectionFactory() throws IOException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");

    Connection conn = connectionFactory.createConnection();
    Channel channel = conn.createChannel(false);

    channel.exchangeDeclare(SPRING_BOOT_EXCHANGE, "fanout");
    channel.queueBind(queueName, SPRING_BOOT_EXCHANGE, ""); //first bind
    channel.queueBind(queueName2, SPRING_BOOT_EXCHANGE, "");// second bind

    return connectionFactory;
}

Any help would be appreciated

Edited

I think the problem arises from the fact that every time I restart my server it tries to redefine the exchange-queue-binding - while they persist in the broker... I managed to define them manually via the broker's UI console - so the Producer is only aware of the exchange name, and the Consumer is only aware of its relevant queue. Is there a way to define those elements programmatically - but in such a way that they won't be redefined/overwritten if they already exist from previous restarts?


Answer:

We use an approach similar to the following to send data from one specific input channel to several input queues of other consumers:

@Bean
public IntegrationFlow integrationFlow(final RabbitTemplate rabbitTemplate, final AmqpHeaderMapper amqpHeaderMapper) {
    return IntegrationFlows
        .from("some-input-channel")
        .handle(Amqp.outboundAdapter(rabbitTemplate)
            .headerMapper(amqpHeaderMapper))
        .get();
}

@Bean
public AmqpHeaderMapper amqpHeaderMapper() {
    final DefaultAmqpHeaderMapper headerMapper = new DefaultAmqpHeaderMapper();
    headerMapper.setRequestHeaderNames("*");
    return headerMapper;
}

@Bean
public ConnectionFactory rabbitConnectionFactory() {
   return new CachingConnectionFactory();
}

@Bean
public RabbitAdmin rabbitAdmin(final ConnectionFactory rabbitConnectionFactory) {
    final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory rabbitConnectionFactory, final RabbitAdmin rabbitAdmin) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);

    final FanoutExchange fanoutExchange = new FanoutExchange(MY_FANOUT.getFanoutName());
    fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
    for (final String queueName : MY_FANOUT.getQueueNames) {
        final Queue queue = new Queue(queueName, true);
        queue.setAdminsThatShouldDeclare(rabbitAdmin);

        final Binding binding = BindingBuilder.bind(queue).to(fanoutExchange);
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
    }
    rabbitTemplate.setExchange(fanoutExchange);    
}

and for completeness here's the enum for the fanout declaration:

public enum MyFanout {
    MY_FANOUT(Lists.newArrayList("queue1", "queue2"), "my-fanout");

    private final List<String> queueNames;
    private final String fanoutName;

    MyFanout(final List<String> queueNames, final String fanoutName) {
        this.queueNames = requireNonNull(queueNames, "queue must not be null!");
        this.fanoutName = requireNonNull(fanoutName, "exchange must not be null!");
    }

    public List<String> getQueueNames() {
        return this.queueNames;
    }

    public String getFanoutName() {
        return this.fanoutName;
    }
}

Hope it helps!

Question:

Hi, I am using RabbitMQ in my Java application. When I stop the application I need to make sure I stop the queue listener (stop receiving messages from the queue) before stopping the application. I am not sure whether I need to call channel.close() or channel.basicCancel("tag"). The following code was written to stop listening to the queue.

if(myContext.myChannel.isOpen()){ 

  //myChannel is the one I am using to listen to queue 
  myContext.myChannel.basicCancel("OP"); 

  //myContext.myChannel.close(); 
}else{ 
   return ok("Channel is not open"); 
}

Answer:

If you want to stop the listener you have to:

 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare("myQueue", false, false, false, null);
 MyConsumer consumer = new MyConsumer(channel);
 String consumerTag = channel.basicConsume("myQueue", false, consumer);
 System.out.println("press any key to terminate");
 System.in.read();

 channel.basicCancel(consumerTag);<----- this stops listening
 channel.close(); <--- this closes the channel and eventually all listening
 connection.close();<-- this closes the connection and all channels 

Please read this about the "isOpen" method: https://www.rabbitmq.com/api-guide.html

Use of the isOpen() method of channel and connection objects is not recommended for production code, because the value returned by the method is dependent on the existence of the shutdown cause.

Question:

Is there a way to tell spring bus to rename its rabbitmq queues? On startup they seem to be just some random values like so:

springCloudBus.anonymous.4zzIP0z-TH6oIza5mCun7Q

I'm trying to get Spring Bus to rename this to a more human-readable, predictable queue name. For example:

testQueue

or something that indicates which service it's holding messages for.

I've tried adding the following to the application.yml for bootRun:

spring:
   cloud: 
     stream:
       bindings:
         output:
           destination: testQueue

to no avail. Please help!


Answer:

NOTE: anonymous groups are essential for Spring Cloud Bus to work properly.

using a group makes

a) the subscription durable which means that apps will receive all events (including the ones that have been sent while they were not running)

b) using groups means that apps can become competing consumers which means that the events are not broadcast

c) queues are not deleted automatically anymore

The destination you set in spring-cloud-bus inbound/outbound channels are the rabbitmq exchanges not the queues.

For spring-cloud-bus the outbound channel name is springCloudBusOutput.

Hence, your configuration needs to be:

spring:
  cloud:
    stream:
      bindings:
        springCloudBusOutput:
          destination: testExchange

Here the destination name testExchange is the exchange name, not the queue name. To avoid the anonymous name in the queue, you can set a group name for the inbound channel binding:

spring:
  cloud:
    stream:
      bindings:
        springCloudBusInput:
          destination: testExchange
          group: testQueue

This will make the queue name testExchange.testQueue

Question:

I currently have a FooListener that listens to a queue containing Foo messages. How do I add another BarListener class to listen to the same queue for Bar messages?

My RabbitMQ is currently configured like this:

@Configuration
public class RabbitMQConfig {
    @Bean
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(workQueue());
        container.setMessageListener(new MessageListenerAdapter(fooListener(), new JsonMessageConverter()));
        container.setDefaultRequeueRejected(false);
        return container;
    }
}

Answer:

There is currently no built-in support to route to different listeners according to the payload type.

You can write a simple listener wrapper...

public void handleMessage(Object payload) {
    if (payload instanceof Foo) {
        this.fooListener.handleMessage((Foo) payload);
    }
    else if (payload instanceof Bar) {
        this.barListener.handleMessage((Bar) payload);
    }
    else {
        // unexpected payload type
    }
}

EDIT:

Spring AMQP 1.5 (currently at milestone 1 - 1.5.0.M1) now supports this feature; see what's new and blog announcement.

Question:

I have recently started learning Spring and spring-amqp so this question might appear very basic so please excuse me for that.

I have multiple queues which are on different hosts and have different QueueName, RoutingKey, vhost, user, password values. I am writing the publishing logic for these queues and was not able to decide whether I should have one configuration class per queue or whether it can be done in XML.

The approach of creating a class holding all the information about a queue (host, vhost, username etc.) is working fine, as described in this example. I created a @Configuration class and defined all the beans for that queue. But then I need to do:

ApplicationContext context = new AnnotationConfigApplicationContext(MyQueueConfiguration.class);
AmqpTemplate amqpTemplate  = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World!!");

So my requirement is:

  1. Since I have many queues that need to be instantiated at the application startup, once tomcat starts the connections/channels to the queue/rabbit cluster should be established.
  2. Then as soon as a POST request comes to my application I need to publish the message to one of the queues based on a POST parameter.

So for each queue do I always need to do:

ApplicationContext context = new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);

Or is there a way to have Spring load all my queue configuration classes and just use the object like:

    // calling code when I get a POST request
MyQueueConfigurationClass.publishMessage(payload, queueName);

// The implementation code
public boolean publishMessage(String payload, String queueName){

    // Get Bean for the **queueName** somehow
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    // Use the bean to send the message
    amqpTemplate.convertAndSend(payload);

}
  1. So how can I get the amqpTemplate for the exact queue without doing new AnnotationConfigApplicationContext() every time?
  2. What is the harm of doing new AnnotationConfigApplicationContext every time a request comes to my service? [I am guessing that creating a new object for each request is not a good idea]

Answer:

You should not create a new context each time; that is very wasteful.

You can add multiple connection factories (one for each rabbit host) to the root (or web) context, and then use a Routing Connection Factory together with a sendConnectionFactorySelectorExpression to select the proper host based on the message you are sending.

Or, you can simply wire up a different RabbitTemplate for each server.

EDIT:

To use the SimpleRoutingConnectionFactory, do something like...

try {
    SimpleResourceHolder.bind(routingCF, keyForThisMessage);
    rabbitTemplate.convertAndSend(message);
}
finally {
    SimpleResourceHolder.unbind(routingCF);
}

(this will work with an unmodified RabbitTemplate) or...

<rabbit:template id="routingTemplate"
    connection-factory="rcf"
    send-connection-factory-selector-expression="messageProperties.headers['cfKey']" />

<bean id="rcf" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="foo" value-ref="cf1"/>
            <entry key="bar" value-ref="cf2"/>
        </map>
    </property>
    <property name="defaultTargetConnectionFactory" ref="defaultCF"/>
</bean>

...and then...

this.routingTemplate.convertAndSend("exchange", "routingKey", "xyz", new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setHeader("cfKey", "foo");
        return message;
    }

});

There's a complete test case here.

Question:

I want to integrate a Camel route with RabbitMQ via XML configuration.

I need to listen to messages from MYPRETTYQ, which already exists in RabbitMQ.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">


<camelContext id="camelId" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="rabbitmq://localhost:5672/direct?queue=MYPRETTYQ&autoDelete=false&skipQueueDeclare=true"/>
        <log message="File: ${body}"/>
    </route>
</camelContext>

Params were taken from https://camel.apache.org/rabbitmq.html

However, the '&' symbol is not parsed in the URI string and I get the following exception:

 Caused by: org.xml.sax.SAXParseException; lineNumber: 27; columnNumber: 91; The reference to entity "autoDelete" must end with the ';' delimiter.

I tried changing '&' to ';', however it causes invalid behavior. Instead of being parsed into params, the line

queue=MYPRETTYQ;autoDelete=false;skipQueueDeclare=true 

creates a new queue with that whole string as its name.

I'm at a loss, because all URI examples show that using '&' is the correct way of passing params. Any help appreciated


Answer:

You should replace & with &amp; (the XML entity for an ampersand), so the query string becomes queue=MYPRETTYQ&amp;autoDelete=false&amp;skipQueueDeclare=true.

Question:

I'm trying to make a Java service using Spring Boot that connects to a Rabbit exchange, discovers new queues (that match a given prefix) and connects to them. I'm using RabbitManagementTemplate to discover queues and SimpleMessageListenerContainer to create the binds. It works fine.

The problem is that when one of these dynamic queues gets deleted (by the web interface for example), my service can't handle the exception and I didn't find a place to register some handler to fix this. For these cases I just want to ignore the deletion and move on, I'm not willing to recreate the queue.

My code is something like

@Scheduled(fixedDelay = 3*1000)
public void watchNewQueues() {
    for (Queue queue : rabbitManagementTemplate.getQueues()) {
        final String queueName = queue.getName();
        String[] nameParts = queueName.split("\\.");
        if ("dynamic-queue".equals(nameParts[0]) && !context.containsBean(queueName)) {

            logger.info("New queue discovered! Binding to {}", queueName);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with("testroute.#");
            rabbitAdmin.declareBinding(binding);
            rabbitAdmin.declareQueue(queue);

            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueName);
            container.setMessageListener(new MyMessageListener());
            container.setPrefetchCount(settings.getPrefetch());

            container.setAutoDeclare(false);
            container.setMissingQueuesFatal(true);
            container.setDeclarationRetries(0);
            container.setFailedDeclarationRetryInterval(-1);

            context.getBeanFactory().registerSingleton(queueName, container);

            container.start();
        }
    }
}

@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
    if (event.getSource() instanceof SimpleMessageListenerContainer) {
        SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
        if (context.getAutowireCapableBeanFactory() instanceof BeanDefinitionRegistry) {
            logger.info("Removing bean! {}", container.getQueueNames()[0]);
            ((BeanDefinitionRegistry)context.getAutowireCapableBeanFactory()).removeBeanDefinition(container.getQueueNames()[0]);
        } else {
            logger.info("Context is not able to remove bean");
        }
    } else {
        logger.info("Got event but is not a SimpleMessageListenerContainer {}", event.toString());
    }
}

And when the queue gets deleted, console logs:

2018-03-13 15:01:29.623  WARN 32736 [pool-1-thread-6] --- o.s.a.r.listener.BlockingQueueConsumer   : Cancel received for amq.ctag-wKQUQkUNOSCtjQ9RBUNCig; Consumer: tags=[{amq.ctag-wKQUQkUNOSCtjQ9RBUNCig=dynamic-queue.some-test}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@localhost:5672/,3), conn: Proxy@23510c77 Shared Rabbit Connection: SimpleConnection@66c17803 [delegate=amqp://guest@localhost:5672/], acknowledgeMode=AUTO local queue size=0
2018-03-13 15:01:30.219  WARN 32736 [SimpleAsyncTaskExecutor-1] --- o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException
2018-03-13 15:01:30.219  INFO 32736 [SimpleAsyncTaskExecutor-1] --- o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@localhost:5672/,3), conn: Proxy@23510c77 Shared Rabbit Connection: SimpleConnection@66c17803 [delegate=amqp://guest@localhost:5672/], acknowledgeMode=AUTO local queue size=0
2018-03-13 15:01:30.243  WARN 32736 [SimpleAsyncTaskExecutor-2] --- o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue:dynamic-queue.some-test
2018-03-13 15:01:30.246  WARN 32736 [SimpleAsyncTaskExecutor-2] --- o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[dynamic-queue.some-test]
  at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571)
  at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1171)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: null
  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
  at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
  at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885)
  at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:835)
  at com.sun.proxy.$Proxy63.queueDeclarePassive(Unknown Source)
  at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550)
  ... 3 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'dynamic-queue.some-test' in vhost '/', class-id=50, method-id=10)
  at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
  at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
  at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
  at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
  ... 12 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'dynamic-queue.some-test' in vhost '/', class-id=50, method-id=10)
  at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
  at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
  at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
  at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
  at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
  ... 1 common frames omitted

Thanks for your attention

EDIT: Thanks! I was able to avoid the recreation of the queue. I'm now struggling to remove the queue from the Spring Context :)


Answer:

You'll get error logs, of course, but with container.setMissingQueuesFatal(true); (the default), the container will stop itself after 3 attempts to declare the queue at 5 second intervals.

You can affect the time it takes to stop by setting the declarationRetries (default 3) and failedDeclarationRetryInterval (default 5000).

Question:

Is it possible to change the routing key of an existing queue in RabbitMQ using a Java client?


Answer:

A queue does not have a routing key; a queue is just a place where messages sit.

The routing key lives in the binding between an exchange and a queue.

You need to create a new binding between your exchange and queue, and delete the old binding if you no longer need it.
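A minimal sketch with the plain Java client (the exchange, queue, and routing key names here are placeholders):

// Bind the queue with the new routing key, then drop the old binding.
channel.queueBind("my-queue", "my-exchange", "new.routing.key");
channel.queueUnbind("my-queue", "my-exchange", "old.routing.key");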

Question:

My application uses Spring + RabbitMQ. It has already designed two features displaying user & his friend's posts on the home page & the notifications feature for any event which has happened.

For these two features I have predefined queues in the rabbitmq configuration bound to the exchanges. The underlying pattern is publish subscribe.

Now I am confused about the design of the third feature. Say a user creates a topic, say "Halloween", and n users subscribe to it. Similarly, n users will create their own n topics and other users will subscribe to them for updates. This too is a pubsub pattern.

I believe for every individual topic a new queue should be created. So how do I dynamically create a queue for every topic created by the user in the application? Or is there some other way to deal with this?

Below is the existing queue configuration of the app.

<!-- Creates a queue for consumers to retrieve messages -->
<rabbit:queue name="UserPostpublishQueue" durable="true"/>

<!-- queue for sending notifications to users -->
<rabbit:queue name="notificationQueue" durable="true"/>

<!-- Fanout exchange for a pubsub bound to UserPostpublishQueue -->
<fanout-exchange name="broadcastPosts" durable="true" xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="UserPostpublishQueue"/>
    </bindings>
</fanout-exchange>

<!-- Direct exchange for a broadcasting notifications -->
<rabbit:direct-exchange name="broadcastNotifications" durable="true" xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="notificationQueue" key="notifications"/>
    </bindings>
</rabbit:direct-exchange>

Answer:

Say a user creates a topic say "Halloween" & n users subscribe to it [...] This too is a pubsub pattern.

while this is "publishing" of content, with other people "subscribing" to that content, this is not the pub-sub pattern.

the pub-sub pattern is explicitly about "toss it over the fence, who cares who is listening, if anyone". the pub-sub pattern is just a fancy term for what are typically events. It's the equivalent of someone saying "Hey! [thing] happened!" and other people responding in some way, if they feel like responding. if a specific person is not there to hear that the thing happened, then too bad. they don't get notice of that thing happening. it's like being out with friends. if one of your friends is not there, then they don't get to "be there" later, when they decide to. they have already missed the opportunity.

in your situation, you are describing a newspaper or print magazine. content is being published for other people to consume. a subscriber expects the articles and reports to be delivered to them at some point in the future. if they do not receive the info they were promised from the magazine or newspaper, they will be upset. they don't have to 'be there' in person when the events are happening. they are getting a report after things happen, and are guaranteed (to a degree) to receive the report.

I believe for every individual topic a new queue should be created.

This is a very bad idea. You will very quickly run into severe performance problems with the size and complexity of your RabbitMQ configuration and instances. You'll end up with thousands upon thousands of queues that are rarely used, if they are used more than once.

Worse, though, is that you will likely end up treating the queues as if they were a database. Once you realize that you can't query a queue, or read the same message multiple times, things fall apart fast.

So how do I dynamically create a queue for every topic created by the user in the application?

The short answer is: don't.

Or is there some other way to deal with this?

A message queue is a great way to push data between processes. Use messaging services for that.

What you want is database design, where you keep track of who is subscribed to what content, etc.

All of this follows along the lines of things I have written about elsewhere.

Question:

I have a Java Spring app and a RabbitMQ server. I have two queues (I mean two listeners). The first listener checks that A is in the DB; the second saves A to the DB. But if I send many requests with the same A, the first listener tries to check for A in the DB even before the second listener has saved it. How do I sync the two queues? The first listener should know that the second has already saved this A to the DB, and only then check for it. So if I send 1000 requests for A, the first listener checks for A, there is no A, so it sends it to the second listener, which saves A; but on the second iteration the first listener checks for A again while the second listener may not have completed its job, and the first should then wait. How do I solve this?


Answer:

Use threads, maybe with locks. The first thread dequeues an element from queue A and passes it to another thread, which locks your DB, checks whether A is in the DB, writes it to the DB if it is not, unlocks the DB, and accepts the next element. Same with queue B. Just lock the DB before checking and writing to it, and unlock after writing to it.
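A minimal sketch of that lock-around-check-and-write idea, assuming a single JVM and a hypothetical ARepository DAO for entity A:

import java.util.concurrent.locks.ReentrantLock;

public class ARepositoryGuard {

    private final ReentrantLock dbLock = new ReentrantLock();
    private final ARepository repository; // hypothetical DAO for entity A

    public ARepositoryGuard(ARepository repository) {
        this.repository = repository;
    }

    // Check and write happen under one lock, so a concurrent check cannot slip in between.
    public void saveIfAbsent(A a) {
        dbLock.lock();
        try {
            if (!repository.exists(a.getId())) {
                repository.save(a);
            }
        } finally {
            dbLock.unlock();
        }
    }
}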

Question:

I need to send a message to a queue named "myQueue" on the MQ server. Should I declare all the required beans in Spring, such as the Queue, for it? It seems useless to me because I only need to send to the server, and receive if possible; I'm not going to listen to the queue. Is it possible to just throw the message there and forget about it, without any explicit configuration?


Answer:

I need to send a message to a queue named "myQueue" on the MQ server.
Should I declare all the required beans in Spring, such as the Queue, for it?

No. Without declaring beans such as the connection factory, exchange, and queue, you cannot publish messages to the queue.

It seems useless to me because I only need to send to the server, and receive if possible;
I'm not going to listen to the queue. Is it possible to just throw the message there and forget about it, without any explicit configuration?

Spring is not supernatural: whether you consume your message or not, if you want to push your message to a queue, Spring needs to know the connection and queue details.

You can refer to my blog post for more details on spring-rabbit programming.
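For reference, a minimal sketch of what "Spring should know the connection and queue details" can look like (all names here are placeholders, not from the question):

@Configuration
public class MyQueueSenderConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory); // declares the queue below on first use
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", true);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

// Usage: publish via the default exchange, using the queue name as the routing key
// rabbitTemplate.convertAndSend("myQueue", "hello");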

Question:

We have a Spring Boot (2.1) application using Apache Camel (2.24) to consume from a RabbitMQ server (3.7.15).

The application appears to be consuming correctly (message-by-message, as they are placed on the queue), but in the RabbitMQ monitor it appears as though the messages are consumed 'in bulk' (see the sharp drop then flatline, even though we see in the logs that messages are being processed by the app).

We haven't set any sort of 'prefetch' behaviour that I can see. Can someone explain what's happening? Why isn't the queue count decreasing smoothly?


Answer:

Well, it simply looks like the default prefetch value is unlimited. If you want to limit it, you have to explicitly configure it.

I didn't find an official source confirming this impression, but at least an article that does: https://www.cloudamqp.com/blog/2017-12-29-part1-rabbitmq-best-practice.html#prefetch

RabbitMQ default prefetch setting gives clients an unlimited buffer, meaning that RabbitMQ by default send as many messages as it can to any consumer that looks ready to accept them.

The Camel component has an option prefetchEnabled that is false by default. However, when I look at the RabbitConsumer class of the Camel component, in its openChannel method, this just means that the consumer does not explicitly set prefetch values.

A consumer without prefetch settings is not necessarily a consumer with prefetch disabled, it is a consumer that does not care about prefetch (and therefore gets a default that is defined somewhere else).

If I have not overlooked something, the Camel option prefetchEnabled is not well named. It should be called limitPrefetch. That would also match the RabbitMQ docs:

... specifies the basic.qos method to make it possible to limit the number of unacknowledged messages on a channel (or connection)

Conclusion: I suspect that if you want a prefetch limit with the Camel component, you have to set prefetchEnabled as well as the other prefetch options. Otherwise there is no limit (which basically makes sense, since it gives you maximum throughput).
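A sketch of what that might look like on the consumer endpoint, assuming the component's prefetchEnabled and prefetchCount options (verify the option names against your Camel version's docs):

// prefetchEnabled switches basic.qos on; prefetchCount caps unacknowledged deliveries per consumer
from("rabbitmq://localhost:5672/exchange1?username=guest&password=guest"
        + "&queue=q1&autoDelete=false&prefetchEnabled=true&prefetchCount=10")
    .to("log:received");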

Question:

I'm interested in finding the location of a message in a FIFO queue. The goal is to be able to display to a user "You are 3rd in line", then "You are 2nd in line", then "Your request is being processed". I have access to AWS SQS and RabbitMQ. I know I can get the total number of messages in the queue, I just don't know where the message I care about is located. So far I have read docs on the queuing technologies, but haven't seen a solution, so no code has been written yet. Has anyone else been able to successfully find the index/location of a single message?


Answer:

It isn't possible to get an item's "index".

What you can do though, is get an estimate of wait time (interestingly enough, this concept is very similar to how Disneyland measures wait time in their lines):

  1. Measure the following datapoints (and know when they change):
    • parallelism - How many items are processed in parallel?
    • processingTime - How long does it take an item to be processed?
    • queueLength - How many items are in the queue?
  2. Calculate estimate:
    • waitTimeOfNextItem = queueLength * processingTime / parallelism

For example, if you have 100 items in the queue and each will take around 10 seconds to process, and you process 4 at once, then it'll take 100*10/4 or 250 seconds to process all the items. If a 101st item were to be put into the queue, you could estimate it would start processing after 250 seconds.

If your processingTime varies a lot, you can do calculations like rolling average or sampling, or provide a time range (5-10 minutes), depending on how accurate you want to be.

While you can also randomly send through "time card" items to refine this further (essentially a fake item that purely has the purpose of calculating how long it takes to get through the system), you can just send this info with every item instead.
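A tiny sketch of that estimate (all names are illustrative):

// Estimated wait before a newly enqueued item starts processing.
static double estimateWaitSeconds(long queueLength, double processingTimeSeconds, int parallelism) {
    return queueLength * processingTimeSeconds / parallelism;
}

// Example from above: estimateWaitSeconds(100, 10.0, 4) == 250.0 seconds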

Question:

I would like to know if there is any alternative way to create / change / remove exchanges, queues and bindings without depending on the framework (in my case, Spring) and its limitations.

The problem

Often I need to change the name of a routing key, queue, or exchange, and these frameworks do not allow this kind of more "refined" change. As a consequence, the original names of the queues/keys, and even the original setup (durable, DLQ, etc.), tend to remain. In the future this ends up confusing the organization of the queues, because you cannot easily maintain their names and configuration, or eventually reorganize them across different exchanges.

Actually, the only way to accomplish this is to manually remove them in each environment and let the framework recreate them.

I would like to know if there is any alternative way to control this, something like the database migration tools (Liquibase, Flyway, etc.).

Drawing a parallel with the database: letting Spring create everything in RabbitMQ currently seems to me analogous to leaving Hibernate's hbm2ddl option set to update on a production database.


Answer:

You can change some things but not others - but you have to do it programmatically, not declaratively.

You can use RabbitAdmin.declareBinding() to bind a queue with a different routing key (and/or exchange), and then use removeBinding() to remove the old one.

You cannot change queue arguments (DLQ settings etc) or durability.

You can use the shovel plugin to move messages from an old queue to a new one.
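A minimal sketch of the programmatic re-binding with RabbitAdmin (queue, exchange, and key names are placeholders):

public void moveBinding(ConnectionFactory connectionFactory) {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    Binding oldBinding = new Binding("my.queue", Binding.DestinationType.QUEUE, "my.exchange", "old.key", null);
    Binding newBinding = new Binding("my.queue", Binding.DestinationType.QUEUE, "my.exchange", "new.key", null);
    admin.declareBinding(newBinding); // bind with the new routing key
    admin.removeBinding(oldBinding);  // then remove the old binding
}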

Question:

I am trying to create a fanout exchange where everyone will receive messages from a publisher. My issue is that messages published to the queue are not picked up by the listener. The queues that are set up are all anonymous queues that die with the application instance. The publisher and subscribers are in the same application. Any help is much appreciated.

Queue Config:

@Value("${apcp.rabbitmq.refresh-exchange}")
private String fanoutExchangeName;

@Autowired
Queue anonQueue; 
@Bean("amqp-admin")
@PostConstruct
public AmqpAdmin AMQPAdmin(){
    log.info(connectionFactory.toString());
    AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
    return amqpAdmin;
}
@Bean
@PostConstruct
public String initRefreshAmqp(){
    setupFanOutExchange();
    return "";
}
public void setupFanOutExchange(){
    AmqpAdmin amqpAdmin =  new RabbitAdmin(connectionFactory);
    FanoutExchange exchange = new FanoutExchange(fanoutExchangeName);
    amqpAdmin.declareExchange(exchange);
    Queue queue = new Queue(anonQueue, false, true, true);
    amqpAdmin.declareQueue(queue);
    amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
}

Publisher

@RequestMapping(value = "/publish")
public String publish(String message){
    rabbitTemplate.convertAndSend(exchangeName, message);
    return "";
}

Subscriber-Config

@Bean
@PostConstruct
public SimpleRabbitListenerContainerFactory listenerFactory() {
    log.info("CONNECTIONS:"+connectionFactory.toString());
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(jsonMessageConverter());
    return factory;
}

Subscriber Listener

@RabbitListener(queues = "#{anonQueue.name}", containerFactory = "listenerFactory")
public void receiverQueue(String message){
    log.info(message);
}

Answer:

1) There is no such method:

rabbitTemplate.convertAndSend(exchangeName, message);

The two-arg method is

public void convertAndSend(String routingKey, final Object object) throws AmqpException {

So the broker is dropping your message.

2) You must not invoke admin methods (or do anything that involves the broker) in bean definitions

3) Your configuration is much more complicated than needed.

This works fine...

@SpringBootApplication
public class So49854747Application {

    public static void main(String[] args) {
        SpringApplication.run(So49854747Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, FanoutExchange exchange) {
        return args -> {
            template.convertAndSend(exchange.getName(), "", "foo");
            Thread.sleep(10_000);
        };
    }

    @Bean
    public Queue anonQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public FanoutExchange exchange() {
        return new FanoutExchange("so49854747");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(anonQueue()).to(exchange());
    }

    @RabbitListener(queues = "#{anonQueue.name}")
    public void listen(String in) {
        System.out.println(in);
    }

}

Console output:

2018-04-16 09:01:54.620  INFO 50389 --- [           main] com.example.So49854747Application        : Started So49854747Application in 1.407 seconds (JVM running for 1.909)
foo

Question:

I would like to achieve following scenario in my application:

  1. If a business error occurs, the message should be sent from the incomingQueue to the deadLetter Queue and delayed there for 10 seconds
  2. The step number 1 should be repeated 3 times
  3. The message should be published to the parkingLot Queue

I am able (see the code below) to delay the message for a certain amount of time in a deadLetter Queue. And the message is looped infinitely between the incoming Queue and the deadLetter Queue. So far so good.

The main question: How can I intercept the process and manually route the message (as described in the step 3) to the parkingLot Queue for later further analysis?

A secondary question: Can I achieve the same process with only one exchange?

Here is a shortened version of my two classes:

Configuration class

@Configuration
public class MailRabbitMQConfig {

    @Bean
    TopicExchange incomingExchange() {
       TopicExchange incomingExchange = new TopicExchange(incomingExchangeName);
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(deadLetterExchangeName);
        return dlExchange;
    }

    @Bean
    Queue incomingQueue() {

        return QueueBuilder.durable(incomingQueueName)
                .withArgument(
                        "x-dead-letter-exchange",
                        dlExchange().getName()
                )
                .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(parkingLotQueueName);
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
                .bind(incomingQueue())
                .to(incomingExchange())
                .with("#");
    }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder
                .durable(deadLetterQueueName)
                .withArgument("x-message-ttl", 10000)
                .withArgument("x-dead-letter-exchange", incomingExchange()
                    .getName())
                .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
                .bind(dlQueue())
                .to(dlExchange())
                .with("#");
    }

    @Bean
    public Binding bindParkingLot(
            Queue parkingLotQueue,
            TopicExchange dlExchange
    ) {

        return BindingBuilder.bind(parkingLotQueue)
                    .to(dlExchange)
                    .with(parkingLotRoutingKeyName);
    }
}

Consumer class

@Component
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(MailDataExternalTemplate mailDataExternalTemplate) throws Exception {

        try {
            // business logic here
        } catch (Exception e) {
            throw new AmqpRejectAndDontRequeueException("Failed to handle a business logic");
        }

        return Boolean.TRUE;
    }
}

I know I could define an additional listener for a deadLetter Queue in a Consumer class like that:

@RabbitListener(queues = "${mail.rabbitmq.queue.deadletter}")
public void receivedMessageFromDlq(Message failedMessage) throws Exception {
    // Logic to count x-retries header property value and send a failed message manually
    // to the parkingLot Queue
}

However, it does not work as expected because this listener is called as soon as the message arrives at the head of the deadLetter Queue, without being delayed.

Thank you in advance.


EDIT: With @ArtemBilan's and @GaryRussell's help I was able to solve the problem. The main solution hints are in their comments on the accepted answer. Thank you guys for the help! Below you will find the updated Configuration and Consumer classes. The main changes were:

  • The definition of the routes between the incoming exchange -> incoming queue and the dead letter exchange -> dead letter queue in the MailRabbitMQConfig class.
  • The loop handling with the manual publishing of the message to the parking lot queue in the Consumer class

Configuration class

@Configuration
public class MailRabbitMQConfig {
    @Autowired
    public MailConfigurationProperties properties;

    @Bean
    TopicExchange incomingExchange() {
        TopicExchange incomingExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getIncoming());
        return incomingExchange;
    }

    @Bean
    TopicExchange dlExchange() {
        TopicExchange dlExchange = new TopicExchange(properties.getRabbitMQ().getExchange().getDeadletter());
        return dlExchange;
    }

    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(properties.getRabbitMQ().getQueue().getIncoming())
            .withArgument(                 
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                dlExchange().getName()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                properties.getRabbitMQ().getRoutingKey().getDeadLetter()
            )
            .build();
    }

    @Bean
    public Queue parkingLotQueue() {
        return new Queue(properties.getRabbitMQ().getQueue().getParkingLot());
    }

    @Bean
    Binding incomingBinding() {
        return BindingBuilder
            .bind(incomingQueue())
            .to(incomingExchange())
            .with(properties.getRabbitMQ().getRoutingKey().getIncoming());
   }

    @Bean
    public Queue dlQueue() {
        return QueueBuilder
            .durable(properties.getRabbitMQ().getQueue().getDeadLetter())
            .withArgument(                      
                properties.getRabbitMQ().getMessages().X_MESSAGE_TTL_HEADER,
                properties.getRabbitMQ().getMessages().getDelayTime()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_EXCHANGE_HEADER,
                incomingExchange().getName()
            )
            .withArgument(
                properties.getRabbitMQ().getQueue().X_DEAD_LETTER_ROUTING_KEY_HEADER,
                properties.getRabbitMQ().getRoutingKey().getIncoming()
            )
            .build();
    }

    @Bean
    Binding dlBinding() {
        return BindingBuilder
            .bind(dlQueue())
            .to(dlExchange())
            .with(properties.getRabbitMQ().getRoutingKey().getDeadLetter());
    }

    @Bean
    public Binding bindParkingLot(
        Queue parkingLotQueue,
        TopicExchange dlExchange
    ) {
        return BindingBuilder.bind(parkingLotQueue)
            .to(dlExchange)
            .with(properties.getRabbitMQ().getRoutingKey().getParkingLot());
    }
}

Consumer class

@Component
public class Consumer {
    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    public MailConfigurationProperties properties;

    @Autowired
    protected EmailClient mailJetEmailClient;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "${mail.rabbitmq.queue.incoming}")
    public Boolean receivedMessage(
        @Payload MailDataExternalTemplate mailDataExternalTemplate,
        Message amqpMessage
    ) {
        logger.info("Received message");

        try {
            final EmailTransportWrapper emailTransportWrapper = mailJetEmailClient.convertFrom(mailDataExternalTemplate);

            mailJetEmailClient.sendEmailUsing(emailTransportWrapper);
            logger.info("Successfully sent an E-Mail");
        } catch (Exception e) {
            int count = getXDeathCountFromHeader(amqpMessage);
            logger.debug("x-death count: " + count);

            if (count >= properties.getRabbitMQ().getMessages().getRetryCount()) {
                this.rabbitTemplate.send(
                     properties.getRabbitMQ().getExchange().getDeadletter(),
                     properties.getRabbitMQ().getRoutingKey().getParkingLot(),
                     amqpMessage
                );
                return Boolean.TRUE;
            }

            throw new AmqpRejectAndDontRequeueException("Failed to send an E-Mail");
        }

        return Boolean.TRUE;
    }

    private int getXDeathCountFromHeader(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (headers.get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER) == null) {
            return 0;
        }

        //noinspection unchecked
        ArrayList<Map<String, ?>> xDeath = (ArrayList<Map<String, ?>>) headers
            .get(properties.getRabbitMQ().getMessages().X_DEATH_HEADER);
        Long count = (Long) xDeath.get(0).get("count");
        return count.intValue();
    }
}

Answer:

To delay a message before it becomes available in the queue, you should consider using a DelayedExchange: https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/_reference.html#delayed-message-exchange.

As for manually sending to the parkingLot queue, it is easy to do with RabbitTemplate, sending the message with the queue name as the routing key:

/**
 * Send a message to a default exchange with a specific routing key.
 *
 * @param routingKey the routing key
 * @param message a message to send
 * @throws AmqpException if there is a problem
 */
void send(String routingKey, Message message) throws AmqpException;

All the queues are bound to the default exchange via their names as routing keys.
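So, assuming the parking lot queue is named parkingLotQueue, routing the failed message there is a one-liner:

// Default exchange, with the queue name as the routing key
rabbitTemplate.send("parkingLotQueue", failedMessage);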

Question:

I'm using RabbitMQ to publish messages with the payload that contains some boolean fields:

class EntityDto {
    String name;
    int order;
    boolean isEnabled = true;
    // generated setters and getters for all the fields
}

However, when I publish a message to the queue and consume it on the other end, such flags always come through as false. The other fields are serialized and deserialized properly without any data loss.

To serialize entities, I'm using a JsonMessageConverter on both sides (producer & consumer). I tried switching to Jackson2JsonMessageConverter, but it doesn't change anything.

What may cause such behavior?


Answer:

Try renaming isEnabled to enabled (without the is prefix) and renaming the getter/setter to getEnabled/setEnabled.
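A sketch of the DTO after that renaming:

class EntityDto {
    String name;
    int order;
    boolean enabled = true; // was: isEnabled

    public boolean getEnabled() { return enabled; }
    public void setEnabled(boolean enabled) { this.enabled = enabled; }
    // getters/setters for the other fields unchanged
}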

Question:

I have to implement a micro-service which takes a message from a rabbitmq queue, transform it and push it to a rabbitmq exchange.

I am implementing this functionality using Apache Camel RabbitMq and Spring Boot, using Java SDL for Camel. I am using (for now) a single rabbitmq server, on my machine. The configuration looks like this:

from("rabbitmq://localhost:5672/exchange1?username=guest&password=guest&queue=q1&autoDelete=false");  
.to("rabbitmq://localhost:5672/exchange2?username=guest&password=guest&exchangeType=fanout&skipQueueDeclare=false")

The code above gets the message from q1 and publishes it back to q1 over and over again. It works fine just to get a message from a queue and send it (for example) to a file, or to create a message separately and publish it to the exchange. Is there any way to make it work as I expect it to?

Thanks


Answer:

The from() generates RabbitMQ headers. Unless you manually remove them, they are passed on to your to(). This will create a mismatch in your connection. Best is to delete the RabbitMQ headers after your from() so they don't interfere with your to().
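A sketch of the route with those headers stripped between the two endpoints (the rabbitmq.* header prefix is an assumption; check the component docs for the exact names):

from("rabbitmq://localhost:5672/exchange1?username=guest&password=guest&queue=q1&autoDelete=false")
    // drop the headers the consumer endpoint added (exchange name, routing key, etc.)
    .removeHeaders("rabbitmq.*")
    .to("rabbitmq://localhost:5672/exchange2?username=guest&password=guest&exchangeType=fanout&skipQueueDeclare=false");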

Question:

Actually, I can't get this running. Maybe I've misunderstood something and this is not possible anyway. I am trying to configure two listeners on one and the same queue and exchange; only the routing key differs. My problem is that somehow things get messed up: listener A gets messages that are meant for listener B, but only sometimes; sometimes everything works fine. Any suggestions?

MyConfig

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(getHostname());
    connectionFactory.setUsername(getUsername());
    connectionFactory.setPassword(getPassword());
    return connectionFactory;
}

@Bean
public RabbitAdmin rabbitAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(new CustomMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    factory.setConcurrentConsumers(10);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(new MappingJackson2MessageConverter());
    return factory;
}

MyListeners A

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyA"))
public String myListenerA(@Payload PayloadA payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) { 

    return SUCCESS_RESPONSE;
}

MyListener B

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE, durable = "true"), exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true", ignoreDeclarationExceptions = "true"), key = "routingKeyB"))
public String myListenerB(@Payload PayloadB payload, @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) { 

    return SUCCESS_RESPONSE;
}

Additional information: I've got 20 consumers on this queue. Thx in advance!


Answer:

RabbitMQ doesn't work that way; unlike JMS, there is no way to select messages from a queue (e.g. based on the routing key).

All you have done is bind the same queue to the exchange with 2 different routing keys. So, yes, either listener will get the message, regardless of how it got to the queue.

With RabbitMQ, you need a separate queue for each listener. When the producer publishes to the exchange, the broker will take care of routing the message to the correct queue, based on the routing key the producer used.

If you have multiple instances of each listener, the messages will be distributed accordingly (only one delivery per queue).
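A sketch of that layout, reusing the annotations from the question but giving each listener its own queue (the queue names here are placeholders):

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "queueA", durable = "true"),
        exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true"),
        key = "routingKeyA"))
public String myListenerA(@Payload PayloadA payload) {
    return SUCCESS_RESPONSE;
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "queueB", durable = "true"),
        exchange = @Exchange(value = EXCHANGE, type = "topic", durable = "true"),
        key = "routingKeyB"))
public String myListenerB(@Payload PayloadB payload) {
    return SUCCESS_RESPONSE;
}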

Question:

1) The Spring XD RabbitMQ 'requeue=false' option is not working; the result is the same with both of the settings below. I set the 'requeue' option to 'false', but RabbitMQ always attempts to requeue (retry). Is there any priority between these options? Does maxAttempts take priority over requeue?

module.*.consumer.requeue = true
module.*.consumer.maxAttempts = 5


module.*.consumer.requeue = false
module.*.consumer.maxAttempts = 5

2) How can messages in the DLQ be automatically processed back to the bus queue (using a Spring XD setting)?


3) Is there another way to achieve what the second question asks?


please help me :'(


Answer:

The requeue option only makes sense when retry is turned off (maxAttempts=1).

When retry is enabled (maxAttempts > 1), the message is rejected and not requeued after the retries are exhausted.

Turn on DEBUG logging to see the retry and message rejection behavior.

There is no mechanism provided to move messages back to the main queue - see this documentation for the newer Spring Cloud Stream project for some suggestions.
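So a configuration where requeue actually takes effect disables retry, for example (a sketch, mirroring the property style from the question):

module.*.consumer.maxAttempts = 1
module.*.consumer.requeue = false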

Question:

I need to set up RabbitMQ in an attempt to redesign our architecture using asynchronous messaging.

Existing Application Flow:

  • JEE web application (via browser) creates a new thread.
  • This thread creates a new OS process to invoke a Perl script to do some processing.
  • Perl script writes its output in a file and the control comes back to the thread.
  • The thread then reads the output file and loads the results to the database.
  • The control passes to the servlet which displays the result to the UI.

All of these steps are synchronous and time-consuming, and we need to convert this to asynchronous messaging.

Now, I am planning to break this down into the following components, but I am not sure whether this would work with RabbitMQ:

Application Breakdown:

  • JEE Web Application which is the Producer for RabbitMQ
  • Separate the Perl script into its own application that supports RabbitMQ communication. This Perl client will consume the message, process it, and place a new message in RabbitMQ for the next step
  • Separate the output-file-to-database loader into its own Java application that supports RabbitMQ communication. This would consume the message from the queue corresponding to the Perl client's message from the previous step.

This way, the output would be available in the database and the asynchronous flow would be completed.

  1. Is it possible to separate the applications this way in a manner compatible with RabbitMQ?
  2. Are there any better ways to do this?
  3. Please suggest some framework components for RabbitMQ and Perl

Appreciate your inputs with this.


Answer:

Yes, you can do it that way. If it's not too much work, I would include the database load in the Perl step. This probably avoids having to handle an intermediate file, but I don't know if that's a viable task for your project.

In order to use RabbitMQ, I recommend the AnyEvent::RabbitMQ CPAN module. As the documentation states, you can use AnyEvent::RabbitMQ to:

  • Declare and delete exchanges
  • Declare, delete, bind and unbind queues
  • Set QoS and confirm mode
  • Publish, consume, get, ack, recover and reject messages
  • Select, commit and rollback transactions

Question:

I have some durable queues running in the broker. At some point, I want to check if the queue has got any data in it and close or kill that queue if it is empty.

I am using Java to code the senders and receivers. I know which queues are present in the broker.

Suggest your way of doing this.


Answer:

You can check the GetResponse object to find out whether the queue has something in it or not. If the GetResponse is null, you can consider the queue empty and delete it.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection con = factory.newConnection();
Channel rabbitChannel = con.createChannel();

// basicGet returns null when the queue is empty; the second argument is the autoAck flag
GetResponse response = rabbitChannel.basicGet(QUEUE_NAME, BOOLEAN_NOACK);
if (response != null) {
    String body = new String(response.getBody());
    // do whatever you want to do here
} else {
    rabbitChannel.queueDelete(QUEUE_NAME);
}

OR

Use queueDelete(java.lang.String queue, boolean ifUnused, boolean ifEmpty).

This will automatically check whether the queue is empty and/or not in use, based on the booleans you provide as arguments, and delete it accordingly.
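For example, using the same placeholder queue name:

// Delete the queue only if it is both unused and empty
rabbitChannel.queueDelete(QUEUE_NAME, true, true);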

Question:

I am using Spring AMQP in my message driven application. I noticed that there is a nearly constant delay of around 300ms between invocations of my message listener, even though I am sure that the queue is filled with messages. The logfile below shows this delay between BlockingQueueConsumer.nextMessage and BlockingQueueConsumer.handle with a call to BlockingQueueConsumer.handleDelivery from another thread in between:

2015-05-12 12:46:18,655 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:18,655 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:18,967 DEBUG [pool-1-thread-6 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:18,967 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@18dc305(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:16:06 CEST 2015, messageId=143134227498011576, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=8, messageCount=0])
2015-05-12 12:46:18,967 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:18,967 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
2015-05-12 12:46:18,967 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:18,967 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,280 DEBUG [pool-1-thread-7 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,280 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@1aaa7d8(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:17:08 CEST 2015, messageId=143134227498011584, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=9, messageCount=0])
2015-05-12 12:46:19,280 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:19,280 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done
2015-05-12 12:46:19,280 TRACE [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer.doReceiveAndExecute Waiting for message from consumer.
2015-05-12 12:46:19,280 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.nextMessage Retrieving delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,577 DEBUG [pool-1-thread-3 ] BlockingQueueConsumer.handleDelivery Storing delivery for Consumer: tags=[{amq.ctag-wwui6QjS1fAnFPM7j6GIvw=my-queue}], channel=Cached Rabbit Channel: AMQChannel(mybrokerip,1), acknowledgeMode=AUTO local queue size=0
2015-05-12 12:46:19,577 DEBUG [SimpleAsyncTaskExecutor-1] BlockingQueueConsumer.handle Received message: (Body:'[B@1c893d2(byte[186])'MessageProperties [headers={..headers..}, timestamp=Tue May 12 01:18:07 CEST 2015, messageId=143134227498011592, userId=null, appId=SPT-T-2, clusterId=null, type=HBT, correlationId=null, replyTo=null, contentType=text, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=incoming, receivedRoutingKey=my-queue, deliveryTag=10, messageCount=0])
2015-05-12 12:46:19,577 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Incoming
2015-05-12 12:46:19,577 INFO  [SimpleAsyncTaskExecutor-1] QueueMessageHandler.onMessage Done

The logfile shows the message processing when the queue is definitely full of messages. The relevant parts of my Spring configuration file look like this:

<rabbit:connection-factory id="amqpConnectionFactory" connection-factory="clientConnectionFactory"
    host="${amqp.broker.ip}"
    port="${amqp.broker.port}"
    virtual-host="${amqp.broker.vhost}"
    username="${amqp.user}"
    password="${amqp.password}"/>

<bean id="clientConnectionFactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:server.ini"/>
</bean>

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="amqpConnectionFactory" />
    <property name="messageConverter" ref="marshallingMessageConverter"/>
</bean>

<bean id="marshallingMessageConverter" class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
    <constructor-arg ref="jaxbMarshaller" />
</bean>

<oxm:jaxb2-marshaller id="jaxbMarshaller" context-path="com.my.package"/>

<rabbit:listener-container id="heartbeatListenerContainer" connection-factory="amqpConnectionFactory" auto-startup="false">
    <rabbit:listener ref="queueMessageHandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>

<bean id="queueMessageHandler" class="com.my.package.QueueMessageHandler"/>

I am struggling to find the reason for this delay. As far as I understand it originates from the Spring BlockingQueueConsumer. I am not sure what is happening and why there is a call from another thread to the BlockingQueueConsumer.handleDelivery method.

Any help is greatly appreciated!


Answer:

Maybe a network issue?

The default configuration handles 1 message at a time and the next is not sent by the broker until the ack is sent.

Try increasing the prefetch on the listener container so the container always has a message available when the consumer thread is ready.

Take a look at a network trace (Wireshark or similar).

EDIT:

If you have a poor network, and can live with the increased possibility of duplicate deliveries, you can also consider increasing the txSize so acks are not sent for every message. Be sure to set it to something less than prefetch, though.
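A sketch of those two settings on the container from the question (the prefetch and transaction-size attribute names are taken from the spring-rabbit schema; verify them against your schema version):

<rabbit:listener-container id="heartbeatListenerContainer"
        connection-factory="amqpConnectionFactory"
        auto-startup="false"
        prefetch="10"
        transaction-size="5">
    <rabbit:listener ref="queueMessageHandler" queue-names="heartbeat-bdwh" />
</rabbit:listener-container>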

Question:

I have a Java class that, upon a certain action from the GUI, initiates a connection with the RabbitMQ server (using the pub/sub pattern) and listens for new events.

I want to add a new feature where I will allow the user to set an "end time" that will stop my application from listening to new events (stop consuming from the queue without closing it).

I tried to utilise the basicCancel method, but I can't find a way to make it work for a predefined date. Would it be a good idea to initiate a new thread inside my Subscribe class that will call the basicCancel upon reaching the given date or is there a better way to do that?

Listen to new events

    private void listenToEvents(String queueName) {
        try {
              logger.info(" [*] Waiting for events. Subscribed to : " + queueName);

              Consumer consumer = new DefaultConsumer(channel) {

                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope,
                                             AMQP.BasicProperties properties, byte[] body) throws IOException {

                    TypeOfEvent event = null;

                    String message = new String(body);


                    // process the payload
                    InteractionEventManager eventManager = new InteractionEventManager();
                    event = eventManager.toCoreMonitorFormatObject(message);


                    if(event!=null){    
                        String latestEventOpnName = event.getType().getOperationMessage().getOperationName();

                        if(latestEventOpnName.equals("END_OF_PERIOD"))
                            event.getMessageArgs().getContext().setTimestamp(++latestEventTimeStamp);            

                        latestEventTimeStamp = event.getMessageArgs().getContext().getTimestamp();                                    
                        ndaec.receiveTypeOfEventObject(event);                  
                    }
                  }
                };

                channel.basicConsume(queueName, true, consumer);   
             //Should I add the basicCancel here?
        }
        catch (Exception e) {
            logger.info("The Monitor could not reach the EventBus. " +e.toString());
        }     

    }

Initiate Connection

  public String initiateConnection(Timestamp endTime) {

        Properties props = new Properties();
        try {
            props.load(new FileInputStream(everestHome+ "/monitoring-system/rabbit.properties"));
         }catch(IOException e){
             e.printStackTrace();
        }                       

        RabbitConfigure config = new RabbitConfigure(props,props.getProperty("queuName").trim());

        ConnectionFactory factory = new ConnectionFactory();

        exchangeTopic = new HashMap<String,String>();
        String exchangeMerged = config.getExchange();
        logger.info("Exchange=" + exchangeMerged);
        String[] couples = exchangeMerged.split(";");

        for(String couple : couples)
        {
            String[] infos = couple.split(":");
            if (infos.length == 2)
            {
                exchangeTopic.put(infos[0], infos[1]);
            }
            else
            {
                logger.error("Invalid Exchange Detail: " + couple);
            }
        }

        for(Entry<String, String> entry : exchangeTopic.entrySet()) {

            String exchange = entry.getKey();
            String topic = entry.getValue();

            factory.setHost(config.getHost());
            factory.setPort(Integer.parseInt(config.getPort()));
            factory.setUsername(config.getUsername());
            factory.setPassword(config.getPassword());

            try {
                connection1= factory.newConnection();
                channel = connection1.createChannel();
                channel.exchangeDeclare(exchange, EXCHANGE_TYPE);
                /*Map<String, Object> args = new HashMap<String, Object>();
                args.put("x-expires", endTime.getTime());*/
                channel.queueDeclare(config.getQueue(),false,false,false,null);
                channel.queueBind(config.getQueue(),exchange,topic);            
                logger.info("Connected to RabbitMQ.\n Exchange: " + exchange + " Topic: " + topic +"\n Queue Name is: "+ config.getQueue());
                return config.getQueue();
            } catch (IOException e) {
                logger.error(e.getMessage());
                e.printStackTrace();
            } catch (TimeoutException e) {
                logger.error(e.getMessage());
                e.printStackTrace();
            }
        }
        return null;
    }

Answer:

You can create a delayed queue, setting the time-to-live (TTL) so the message you push there will be dead-lettered exactly when you want to stop your consumer.

Then you have to bind the dead letter exchange to a queue whose consumer will stop the other one as soon as it gets the message.

Never use threads when you have RabbitMQ; you can do a lot of interesting stuff with delayed messages!
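A minimal sketch of that idea with the plain Java client (the queue and exchange names, the TTL computation, and eventConsumerTag are illustrative; eventConsumerTag stands for the tag returned by the basicConsume call in listenToEvents):

// 1) A "stop" queue whose single message expires after the remaining time and is dead-lettered.
Map<String, Object> args = new HashMap<>();
int ttlMillis = (int) Math.max(0, endTime.getTime() - System.currentTimeMillis());
args.put("x-message-ttl", ttlMillis);
args.put("x-dead-letter-exchange", "stop.dlx");
channel.queueDeclare("stop.delay", false, false, false, args);

channel.exchangeDeclare("stop.dlx", "fanout");
channel.queueDeclare("stop.trigger", false, false, false, null);
channel.queueBind("stop.trigger", "stop.dlx", "");

// 2) Push one message; it only becomes visible on stop.trigger once the TTL elapses.
channel.basicPublish("", "stop.delay", null, "stop".getBytes());

// 3) When the dead-lettered message arrives, cancel the event consumer.
channel.basicConsume("stop.trigger", true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        channel.basicCancel(eventConsumerTag);
    }
});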

Question:

This is my receive function to receive messages in rmq. It is explained more here: https://www.rabbitmq.com/tutorials/tutorial-one-java.html

This code displays my messages but I want to store them in a variable.

The issue is handleDelivery is void. When I change void to String I get:

"The return type is incompatible with DefaultConsumer.handleDelivery(String, Envelope, AMQP.BasicProperties, byte[])"

Any Ideas how I can store the messages in a variable?

        public String recv() throws Exception
        {
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            Consumer consumer = new DefaultConsumer(channelRecv) 
              {
                 @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 
                 throws IOException 
              {
                String message = new String(body, "UTF-8");  
                System.out.println(" [x] Received '" + message + "'");
                return message;
              }      

            };
            channelRecv.basicConsume(queRecv, true, consumer);

        }

Edit: Here is the error I get when I run my main

The error is: Exception in thread "main" java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1255)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:471)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:461)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:456)
at Recv.recv(Recv.java:44)
at mainLaptop.main(mainLaptop.java:11)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'Leonardo' in vhost '/', class-id=60, method-id=20)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1253)
... 5 more

Here is my code

    public class Recv 
    {

public static String recv(String ip, String Q) throws Exception 
{

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(ip);
    factory.setUsername("test");
    factory.setPassword("test");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    MyConsumer consumer=new MyConsumer(channel);
    channel.basicConsume(Q,true,consumer);

    return consumer.getStoredMessage();
}

public static class MyConsumer extends DefaultConsumer 
{
    private String storedMessage;

    public MyConsumer(Channel channel) 
    {
        super(channel);
    }

    public String getStoredMessage() 
    {
        return storedMessage;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException 
    {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
        storedMessage = message; // store message here
    }
}
}

Answer:

You can create a custom extension of the DefaultConsumer class that can set/get the result:

public String recv() throws Exception {
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    MyConsumer consumer=new MyConsumer(channelRecv);
    channelRecv.basicConsume(queRecv,true,consumer);

    return consumer.getStoredMessage(); // use stored value here
}

public class MyConsumer extends DefaultConsumer {
    private String storedMessage;

    public MyConsumer(Channel channelRecv) {
        super(channelRecv);
    }

    public String getStoredMessage() {
        return storedMessage;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
        storedMessage = message; // store message here
    }
}

Question:

I get the following exception when starting my server in one of the environments. However, it works fine in the other environment.

Caused by: org.springframework.amqp.AmqpIllegalStateException: Fatal exception on listener startup
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:342)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:363)
        at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173)
        ... 54 more
    Caused by: org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.

    Caused by: java.io.IOException
            at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
            at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
            at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
            at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:790)
            at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)
            at com.sun.proxy.$Proxy89.queueDeclarePassive(Unknown Source)
            at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:216)

        Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'blah.blah.queue' in vhost '/', class-id=50, method-id=10), null, ""}
                at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:474)
                at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
                at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
                at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
                at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

Note: I have the RabbitAdmin defined in the spring xml definitions as follows:

<bean id="admin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
    <constructor-arg ref="rabbitConnectionFactory"/>
</bean>

I am not sure why the queues get auto-created in one environment and not in the other. Any help with this will be great.

<rabbit:queue name="blah.blah.queue">
    <rabbit:queue-arguments>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange name="blah.blah.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="blah.blah.queue" key="blah.blah.routing.key"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

*Added the queue declaration

Here are some more error logs:

2018-01-04 17:57:46,782 [instance=01] [SimpleAsyncTaskExecutor-1] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer (SimpleMessageListenerContainer.java:562) - Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:231)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:527)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:790)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)
    at com.sun.proxy.$Proxy89.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:216)
    ... 2 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'blah.blah.queue' in vhost '/', class-id=50, method-id=10), null, ""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 11 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'blah.blah.queue' in vhost '/', class-id=50, method-id=10), null, ""}
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:474)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

Adding some debug logs; I have not yet been able to figure out the issue.

2018-01-18 16:16:21,583 [instance=01] [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory (CachingConnectionFactory.java:188) - Creating cached Rabbit Channel from AMQChannel(amqp://guest@10.221.57.217:5672/,1)
2018-01-18 16:16:21,596 [instance=01] [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate (RabbitTemplate.java:625) - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@10.221.57.217:5672/,1)
2018-01-18 16:16:21,596 [instance=01] [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin (RabbitAdmin.java:387) - declaring Exchange 'blah.blah.exchange'
2018-01-18 16:16:22,439 [instance=01] [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory (CachingConnectionFactory.java:354) - Detected closed channel on exception.  Re-initializing: null
2018-01-18 16:16:22,652 [instance=01] [RMI TCP Connection(3)-127.0.0.1] DEBUG org.springframework.context.support.DefaultLifecycleProcessor (DefaultLifecycleProcessor.java:170) - Starting bean 'blahblahlistener' of type [class org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer]
2018-01-18 16:16:22,652 [instance=01] [RMI TCP Connection(3)-127.0.0.1] DEBUG org.springframework.aop.framework.JdkDynamicAopProxy (JdkDynamicAopProxy.java:117) - Creating JDK dynamic proxy: target source is SingletonTargetSource for target object [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1@45ecefcc]
2018-01-18 16:16:22,652 [instance=01] [RMI TCP Connection(3)-127.0.0.1] DEBUG org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer (AbstractMessageListenerContainer.java:361) - Starting Rabbit listener container.
2018-01-18 16:16:22,653 [instance=01] [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer (BlockingQueueConsumer.java:198) - Starting consumer Consumer: tag=[null], channel=null, acknowledgeMode=AUTO local queue size=0
2018-01-18 16:16:23,078 [instance=01] [SimpleAsyncTaskExecutor-1] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory (CachingConnectionFactory.java:354) - Detected closed channel on exception.  Re-initializing: null
2018-01-18 16:16:23,290 [instance=01] [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.listener.BlockingQueueConsumer (BlockingQueueConsumer.java:222) - Reconnect failed; retries left=2
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:790)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:348)
    at com.sun.proxy.$Proxy89.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:216)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:527)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'blah.blah.queue' in vhost '/', class-id=50, method-id=10), null, ""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 11 more



Answer:

I was finally able to figure this out. Posting the answer here just in case it helps others who come across a similar situation:

It turned out that the app was also declaring another, pre-existing queue. That queue had a dead-letter-routing-key argument defined in one environment but not in the other - an inconsistency across environments. The XML did not declare this argument, which is why the declaration succeeded in one environment and failed in the first one. Because the arguments mismatched, the channel was closed before RabbitAdmin got around to creating any of the queues that did not yet exist.
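For reference, a re-declaration only succeeds if every argument matches what the broker already has for that queue. A minimal sketch, assuming a hypothetical pre-existing queue and dead-letter routing key, of spelling the arguments out so the declaration is identical in every environment:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExistingQueueConfig {

    @Bean
    public Queue preExistingQueue() {
        // Declare the queue with exactly the arguments the broker-side queue was created with;
        // otherwise the declaration fails, the channel closes, and RabbitAdmin never gets to
        // declare the remaining (missing) queues.
        return QueueBuilder.durable("pre.existing.queue") // hypothetical queue name
                .withArgument("x-ha-policy", "all")
                .withArgument("x-dead-letter-routing-key", "pre.existing.dlq.key") // hypothetical value
                .build();
    }
}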

Question:

I'm new to RabbitMQ and I want to modify messages before they are consumed from a queue. I have an exchange which must stay untouched. The client receives messages with a specific routing key, but there are a lot of them, and I want to filter them and change the body before publishing them into the queue.

The messages the exchange publishes look like this:

{
"_context_domain": "unsuitable",
"_msg_id": "1",
"_context_quota_class": null, 
"_context_read_only": false,
"_context_request_id": "1"
}

{
"_context_domain": "suitable",
"_msg_id": "2",
"_context_quota_class": null, 
"_context_read_only": false,
"_context_request_id": "2"
}

Is there any way to filter and modify them before consuming? For example:

...
channel.queueBind(QUEUE_NAME, "EXCHANGE_NAME", "ROUTING_KEY");
final Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        Gson mapper = new Gson();
        SomeObject object = mapper.fromJson(message, SomeObject.class);
        if ("suitable".equals(object.getContext_domain())) {
            // publish somehow object.getMsg_id() into QUEUE_NAME
        }
    }
};

Is there any way to do it?


Answer:

The exchanges supported by AMQP do not allow filtering on the message body. You could use "topic" or "headers" exchanges, which route messages based on the routing key or message headers.

However none of them allows you to modify the message itself. If you want to do that, you need to develop your own RabbitMQ plugin implementing an exchange.

If you want to go down that road, it is quite easy to do. However, exchanges are not exactly designed for this purpose: they are simply routing tables (an exchange is not a process, it's basically a row in a table). If you modify messages there, the channel process in RabbitMQ will consume resources doing it, which also means other clients on the same connection may be blocked until the channel has finished modifying and queueing a message.

A more common way to achieve what you want is to use a consumer to handle all messages, do whatever filtering/modifications you need there and queue the result in a second queue. This second queue would be consumed by your normal workers.
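As a rough illustration of that second approach (queue names here are placeholders, and the accessors follow the question's SomeObject class), a small Spring AMQP consumer can sit between the exchange and the worker queue, drop the unsuitable messages and republish the rest:

import com.google.gson.Gson;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class FilteringRelay {

    private final RabbitTemplate rabbitTemplate;
    private final Gson gson = new Gson();

    public FilteringRelay(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // Consumes everything the untouched exchange routes into a staging queue (name is hypothetical).
    @RabbitListener(queues = "staging.queue")
    public void relay(String body) {
        SomeObject object = gson.fromJson(body, SomeObject.class);
        if ("suitable".equals(object.getContext_domain())) {
            // Forward only what the workers need, via the default exchange to their queue (placeholder name).
            rabbitTemplate.convertAndSend("", "worker.queue", object.getMsg_id());
        }
        // Unsuitable messages are simply acknowledged and dropped here.
    }
}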

Question:

String queueA = "rabbitmq://host:5672/queue-a.exchange?queue=queue-a.exchange..etc

from(queueA)
  .routeId("idForQueueA")
  .onException(Exception.class)
    .maximumRedeliveries(0)
    // .processRef("sendEmailAlert")  * not sure this belongs here*
    .to(deadLetterQueueA)
    .useOriginalMessage()
  .end()
    .processRef("dataProcessing")
    .processRef("dataExporting")
  .end();

Explaining the code above:

Messages are taken from queueA. Once the various processing steps succeed, the message is consumed. If it fails, it is added to the dead letter queue "deadLetterQueueA". This all works fine.

My question is

When messages arrive in the dead letter queue I want to add alerts so we know to do something about it. How could I add an email alert when a message arrives in the dead letter queue? I don't want to lose the original message if the alert fails, nor do I want the alert to consume the message.


My thoughts are: I would need to split the message on an exception so it is sent to two different queues - one for the alert, which sends out an email and is then consumed, and one for the dead letter queue, where the message just sits. However, I'm not sure how to do this.


Answer:

You can split a message to go to multiple endpoints using a multicast (details here):

.useOriginalMessage().multicast().to(deadLetterQueueA, "smtp://username@host:port?options")

This uses the camel mail component endpoints described here. Alternatively, you can continue processing the message after the to. So something like:

.useOriginalMessage()
.to(deadLetterQueueA)
.transform().simple("Hi <name>, there has been an error on the object ${body.toString}")
.to("smtp://username@host:port?options")

If you had multiple recipients, you could use a recipients list

public class EmailListBean {
    @RecipientList
    public String[] emails() {
        return new String[] {"smtp://joe@host:port?options",
                "smtp://fred@host:port?options"};
    }
}

.useOriginalMessage()
.to(deadLetterQueueA)
.transform().simple("...")
.bean(EmailListBean.class)

Be careful about using JMS queues to store messages while waiting for a human to action them. I don't know what sort of message traffic you're getting; I'm assuming that if you want to send an email for every failure, it's not a lot. But I would normally be wary of this sort of thing, and choose to use logging or database persistence to store the results of errors, using a JMS error queue only to notify other processes or consumers of the error or to schedule a retry.

Question:

I have a need to dynamically declare and assign new queues to my existing listener.

I have a listener declared like so:

@Component
public class AccountListener {
    @RabbitListener(id = "foobar")
    public String foo(String a) {
        System.out.println(a);
        return a + "xxx";
    }
}

I can retrieve this listener using RabbitListenerEndpointRegistry, but how do I expose it via a queue?

@Autowired
private AmqpAdmin rabbit;
@Autowired
private RabbitListenerEndpointRegistry registry;


public void exposeQueue(String queueName) throws Exception {
      Queue queue = new Queue(queueName, false);

      rabbit.declareQueue(queue);
      SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) registry.getListenerContainer("foobar");

     // Attach $listener to $queue here

}

Answer:

You should add the queue to the container's list of queues:

listener.addQueueNames(queueName);

The addQueueNames() method adds the queue to the container at runtime. See here for more info.

Question:

I create a queue "a.1" and an exchange "a" and bind them together through a RabbitMQ channel. This channel is in a connection which has about three hundred channels. After running normally for 20-30 minutes, the binding disappears and the queue is bound to the default exchange. Watching it in the RabbitMQ admin, I saw that the queue was closed at some point and auto-recovered. After the recovery, I could see the channel had changed - the channel info was ip:2341 (633) and the port changed to ip:3350 - but the queue is now bound only to the default exchange. Why does RabbitMQ behave this way, and how can I avoid it?


Answer:

You are using auto-delete queues, which means that if all consumers go down the queue will be deleted automatically. Another channel can then recreate a queue with the same name. So if you are creating queues with the same names, make sure the queue is bound every time it is created. Alternatively, you can add an expiration TTL for such queues so the broker waits for some time before deleting them (allowing another channel to start consuming).
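If you go the TTL route, a hedged Spring AMQP sketch would be to declare the queue without auto-delete but with an x-expires argument, so the broker keeps it around for a grace period after the last consumer disconnects (the 60-second value is just an example):

// Not auto-delete; instead the queue expires 60 seconds after it was last used,
// which gives a reconnecting channel time to re-attach before the binding is lost.
Queue queue = QueueBuilder.durable("a.1")
        .withArgument("x-expires", 60000)
        .build();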

Question:

Using RabbitMQ, I have two types of consumers: FileConsumer writes messages to file and MailConsumer mails messages. There may be multiple consumers of each type, say three running MailConsumers and one FileConsumer instance.

How can I do this:

  • Each published message should be handled by exactly one FileConsumer instance and one MailConsumer instance
  • Publishing a message should be done once, not one time for each queue (if possible)
  • If there are no consumers connected, messages should be queued until consumed, not dropped

What type of exchange etc should I use to get this behavior? I'd really like to see some example/pseudo-code to make this clear.

This should be easy to do, but I couldn't figure it out from the docs. It seems the fanout example should work, but I'm confused by these "anonymous queues", which seem like they would lead to every consumer receiving its own copy of each message.


Answer:

If you create the queues without the auto-delete flag, they will stay alive even after consumers disconnect.

Note that if you declare a queue as durable, it will still be present after a broker restart.

If you then publish messages with the delivery-mode=2 property set (which makes the messages persistent), those messages will survive a broker restart as long as they sit in durable queues (this is why making the queues durable matters).

Using the fanout exchange type is not mandatory; you can also use a topic exchange if you need finer-grained message routing.

UPD: step-by-step way to get what you show with schema.

  1. Declare a durable exchange, say main, with exchange.declare(exchange-name=main, type=fanout, durable=true).
  2. Declare two queues, say files and mails, with queue.declare(queue-name=files, durable=true) and queue.declare(queue-name=mails, durable=true).
  3. Bind both queues to the exchange with queue.bind(queue-name=files, exchange-name=main) and queue.bind(queue-name=mails, exchange-name=main).

At this point you can publish messages to the main exchange (see the note about delivery-mode above) and consume from the queues with any number of consumers: FileConsumer from files and MailConsumer from mails. Without any consumers on the queues, messages will simply accumulate and stay there until they are consumed (or until a broker restart, if they are not persistent).
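A minimal sketch of those steps with the plain RabbitMQ Java client (host is a placeholder):

import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class MainExchangeSetup {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // placeholder
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 1. Durable fanout exchange
            channel.exchangeDeclare("main", "fanout", true);

            // 2. Two durable, non-exclusive, non-auto-delete queues
            channel.queueDeclare("files", true, false, false, null);
            channel.queueDeclare("mails", true, false, false, null);

            // 3. Bind both queues to the exchange (the routing key is ignored by fanout)
            channel.queueBind("files", "main", "");
            channel.queueBind("mails", "main", "");

            // Publish once with delivery-mode=2; the broker copies it to both queues.
            channel.basicPublish("main", "",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    "hello".getBytes(StandardCharsets.UTF_8));
        }
    }
}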

Question:

I am running 2 services: one which connects to the DB and sends data to msg broker, and one which should take a message from rabbit and send it via batch to targetDB. I have the same RabbitConfiguration in each service but for some reason I am getting:

org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate.
    at org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:2410) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert(RabbitTemplate.java:1203) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.batch.item.amqp.AmqpItemReader.read(AmqpItemReader.java:57) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]

2020-03-01 19:50:36.157  INFO 1748 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 80ms
2020-03-01 19:50:36.183  INFO 1748 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=importClientJob]] completed with the following parameters: [{run.id=19}] and the following status: [FAILED] in 142ms

Configuration Class:

@Configuration
public class RabbitConfiguration {
    public static final String MESSAGE_EXCHANGE = "clients-exchange";
    public static final String MESSAGE_QUEUE = "clients-queue";
    public static final String MESSAGE_ROUTING_KEY = "clients.msg";

    private final ConnectionFactory connectionFactory;

    public RabbitConfiguration(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @Bean
    Queue queue() {
        return new Queue(MESSAGE_QUEUE, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(MESSAGE_EXCHANGE);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(MESSAGE_ROUTING_KEY);
    }

    @Bean
    RabbitTemplate rabbitTemplate(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange(MESSAGE_EXCHANGE);
        rabbitTemplate.setRoutingKey(MESSAGE_ROUTING_KEY);
        rabbitTemplate.setTaskExecutor(taskExecutor);
        return rabbitTemplate;
    }
}

And BatchConfiguration :

@Configuration
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Value("${pusher.spring-batch-chunk-size}")
    private int chunkSize;

    private DataSource dataSource;
    private RabbitTemplate rabbitTemplate;
    private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
    private ClientPreparedStatementSetter clientPreparedStatementSetter;

    @Autowired
    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, DataSource dataSource, RabbitTemplate rabbitTemplate, NamedParameterJdbcTemplate namedParameterJdbcTemplate, ClientPreparedStatementSetter clientPreparedStatementSetter) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.dataSource = dataSource;
        this.rabbitTemplate = rabbitTemplate;
        this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
        this.clientPreparedStatementSetter = clientPreparedStatementSetter;
    }

    @Bean
    public JdbcBatchItemWriter<Client> cursorItemWriter() {
        return new JdbcBatchItemWriterBuilder<Client>()
                .dataSource(this.dataSource)
                .namedParametersJdbcTemplate(namedParameterJdbcTemplate)
                .itemPreparedStatementSetter(clientPreparedStatementSetter)
                .sql("INSERT INTO CLIENT (id, firstname, lastname, email, phone) VALUES (?,?,?,?,?)")
                .build();
    }

    @Bean
    public AmqpItemReader<Client> clientAmqpItemReader() {
        return new AmqpItemReaderBuilder<Client>()
                .amqpTemplate(rabbitTemplate)
                .build();
    }

    @Bean
    public ClientLowerCaseProcessor lowerCaseProcessor() {
        return new ClientLowerCaseProcessor();
    }

    @Bean
    public Job importClientJob(Step step1) {
        return jobBuilderFactory.get("importClientJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
        return stepBuilderFactory.get("step1")
                .<Client, Client>chunk(chunkSize)
                .reader(clientAmqpItemReader())
                .processor(lowerCaseProcessor())
                .writer(cursorItemWriter())
                .taskExecutor(taskExecutor)
                .build();
    }
}

I have tried removing the task executor and fiddling with the config, but with no success.


Answer:

org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate. at org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:2410) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE] at org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert(RabbitTemplate.java:1203) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE] at org.springframework.batch.item.amqp.AmqpItemReader.read(AmqpItemReader.java:57) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]

AmqpItemReader in Spring Batch uses RabbitTemplate#receive() to receive messages from RabbitMQ, which requires the defaultReceiveQueue to be set on the RabbitTemplate so it knows which queue to receive from - and that is missing from your configuration. So you have to specify the queue name when configuring the RabbitTemplate:

@Bean
RabbitTemplate rabbitTemplate(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(MESSAGE_EXCHANGE);
    rabbitTemplate.setRoutingKey(MESSAGE_ROUTING_KEY);
    rabbitTemplate.setDefaultReceiveQueue(MESSAGE_QUEUE);
    rabbitTemplate.setTaskExecutor(taskExecutor);
    return rabbitTemplate;
}

Question:

I received a suggestion on this question, on a comment, saying that I might not have the Exchange I needed because my queues already existed. So, I manually deleted them all.

However, when re-deploying the application, I had the following exception for all the queues I had:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myInput.group]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:721)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:598)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1472)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclarePassive(AutorecoveringChannel.java:364)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:980)
    at com.sun.proxy.$Proxy166.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:700)
    ... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'myInput.group' in vhost 'production', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
    ... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'myInput.group' in vhost 'production', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
    ... 1 more

I'm not sure what this means, since I obviously want to create the queue myInput.group, and it seems to be complaining that it does not exist...

I've also checked, per this question, whether my problem could be permissions. But I should have them - otherwise I believe I wouldn't have been able to delete the queues...

Can you guide me to a solution?


The code from my previous question, to help you with the investigation:

@Component
public class HandlerDlq {

    private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_DELAY_HEADER = "x-delay";
    private static final int NUMBER_OF_RETRIES = 3;
    private static final int DELAY_MS = 300000;
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HandlerDlq(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @RabbitListener(queues = MessageInputProcessor.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer  retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = 0;
        }
        if (retriesHeader > NUMBER_OF_RETRIES) {
            LOGGER.warn("Message {} added to failed messages queue", failedMessage);
            this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
            throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
        }
        retriesHeader++;
        headers.put(X_RETRIES_HEADER, retriesHeader);
        headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
        LOGGER.warn("Retrying message, {} attempts", retriesHeader);
        this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(MessageInputProcessor.FAILED);
    }
}

My MessageInputProcessor interface:

public interface MessageInputProcessor {

    String INPUT = "myInput";

    String INPUT_DESTINATION = "myInput.group";

    String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file

    String FAILED = INPUT + "-failed";

    String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";

    @Input
    SubscribableChannel storageManagerInput();

    @Input(MessageInputProcessor.FAILED)
    SubscribableChannel storageManagerFailed();
}

And my properties file:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000

#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group

EDIT:

I do have the @EnableBinding annotation on another component in the project (and I've now tried adding it to the HandlerDlq class as well, but the error was the same), and I do see the log entry:

2019-12-17 08:45:11.584 INFO 96124 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: myInput.group, bound to: myInput

After that, there is an error message without a stack trace; its content is below:

{ "written_at":"2019-12-17T14:06:59.263Z","written_ts":460692906701698,"type":"log","logger":"org.springframework.amqp.rabbit.connection.CachingConnectionFactory","thread":"AMQP Connection <ip>:5672","level":"ERROR","categories":[],"msg":"Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)","tenant_id":"-","component_type":"application","component_id":"******","space_name":"dev","component_name":"*******","component_instance":"0","organization_id":"****","correlation_id":"-","organization_name":"****","space_id":"*****","container_id":"***","custom_fields":{} }

Answer:

Do you have @EnableBinding(MessageInputProcessor.class) on a configuration class?

If so check the startup logs; you should see

2019-12-17 08:45:11.584 INFO 96124 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: myInput.group, bound to: myInput

Look for any errors there.

If you don't have @EnableBinding the provisioning is not performed and you will have to manually provision with @Beans like your other queue.

If this "DLQ reprocessing" is a different app to the main stream app, then you should not have @EnableBinding here, you should properly configure the binding in the main app.

Question:

So, this is new question related to this one

I have created a case where the ListenerContainer is actually started before a @Bean with phase 0, even though the container has phase Integer.MAX_VALUE. Here is the code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.support.GenericApplicationContext;

@SpringBootApplication
public class RuntimeRegisterApp {

  public static void main(String[] args) {
    SpringApplication.run(RuntimeRegisterApp.class, args);
  }

  @Bean
  public CachingConnectionFactory cachingConnectionFactory(){
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setAddresses("10.10.121.199:35682");
    cachingConnectionFactory.setUsername("guest");
    cachingConnectionFactory.setPassword("guest");
    return cachingConnectionFactory;
  }

  @Bean
  public DirectRabbitListenerContainerFactory directFactory(ConnectionFactory cachingConnectionFactory) {
    DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
    factory.setConnectionFactory(cachingConnectionFactory);
    return factory;
  }

  @RabbitListener(bindings = {
      @QueueBinding(value = @Queue(value = "SomeQueue", durable = "false", autoDelete = "true"),
          exchange = @Exchange(value = "SomeEX", ignoreDeclarationExceptions = "true"),
          key = "SomeKey")
  },
      containerFactory = "directFactory"
  )
  public void onMessage(String msg){
    System.out.println("Received " + msg);
  }

  public void onMessage2(String msg){
    System.out.println("Received 2 " + msg);
  }

  @Bean
  public org.springframework.amqp.core.Queue someQueue(){
    return QueueBuilder.nonDurable("some1").build();
  }

  @Bean
  public DirectMessageListenerContainer container(DirectRabbitListenerContainerFactory directFactory){
    DirectMessageListenerContainer container = directFactory.createListenerContainer();
    container.setConsumersPerQueue(2);
    container.setMessageListener(
        message -> System.out.println("BEAN CONTAINER " + message.toString()));
    container.setQueues(someQueue());
    return container;
  }

  @Bean
  public RabbitAdmin rabbitAdmin(){
    return new RabbitAdmin(cachingConnectionFactory());
  }

  @Bean
  SmartLifecycle dynamicRegister(GenericApplicationContext applicationContext,
      DirectMessageListenerContainer container,
      DirectRabbitListenerContainerFactory directFactory,
      RabbitAdmin rabbitAdmin){

    return new SmartLifecycle() {

      private boolean running;

      private void dynamicallySetQueues(){
        org.springframework.amqp.core.Queue q1 = QueueBuilder
            .nonDurable("mySomeQueue")
            .build();

        rabbitAdmin.declareQueue(q1);

        applicationContext.registerBean(org.springframework.amqp.core.Queue.class, () -> q1);

        List<String> queues = new ArrayList<>(Arrays.asList(container.getQueueNames()));
        queues.add(q1.getName());


        //THIS ONE WORKS SINCE WE USE FACTORY AND SET QUEUES BEFORE START
        DirectMessageListenerContainer container1 = directFactory.createListenerContainer();
        container1.setQueueNames(queues.toArray(new String[0]));
        container1.setMessageListener(message -> System.out.println("INNER CONTAINER" + message.toString()));
        container1.start();

        //THIS ONE WORKS SINCE WE ONLY ADD QUEUES
        container.addQueueNames(q1.getName());

        //SETTING QUEUES HERE FAILS, SINCE CONTAINER ALREADY RUNNING
        //BUT IT SHOULD NOT RUN, SINCE THIS IS PHASE 0 ?
        //I GUESS SINCE IT IS NEEDED HERE IT RUNS ANYWAY ?
        container.setQueueNames(queues.toArray(new String[0]));
      }

      @Override
      public void start() {
        dynamicallySetQueues();
        running = true;
      }

      @Override
      public void stop() {
        running = false;
      }

      @Override
      public int getPhase() {
        return 0; //return 0 so we add queues before ListenerContainer starts
      }

      @Override
      public boolean isRunning() {
        return running;
      }
    };
  }
}

I guess that it runs anyway because it is an actual dependency of the SmartLifecycle bean. The only workaround I can see here is to call setAutoStartup(false) on the container and then, inside the SmartLifecycle bean, call container.start() after setting the queue names.


Answer:

You are correct; the lifecycle processor starts any dependent beans before starting the current bean...

        for (String dependency : dependenciesForBean) {
            doStart(lifecycleBeans, dependency, autoStartupOnly);
        }

...in effect, declaring the dependency causes the depended-upon container to be started with the lower phase of the bean that depends on it.

Your solution is probably the simplest.
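For completeness, a sketch of that workaround based on the beans in the question: turn off auto-startup on the container and start it yourself once the queues are in place.

@Bean
public DirectMessageListenerContainer container(DirectRabbitListenerContainerFactory directFactory) {
    DirectMessageListenerContainer container = directFactory.createListenerContainer();
    container.setConsumersPerQueue(2);
    container.setMessageListener(message -> System.out.println("BEAN CONTAINER " + message));
    container.setQueues(someQueue());
    container.setAutoStartup(false); // keep the lifecycle processor from starting it as a dependency
    return container;
}

// ...and in the SmartLifecycle bean, start the container explicitly once the queues are set:
@Override
public void start() {
    dynamicallySetQueues();   // setQueueNames() now succeeds because the container is not running yet
    container.start();        // start it ourselves after the queues are in place
    running = true;
}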

Question:

I wrote simple sample to read text from console and send it to the rabbitMq server:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Bean
    public IntegrationFlow fromConsoleToRabbitFlow() {
        return IntegrationFlows.from(consoleSource(), c -> c.id("consoleInput")
                .poller(Pollers.fixedRate(1000))
                .autoStartup(true)
        ).channel("consoleOutputChannel")
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("my_spring_integration_queue"))
                .get();
    }

    public MessageSource<String> consoleSource() {
        return CharacterStreamReadingMessageSource.stdin();
    }

}

It looks like an almost-working solution, but I can't find my_spring_integration_queue in the RabbitMQ admin console:

I can't find anything related to 'my_spring_integration_queue' on the other tabs either. Where can I find it?

I expected the application to create the queue if it doesn't exist. I was not able to find a method for sending directly to a queue, so I used the .routingKey method. I also tried the .exchangeName method, but that led to:

2019-08-27 13:26:15.972 ERROR 16372 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'my_spring_integration_queue' in vhost '/', class-id=60, method-id=40)
P.S.

the Queue tab looks like this:


Answer:

You either need to add the queue manually or use a RabbitAdmin @Bean to automatically declare it for you - the admin will find all beans of type Queue and declare them.

If you are using Spring Boot, it will automatically configure an admin bean for you so you just need the Queue @Bean.

See Configuring the Broker.
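A hedged sketch of what that looks like alongside the flow in the question (with Spring Boot only the Queue bean is strictly needed, since Boot auto-configures a RabbitAdmin for you):

// Declared as beans so the admin finds them and declares them on the first connection.
@Bean
public Queue mySpringIntegrationQueue() {
    return new Queue("my_spring_integration_queue", true, false, false);
}

// Only needed outside Spring Boot.
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}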

Question:

I would like to know if there is any way to return message back to its original place in queue. For example, let's say we have this ordering:

A-B-C-D with A being the head and D being the tail. Normally, when we use nack or reject with requeue=true, it simply enqueues the message after the current tail. In this case, when A is requeued, the new ordering becomes B-C-D-A. However, I wonder if there's any way to enqueue to the beginning of the queue. So, if A is "sent back", it would be placed at the beginning of the queue, in its original place.

I search for a solution using Spring AMQP.


Answer:

You need to set the prefetch (qos) to 1. Otherwise any prefetched messages will be delivered before the redelivery of A.

It will, however, impact performance.
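With Spring AMQP this is a one-line change on the container or container factory; a sketch assuming a SimpleRabbitListenerContainerFactory is used:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // prefetch=1: the broker delivers at most one unacknowledged message per consumer,
    // so a nacked/rejected message with requeue=true is redelivered before anything else.
    factory.setPrefetchCount(1);
    return factory;
}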

Question:

Scenario: A microservice picks up a message from a RabbitMQ Queue, it's converted to an object and then the microservice makes a REST call to external service.

It's going to deal with thousands upon thousands of these messages. Is there a way of telling my consumer not to pick up a message off the queue if we know the external REST service is down?

I know I can retry an individual message once it's picked up, but I don't want to pick it up at all if I know the service is down. I don't want to deal with thousands of messages in the DLQ.

It also feels like a circuit breaker design pattern, but I can't find any specific examples of how to implement it with AMQP.

Extra info: Spring Boot app, talking to RabbitMQ using Spring AMQP.

Thanks in advance


Answer:

You can stop and start the message listener container.

If you are using discrete containers you can stop/start the container bean.

If you are using @RabbitListener, provide an id attribute and wire in the RabbitListenerEndpointRegistry bean.

Then registry.getMessageListenerContainer(myId).stop();.
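A rough sketch of that wiring (the listener id, queue name, payload type, and health-check hook are all hypothetical): give the listener an id, then pause or resume its container from whatever component watches the downstream service.

@Autowired
private RabbitListenerEndpointRegistry registry;

// Called by whatever health check / circuit breaker watches the external REST service.
public void onDownstreamStateChange(boolean serviceUp) {
    MessageListenerContainer container = registry.getListenerContainer("restForwarder");
    if (serviceUp && !container.isRunning()) {
        container.start();   // resume consuming from the queue
    } else if (!serviceUp && container.isRunning()) {
        container.stop();    // messages simply stay in the queue while the container is stopped
    }
}

// The listener it controls:
@RabbitListener(id = "restForwarder", queues = "work.queue")
public void handle(MyPayload payload) {
    // call the external REST service...
}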

Question:

I'm trying to have a @RabbitListener listen to several queues on different hosts, loaded from properties. The queues are identical and require the same message processing.

I managed to do it "statically" by declaring my factories like this :

    @Bean
    public ConnectionFactory defaultConnectionFactory() {
        CachingConnectionFactory cf = new CachingConnectionFactory();
        cf.setAddresses(hosts);
        cf.setPort(port);
        cf.setUsername(username);
        cf.setPassword(password);
        cf.setConnectionLimit(10000);
        cf.getRabbitConnectionFactory().setConnectionTimeout(connectionTimeout);
        return cf;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory connectionFactory2() {
        SimpleRabbitListenerContainerFactory sf = new SimpleRabbitListenerContainerFactory();
        CachingConnectionFactory cf = new CachingConnectionFactory();
        cf.setAddresses(host2);
        cf.setPort(port2);
        cf.setUsername(username2);
        cf.setPassword(password2);
        cf.setConnectionLimit(connectionLimit);
        cf.getRabbitConnectionFactory().setConnectionTimeout(connectionTimeout);
        sf.setConnectionFactory(cf);
        return sf;
    }

And then adding @RabbitListener annotations to my listener class like so :

@RabbitListener (queues = "q1")
@RabbitListener (queues = "q2" , containerFactory = "connectionFactory2")
public class RabbitListener {

But I would like to be able to create the connection factories at startup by loading a property list like rabbit.host[i] and then dynamically add them to my listener.

Is such a thing possible ? What are the best practices for this use case ?


Answer:

You can't do it with static @RabbitListener annotations, but you can register listener endpoints programmatically.

You can use a SimpleRabbitListenerEndpoint as shown there, or a MethodRabbitListenerEndpoint if you want to invoke a POJO method (like @RabbitListener does).

You can wire in the appropriate factory in the overloaded registerEndpoint method.
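A hedged sketch of the programmatic route, assuming one container factory per configured host has already been built from the property list (the endpoint ids and queue name are made up):

import java.util.List;

import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DynamicListenersConfig implements RabbitListenerConfigurer {

    @Autowired
    private List<SimpleRabbitListenerContainerFactory> perHostFactories; // one factory per host

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        int i = 0;
        for (SimpleRabbitListenerContainerFactory factory : perHostFactories) {
            SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
            endpoint.setId("listener-" + i++);
            endpoint.setQueueNames("q1"); // same queue name on every host
            endpoint.setMessageListener(message ->
                    System.out.println("Received " + new String(message.getBody())));
            // Each endpoint is registered against the factory (and therefore the host) it belongs to.
            registrar.registerEndpoint(endpoint, factory);
        }
    }
}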

Question:

I have RabbitMQ with a job queue defined, and I'm consuming it from Java through the Spring Framework. I know that if I throw an exception somewhere in the code while processing a job received from the queue, the job is returned to the queue. But is there another way to return a job to the queue, without throwing an exception - that is, to return a job to the queue "manually"?


Answer:

The solution may depend on abstraction that you use:


Spring Cloud Streams

I would do this using a Spring Cloud Stream Processor that processes messages and sets the routingKeyExpression.

The bindings:

=> theSourceQueue 
   => myProcessor(message) consumes messages and sets the routing key as a header
      => DestinationExchange 
         => route 1 => theSourceQueue
         => route 2 => ?

Spring AMQP
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class Receiver {
    @RabbitListener(queues = "my-messages")
    public void receiveMessage(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println("Received message: `" + payload + "`, deliveryTag: " + deliveryTag);
        channel.basicNack(deliveryTag, false, true);
    }
}

RabbitMQ Java Client

You can go to the lower layer too and use negative acknowledgement and re-queue:

This example rejects two messages with a single call to the broker (the second argument on basicNack is the multiple flag):

GetResponse gr1 = channel.basicGet("some.queue", false);
GetResponse gr2 = channel.basicGet("some.queue", false);
channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);

When a message is requeued, it will be placed in its original position in its queue, if possible. If not (due to concurrent deliveries and acknowledgements from other consumers when multiple consumers share a queue), the message will be requeued to a position closer to the queue head.

Question:

Let's say I have a declared listener:

Listener.java

@RabbitListener(id = "test listener 1")
    public String test2(String req) {
        return req + " result";
}

I'm trying to expose it via queue during runtime:

ListenerTest.java

Queue declaredQueue = new Queue("new.queue", false);

admin.declareQueue(declaredQueue);

SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer) 
            registry.getListenerContainer("test listener 1");
listener.addQueues(declaredQueue);

And then I try message the newly declared queue :

String result = template.convertSendAndReceiveAsType("new.queue", "req", ParameterizedTypeReference.forType(String.class));

But it just times out and returns null.

When I inspect the listener in the debugger, I can't see any consumers bound to the new queue.

You can find my rabbit config here and the rest of the source to test this here.

It's worth noting that this exact setup works in Spring Boot version 2.0.5.RELEASE, so it might be a bug. I need to find a way to reinitialize the consumers.


Answer:

Adding queues at runtime will cause the container to recycle its consumers (the equivalent of stopping and restarting the container). See https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java#L687 This is due to the way the consumer is designed; each consumer thread consumes from multiple queues.

Changing the consumer count does not restart all consumers; see https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java#L168

The new DirectMessageListenerContainer does not need to restart its consumers when queues are added (there is at least one consumer per queue).

However, it does not support dynamic concurrency scaling.
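If the consumer-recycle behaviour is a problem, one option (a sketch, not the only fix) is to point the listener at a direct container factory, so that queues added at runtime simply get their own consumers:

@Bean
public DirectRabbitListenerContainerFactory directFactory(ConnectionFactory connectionFactory) {
    DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConsumersPerQueue(1);
    return factory;
}

@RabbitListener(id = "test listener 1", containerFactory = "directFactory")
public String test2(String req) {
    return req + " result";
}

The registry lookup then returns a DirectMessageListenerContainer, so the cast in the test code needs to change accordingly.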

Question:

I'm using Spring Boot 2.0.1 and the Spring cloud dependencies are imported from:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>Finchley.RC1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

I believe that the dependencies of interest are these:

<dependencies>
    ...
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-reactive</artifactId>
    </dependency>
    ...
</dependencies>

In my application.yaml I've added multiple consumer bindings:

spring:
  cloud:
    stream:
      bindings:
        input1:
          bindingRoutingKey: key1.#
          binder: rabbit
          group: group1
          destination: dest-group1
          transacted: true
        input2:
          bindingRoutingKey: key2.#
          binder: rabbit
          group: group2
          destination: dest-group2
          transacted: true

I've read, for example here, that one should add requiredGroups to the producer(s) in order to create the queues and bindings automatically. However, my application doesn't produce any messages; it just consumes messages published by another application, so I don't have any producers defined. I've tried modifying the application.yaml file to add a dummy producer:

spring:
  cloud:
    stream:
      bindings:
        dummyProducer:
          producer:
            requiredGroups: group1,group2
        input1:
          bindingRoutingKey: key1.#
          binder: rabbit
          group: group1
          destination: dest-group1
          transacted: true
        input2:
          bindingRoutingKey: key2.#
          binder: rabbit
          group: group2
          destination: dest-group2
          transacted: true

But this doesn't work. So my question is:

How should I modify my application.yaml file (and possible the code if required) to make the Spring Cloud stream create the queues and bindings on startup?


Answer:

We normally only provision the exchange on the producer side; we don't provision queues unless required-groups is set.

On the consumer side, we always provision queues (and the exchange).

If that is not happening, something else is wrong; do you have @EnableBinding?

Show your application code.
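For reference, a minimal consumer-side sketch (the interface and class names are illustrative) of what the binder needs to see before it provisions the queues and bindings for input1 and input2:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(TwoInputs.class)
@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @StreamListener("input1")
    public void onInput1(String payload) {
        // handle messages from dest-group1
    }

    @StreamListener("input2")
    public void onInput2(String payload) {
        // handle messages from dest-group2
    }
}

interface TwoInputs {

    @Input("input1")
    SubscribableChannel input1();

    @Input("input2")
    SubscribableChannel input2();
}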

Question:

So, I have 2 queues, outboundEmailQueue and storeEmailQueue:

<rabbit:queue name="outboundEmailQueue"/>
<rabbit:queue name="storeEmailQueue"/>

bound to a fanout exchange called integrationExchange:

<rabbit:fanout-exchange name="integrationExchange" auto-declare="true">
    <rabbit:bindings>
        <rabbit:binding queue="outboundEmailQueue"/>
        <rabbit:binding queue="storeEmailQueue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

the template:

<rabbit:template id="integrationRabbitTemplate"
    connection-factory="connectionFactory" exchange="integrationExchange"
    message-converter="jsonMessageConverter" return-callback="returnCallback"
    confirm-callback="confirmCallback" />

how I am sending an object to the exchange:

integrationRabbitTemplate.convertAndSend("integrationExchange", "", outboundEmail);

However, the message only gets published to storeEmailQueue:

What is wrong with my configuration? Why is the message not being queued to outboundEmailQueue?


Answer:

From the screen captures, it seems your configuration is ok and the message is reaching both queues. But the consumer configuration on each queue is not the same:

  • storeEmailQueue has consumer ack configured
  • outboundEmailQueue has autoack configured

If you have a doubt:

  • check the bindings section of either the exchange or the queues to confirm the link is there (but again, from your screen captures, seems likely to be present)
  • stop the consumers and push a message to the exchange; you should see the message ready count (and total count) increase on both queues.

Question:

I'm looking to replicate something I can see in the RabbitMQ web UI: it tells me which consumers (in the form of their channels) are currently connected to a given queue.

How can I know that information from the RabbitMQ Java API? Is it possible? The best I could find was how to know how many consumers a given queue has, but nothing better than that.

If the Java API can't provide that, is there some straight-forward and not very cpu intensive way of getting that info with RabbitMQ CLI (rabbitmqadmin)?

Thanks


Answer:

You can use this API: http://rabbitmqhost:15672/api/queues/{vhost}/{queue_name}

e.g.:

http://localhost:15672/api/queues/%2F/test_3

the json you will get is:

{

    "name": "test_3",
    "vhost": "/",
    "durable": true,
    "auto_delete": false,
    "exclusive": false,
    "arguments": { },
    "node": "rabbit@t-srv-rabbit-cos02",
    "consumer_details": [
        {
            "arguments": { },
            "prefetch_count": 0,
            "ack_required": false,
            "exclusive": false,
            "consumer_tag": "amq.ctag-L417KXSKpmghjHXmRpcHkw",
            "queue": {
                "name": "test_3",
                "vhost": "/"
            },
            "channel_details": {
                "name": "127.0.0.1:50427 -> 127.0.0.1:5672 (5)",
                "number": 5,
                "user": "test",
                "connection_name": "127.0.0.1:50427 -> 127.0.0.1:5672",
                "peer_port": 50427,
                "peer_host": "127.0.0.1"
            }
        }
    ]
}

For Java you can use Hop (the RabbitMQ HTTP API client for Java, Groovy, and other JVM languages).
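If you'd rather not pull in another library, a plain HTTP call against the management API works too. A sketch with the JDK HTTP client (Java 11+; host and credentials are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class QueueConsumersLookup {
    public static void main(String[] args) throws Exception {
        String auth = Base64.getEncoder().encodeToString("guest:guest".getBytes());
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:15672/api/queues/%2F/test_3"))
                .header("Authorization", "Basic " + auth)
                .GET()
                .build();
        // The JSON body contains the consumer_details array shown above
        // (channel name, peer host/port, consumer tag).
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}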

Question:

Rabbit config:

package com.rabbitMQ;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.URI;
import java.net.URISyntaxException;

@EnableRabbit
@Configuration
public class RabbitMqConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);

    @Value("${spring.rabbitmq.addresses}")
    private String addressURL;


    @Bean
    public ConnectionFactory connectionFactory() throws URISyntaxException {
        return new CachingConnectionFactory(new URI(addressURL));
    }

    /**
     * Required for executing adminstration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws URISyntaxException {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() throws URISyntaxException {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }


}

Overview of the application: whenever a git repository is connected to our application, the repository name becomes the exchange name, in this case ForceCI, and each branch in that repository gets its own queue - here there are two queues, develop and master. Now, every time a pull request is created in the develop branch, I need to pass that information to the develop queue, and it should be consumed by a specific listener registered only for develop. I have seen examples of dynamic queues, but I cannot seem to find any examples of how to create dynamic listeners that execute on different threads. How can I achieve this? Also, I am trying to send some test messages to the queue, but I am not able to see them in the console (code below).

@RequestMapping(value = "/createExchange", method = RequestMethod.GET)
public void createExchange(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    rabbitMqConfig.amqpAdmin().declareExchange(new DirectExchange("ForceCI"));

}

@RequestMapping(value = "/createDynamicQueues", method = RequestMethod.GET)
public void createDynamicQueues(@RequestParam String branchName, ServletResponse response, ServletRequest
        request) throws URISyntaxException {
    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties(branchName);

    System.out.println("develop -> "+develop);
    if(develop != null && develop.stringPropertyNames() != null && !develop.stringPropertyNames().isEmpty()) {
        for (String stringPropertyName : develop.stringPropertyNames()) {
            String property = develop.getProperty(stringPropertyName);
            System.out.println("property Value -> " + property + " ---- " + "property key -> " + stringPropertyName);
        }
    } else {
        Queue queue = new Queue(branchName, true);
        String develop1 = rabbitMqConfig.amqpAdmin().declareQueue(new Queue(branchName, true));
        rabbitMqConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange("ForceCI")).withQueueName());
        System.out.println(develop1);
    }
}

@RequestMapping(value = "/sendMessageToQueuesDevelop", method = RequestMethod.GET)
public void sendMessageToQueuesDevelop(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("develop");
    String queue_name = develop.getProperty("QUEUE_NAME");

    rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage");


}

@RequestMapping(value = "/sendMessageToQueuesMaster", method = RequestMethod.GET)
public void sendMessageToQueuesMaster(ServletResponse response, ServletRequest
        request) throws URISyntaxException {


    Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("master");
    String queue_name = develop.getProperty("QUEUE_NAME");

    rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage1");


}

UPDATE

The binding was missing; once I added the binding as shown above in the code, the messages started going in. But I still can't figure out how to listen to these messages with different listeners and process them on different threads.


Answer:

The simplest way is to use a DirectMessageListenerContainer and add queues to it as necessary. You won't get a new thread for each queue, though; with the direct container the listener is invoked on a thread from the amqp-client thread pool.

The direct container is efficient at adding queues; you can start with zero queues if needed. See Choosing a container for more information.

If you MUST have a new thread for each queue, you will have to manually create (and manage) a SimpleMessageListenerContainer for each.
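
For what it's worth, a minimal sketch of that approach (the bean name and the println listener are placeholders): declare one DirectMessageListenerContainer with no queues, then attach each branch queue to it at runtime after declaring it with the RabbitAdmin.

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.context.annotation.Bean;

// Container starts with zero queues; branch queues are added as they are declared
@Bean
public DirectMessageListenerContainer branchListenerContainer(ConnectionFactory connectionFactory) {
    DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
    container.setMessageListener(message ->
            System.out.println("From " + message.getMessageProperties().getConsumerQueue()
                    + ": " + new String(message.getBody())));
    return container;
}

Then, in the createDynamicQueues endpoint, something like branchListenerContainer.addQueueNames(branchName) after the declareBinding call would start consuming from the new branch queue.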

Question:

I'm building an application using RabbitMQ/Spring/Spring AMQP and am having trouble handling the way I've laid out my queues.

Essentially I have one queue that every consumer listens to, with each message basically saying "this queue is ready to be processed by a single consumer". The consumer will then listen to the queue indicated in the message, consume all the messages in that queue, and finally delete it when done.

These short-lived queues are all created on the fly as data comes in to be processed and must be consumed by only a single consumer (whichever one gets the message in the 'ready' queue).

I'm having trouble gracefully handling the consumers in this situation. Right now I just create a new DirectMessageListenerContainer each time a consumer gets a message from the 'ready' queue and then stop it once it has gotten all the messages it needs. It seems like this solution isn't ideal. Is there any better way to handle a situation like this with Spring AMQP/RabbitMQ?


Answer:

You can add/remove queues to/from existing container(s) at runtime; it is more efficient with the direct container (see Choosing a container).

The MessageProperties has the consumerQueue property to tell you which queue the message came from.
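
A rough sketch of that idea (queue and class names are illustrative): one long-lived direct container listens to the 'ready' queue plus whatever short-lived queues it is currently draining, and consumerQueue tells the handler which queue each message came from.

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;

public class ShortLivedQueueWorker implements MessageListener {

    private final DirectMessageListenerContainer container;

    public ShortLivedQueueWorker(DirectMessageListenerContainer container) {
        this.container = container;
    }

    @Override
    public void onMessage(Message message) {
        String sourceQueue = message.getMessageProperties().getConsumerQueue();
        if ("ready".equals(sourceQueue)) {
            // The body names the short-lived queue that is ready to be processed
            container.addQueueNames(new String(message.getBody()));
        } else {
            // ... process the data message; once the queue is drained:
            // container.removeQueueNames(sourceQueue);
        }
    }
}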

Question:

I am trying to use a headers exchange in RabbitMQ to produce and consume messages. When producing, I convert the message to a byte[], but on the consumer side I am not able to deserialize it properly. I have configured a ClassMapper for the Jackson converter, but it is still not able to decode the class. I am not sure whether I am missing something or doing it wrong.

ProducerConfiguration class

@Configuration
public class RabbitMQConfig 
{

    @Value("${exchangeName}")
    String exchange;

    @Value("${queueName}")
    String queueName;

    @Value("${queueName1}")
    String queueName1;

    @Bean
    public List<Object> topicBindings() 
    {

        Queue headerQueue = new Queue(queueName);
        Queue headerQueue1 = new Queue(queueName1);

        Map<String,Object> mapForQueue = new HashMap<>();
        mapForQueue.put("key", "value");
        mapForQueue.put("key2", "value2");

        Map<String,Object> mapForQueue1 = new HashMap<>();
        mapForQueue.put("key", "value");
        mapForQueue1.put("key1", "value1");

        HeadersExchange headerExchange = new HeadersExchange(exchange);

        return Arrays.asList
                (
                        headerQueue,headerQueue1,headerExchange,
                        BindingBuilder.bind(headerQueue).to(headerExchange).whereAny(mapForQueue),
                        BindingBuilder.bind(headerQueue1).to(headerExchange).whereAll(mapForQueue1)
                );
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() 
    {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        jsonConverter.setClassMapper(classMapper());
        return jsonConverter;
    }

    @Bean
    public DefaultClassMapper classMapper() 
    {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("String", String.class);
        idClassMapping.put("Employee", Employee.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }

}

SenderClass

@Service
public class RabbitMQSender 
{
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${exchangeName}")
    private String exchange;

    public void send(Employee employee) 
    { 
        Message message = MessageBuilder.withBody(SerializationUtils.serialize(employee))
                .setHeader("key", "value")
                .setHeader("key1", "value1")
                .build();

        amqpTemplate.convertAndSend(exchange,"",message);
    }

}

ConsumerConfiguration Class

@EnableRabbit
@Configuration
public class ConsumerConfiguration
{
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() 
    {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        jsonConverter.setClassMapper(classMapper());
        return jsonConverter;
    }

    @Bean
    public DefaultClassMapper classMapper() 
    {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("String", String.class);
        idClassMapping.put("Employee", Employee.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }

}

ConsumerApplication

@Component
@RabbitListener(queues = "#{'${combinedQueue}'.split(';')}")
public class RabbitMqListener 
{
    @RabbitHandler
    public void recievedMessage(@Header(AmqpHeaders.CONSUMER_QUEUE) String rk,@Payload Employee employee) 
    {
        System.out.println("*********************************************************************************************");
        System.out.println("Header recieved  = "+rk);
        System.out.println("Recieved Object Message From Important Queue " + employee);
        System.out.println("*********************************************************************************************");
    }

    @RabbitHandler
    public void recievedMessage1(@Header(AmqpHeaders.CONSUMER_QUEUE) String rk,@Payload byte[] object) 
    {
        System.out.println("*********************************************************************************************");
        System.out.println("Header recieved  = "+rk);

        Employee emp = (Employee)SerializationUtils.deserialize(object);

        System.out.println("Recieved String Message From Important Queue " + emp);
        System.out.println("*********************************************************************************************");
    }

}

Employee class

public class Employee
{

    private String empName;
    private String empId;

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = empName;
    }

    public String getEmpId() {
        return empId;
    }

    public void setEmpId(String empId) {
        this.empId = empId;
    }

    @Override
    public String toString() {
        return "Employee [empName=" + empName + ", empId=" + empId + "]";
    }

}

ErrorLog

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.rabbitmq.consumer.service.RabbitMqListener.recievedMessage1(java.lang.String,byte[])' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:198) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:127) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.IllegalStateException: Could not deserialize object type
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:98) ~[spring-amqp-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:75) ~[spring-amqp-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at com.rabbitmq.consumer.service.RabbitMqListener.recievedMessage1(RabbitMqListener.java:32) ~[classes/:na]
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:130) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:60) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190) ~[spring-rabbit-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    ... 12 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.poc.springbootrabbitmq.model.Employee
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_191]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_191]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_191]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_191]
    at java.lang.Class.forName0(Native Method) ~[na:1.8.0_191]
    at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:686) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[na:1.8.0_191]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[na:1.8.0_191]
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:92) ~[spring-amqp-2.1.5.RELEASE.jar:2.1.5.RELEASE]
    ... 22 common frames omitted

2019-05-06 00:39:35.801  WARN 2780 --- [ntContainer#0-1] o.s.a.s.c.Jackson2JsonMessageConverter   : Could not convert incoming message with content-type [application/octet-stream], 'json' keyword missing.

Also, can I produce the message in any format other than byte[]?


Answer:

2019-05-06 00:39:35.801 WARN 2780 --- [ntContainer#0-1] o.s.a.s.c.Jackson2JsonMessageConverter : Could not convert incoming message with content-type [application/octet-stream], 'json' keyword missing.

The Jackson converter will only convert if the content_type property contains json (e.g. application/json).
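
In other words, don't pre-serialize the Employee with SerializationUtils; let the Jackson converter build the message so the content type is set to application/json and the Employee handler is selected on the consumer side. A sketch of the sender, assuming (as in a Spring Boot setup) the Jackson2JsonMessageConverter bean is wired into the RabbitTemplate behind the AmqpTemplate:

public void send(Employee employee) {
    // The converter serializes the object and sets content_type=application/json;
    // the MessagePostProcessor adds the headers used by the headers-exchange bindings
    amqpTemplate.convertAndSend(exchange, "", employee, message -> {
        message.getMessageProperties().setHeader("key", "value");
        message.getMessageProperties().setHeader("key1", "value1");
        return message;
    });
}

With that in place, the byte[] handler and the manual SerializationUtils.deserialize call are no longer needed.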

Question:

I am studying a sample example: one queue with two listeners on it. A small question and the problem: when there are many messages, the listeners seem to trip over each other; when there are few messages, they come through normally.

First Listener

@RabbitListener(queues = "FilaSoftware")
    public void receiver(UsuarioModel fileBody) {
        System.out.println("Mensagem Nome - " + fileBody.getNome() + " com a idade " + String.valueOf(fileBody.getIdade()));

    }

Second Listener

@RabbitListener(queues = "FilaSoftware")
    public void receiver(AlunoModel aluno) {
        System.out.println("Aluno Nome - " + String.valueOf(aluno.getNomeAluno()) + " sala - " + String.valueOf(aluno.getSalaAluno()) + " numero - " + String.valueOf(aluno.getNumeroAluno()));

    }

And it returns these values:

Mensagem Nome - null com a idade 0
Aluno Nome - Aluno 99 sala - 99 numero - 99
Mensagem Nome - Usuario numero 75 com a idade 75
Aluno Nome - Aluno 39 sala - 39 numero - 39
Mensagem Nome - null com a idade 0
Aluno Nome - Aluno 80 sala - 80 numero - 80
Mensagem Nome - null com a idade 0
Aluno Nome - null sala - 0 numero - 0
Mensagem Nome - null com a idade 0
Aluno Nome - Aluno 40 sala - 40 numero - 40
Mensagem Nome - null com a idade 0
Aluno Nome - Aluno 8 sala - 8 numero - 8
Mensagem Nome - Usuario numero 89 com a idade 89
Aluno Nome - Aluno 67 sala - 67 numero - 67

Here is my config file:

 private static final String QUEUE_SOFTWARE = "FilaSoftware";

    @Bean
    Queue queueSoftware() {
        return new Queue(QUEUE_SOFTWARE, false);
    }

    @Bean
    Exchange exchangeDefault() {
        return ExchangeBuilder.topicExchange("ExchangeSoftware")
                .durable(false)
                .autoDelete()
                .build();
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queueSoftware())
                .to(exchangeDefault())
                .with("keyAPI")
                .noargs();
    }

    @Bean
    public Jackson2JsonMessageConverter listnerMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate (final ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(listnerMessageConverter());
        return rabbitTemplate;
    }

Here are the publishers:

@Scheduled(fixedDelay = 250L)
    public void runAluno() {
            System.out.println("Publicando mensagem aluno na mensageria");
            Random random = new Random();
            int randomNum = random.nextInt(100);
            AlunoModel alunoModel = new AlunoModel();
            alunoModel.setNomeAluno("Aluno " + String.valueOf(randomNum));
            alunoModel.setNumeroAluno(randomNum);
            alunoModel.setSalaAluno(randomNum);
            rabbitTemplate.convertAndSend("ExchangeSoftware", "keyAPI", alunoModel);
    }

    @Scheduled(fixedDelay = 1000L)
    public void runUsuario() {

            System.out.println("Publicando mensagem usuario na mensageria");
            UsuarioModel usuarioModel = new UsuarioModel();
            Random random = new Random();
            int randomNum = random.nextInt(100);
            usuarioModel.setNome("Usuario numero " + String.valueOf(randomNum));
            usuarioModel.setIdade(randomNum);
            rabbitTemplate.convertAndSend("ExchangeSoftware", "keyAPI", usuarioModel);
    }

Can you help? :)

Note: I am using Random to generate values like 'idade' (a number).


Answer:

This was added as an edit to the question by the OP. Because the question should remain a question, this is being moved to an answer. The OP should ideally accept this to mark the question as resolved.

Solved the problem with a ClassMapper for the Jackson converter in my consumer config:

@Bean
    public Jackson2JsonMessageConverter listnerMessageConverter(){
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();

        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
        idClassMapping.put(
                "com.rabbitmq.example.AlunoModel", AlunoModel.class);
        idClassMapping.put(
                "com.rabbitmq.example.UsuarioModel", UsuarioModel.class);
        classMapper.setIdClassMapping(idClassMapping);
        converter.setClassMapper(classMapper);

        return converter;
    }

    @Bean
    public RabbitTemplate rabbitTemplate (final ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(listnerMessageConverter());
        return rabbitTemplate;
    }

Changed my listeners to this:

@Component
@RabbitListener(queues = "FilaSoftware")
public class EventFilter {

    private final static Logger logger = LoggerFactory.getLogger(EventFilter.class);

    @RabbitHandler
    public void receiver(UsuarioModel fileBody) {
        System.out.println("Mensagem Nome - " + fileBody.getNome() + " com a idade " + String.valueOf(fileBody.getIdade()));
    }

    @RabbitHandler
    public void receiver(AlunoModel aluno) {
        System.out.println("Aluno Nome - " + String.valueOf(aluno.getNomeAluno()) + " sala - " + String.valueOf(aluno.getSalaAluno()) + " numero - " + String.valueOf(aluno.getNumeroAluno()));
    }
}

And this is the result:

Mensagem Nome - Usuario numero 70 com a idade 70
Aluno Nome - Aluno 83 sala - 83 numero - 83
Aluno Nome - Aluno 56 sala - 56 numero - 56
Aluno Nome - Aluno 48 sala - 48 numero - 48
Aluno Nome - Aluno 24 sala - 24 numero - 24
Mensagem Nome - Usuario numero 7 com a idade 7
Aluno Nome - Aluno 44 sala - 44 numero - 44
Aluno Nome - Aluno 70 sala - 70 numero - 70
Aluno Nome - Aluno 55 sala - 55 numero - 55
Aluno Nome - Aluno 96 sala - 96 numero - 96 

Question:

Is it possible to bind many queues to exactly one event handler? The key thing is that these queues will be added (bound) dynamically: first one, then two more, and so on. I would like to have only one event handler.

Maybe a code-created queue which is backed by n other existing queues?


Answer:

If you use Spring AMQP, then you should be familiar with the ListenerContainer abstraction. You configure your single listener (handler, in your terms) and let the container manage the queues and other connection and lifecycle options.

You can find all the required options in the Reference Manual. Also see Listener Container Queues:

See methods addQueues, addQueueNames, removeQueues and removeQueueNames

More info in: Dynamically add new queues, bindings and exchanges as beans.
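
For example, a sketch using the listener endpoint registry (the listener id and queue names are placeholders): give the @RabbitListener an id, look up its container at runtime, and add or remove queue names on it.

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SingleHandler {

    @Autowired
    private RabbitListenerEndpointRegistry registry;

    // One handler for every queue that gets attached
    @RabbitListener(id = "singleHandler", queues = "queue1")
    public void handle(String payload) {
        System.out.println("Received: " + payload);
    }

    // Attach another (already declared) queue to the same handler at runtime
    public void attach(String queueName) {
        AbstractMessageListenerContainer container =
                (AbstractMessageListenerContainer) registry.getListenerContainer("singleHandler");
        container.addQueueNames(queueName);
    }
}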

Question:

I am using the firehose tracer in RabbitMQ.

For this example, I have a queue called Calculator bound to a headers exchange.

To read the logs using a Java client, I am using the HTTP API to read the log files using this code :

URL url = new URL("http://127.0.0.1:15672/api/trace-files/calculator.log");

HttpURLConnection connection = (HttpURLConnection) url.openConnection();

connection.setRequestMethod("GET");

String userpass = "guest" + ":" + "guest";
String basicAuth = "Basic " + javax.xml.bind.DatatypeConverter.printBase64Binary(userpass.getBytes());

connection.setRequestProperty("Authorization", basicAuth);
connection.connect();

BufferedReader buff = new BufferedReader(new InputStreamReader(connection.getInputStream()));

The problem is that I want to delete the logs right after I read them, but if I delete the log file located in /var/tmp/rabbitmq/, the tracing won't work anymore.

Is there a way to delete the log files contents without affecting the tracing process?


Answer:

It actually works using

sudo bash -c "> /var/tmp/rabbitmq-tracing/mylogs.log"

Don't know why it didn't work when I deleted it with gedit as a super-user.

Question:

We have the following scenario. Queue: Q1

Q1 binds to Exchange1 and Exchange2.

Message A published to both Exchange1 and Exchange2.

We have a Q1 consumer defined. What will the behavior be in this case? Will the message be consumed twice?


Answer:

Will the message be consumed twice?

Yes.

If you publish the message to both Exchange 1 and Exchange 2, and both of these exchanges push the message to Q1, then your consumer will have 2 messages to consume.
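
A sketch of that topology with the plain Java client (exchange types and names are illustrative, using fanout for simplicity): the same payload published to both exchanges ends up on Q1 twice.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DoubleDelivery {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // One queue bound to two exchanges
            channel.exchangeDeclare("Exchange1", "fanout");
            channel.exchangeDeclare("Exchange2", "fanout");
            channel.queueDeclare("Q1", true, false, false, null);
            channel.queueBind("Q1", "Exchange1", "");
            channel.queueBind("Q1", "Exchange2", "");

            // Publishing message A to both exchanges leaves two copies on Q1
            channel.basicPublish("Exchange1", "", null, "Message A".getBytes());
            channel.basicPublish("Exchange2", "", null, "Message A".getBytes());
        }
    }
}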

Question:

I'm using the Java client with RabbitMQ. I've seen references to finding queue size with a Spring plugin, but I'm not using Spring. Is there a non-Spring way to get the size of a queue given its name? Right now I'm just exec'ing shell commands 'rabbitmqctl list_queues' and parsing the results--not great.


Answer:

You can enable the HTTP management plugin:

rabbitmq-plugins enable rabbitmq_management

then use the HTTP API.

If you execute an HTTP call like:

http://your_server/api/queues/your_virtual_host/yourqueue

you get all the information about the queue; the output is JSON like this:

  ... "backing_queue_status": {
    "q1": 0,
    "q2": 0,
    "delta": [
      "delta",
      0,
      0,
      0
    ],
    "q3": 0,
    "q4": 0,
    "len": 0,
    "target_ram_count": 0,
    "next_seq_id": 0,
    "avg_ingress_rate": 0,
    "avg_egress_rate": 0,
    "avg_ack_ingress_rate": 0,
    "avg_ack_egress_rate": 0,
    "mirror_seen": 0,
    "mirror_senders": 0
  },
  "incoming": [],
  "deliveries": [],
  "consumer_details": [],
  "name": "myqueue",
  "vhost": "vhost",
  "durable": true,
  "auto_delete": false,
  "arguments": {},
  "node": "rabbit@lemur01"
}
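
For example, a non-Spring sketch using only the JDK (host, credentials, and names are placeholders; note that the default vhost / must be URL-encoded as %2f, and the queue depth is reported in the messages field of the full JSON, which is elided above):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Base64;

public class QueueDepth {
    public static void main(String[] args) throws Exception {
        // /api/queues/<vhost>/<queue>; "/" becomes %2f
        URL url = new URL("http://127.0.0.1:15672/api/queues/%2f/myqueue");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        String auth = Base64.getEncoder().encodeToString("guest:guest".getBytes());
        connection.setRequestProperty("Authorization", "Basic " + auth);

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // Parse with your preferred JSON library and read the "messages" field
            System.out.println(body);
        }
    }
}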

Question:

In RabbitMQ, declaring a queue with a max priority:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
Queue queue = new Queue("myQueue", true, false, false, args);

But when I produce messages with priorities 11, 10, and 2, the message with priority 11 is also accepted.


Answer:

Messages without a priority property are treated as if their priority were 0. Messages with a priority which is higher than the queue's maximum are treated as if they were published with the maximum priority.

RabbitMQ priority reference
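
So a message published with priority 11 to a queue declared with x-max-priority=10 is simply handled as priority 10. A small sketch of setting the per-message priority with Spring AMQP, assuming a RabbitTemplate named rabbitTemplate and the myQueue declaration from the question:

// Published with priority 11; the broker caps it at the queue's maximum of 10
rabbitTemplate.convertAndSend("", "myQueue", "payload", message -> {
    message.getMessageProperties().setPriority(11);
    return message;
});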