Hot questions for Using RabbitMQ in messaging

Question:

Right now I can do it like this:

@RabbitListener(queues = {ENTITY_KEY + "-snapshots",  ENTITY_KEY + "-updates"})
public void handleMessage(ProviderOddsOffer offer, @Header("update_type") Long updateType) {
    ...
}

Can I do it without declaring the queues in the annotation itself?


Answer:

It's not clear what you mean; the listener has to be configured to consume from some queue, or queues.

If you mean that you wish to externalize the queue name(s) rather than hard-coding them in Java, you can use a property placeholder ${...} or a SpEL expression #{...} for the queue name(s); they will be resolved during bean initialization.
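
For example, a minimal sketch with externalized queue names (the property keys app.snapshot-queue and app.update-queue are made up; ProviderOddsOffer is the type from the question):

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class OfferListener {

    // application.properties (hypothetical keys):
    //   app.snapshot-queue=ENTITY-snapshots
    //   app.update-queue=ENTITY-updates
    @RabbitListener(queues = {"${app.snapshot-queue}", "${app.update-queue}"})
    public void handleMessage(ProviderOddsOffer offer, @Header("update_type") Long updateType) {
        // handle the offer as before
    }
}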

Question:

I think I have a problem understanding spring cloud messaging and can't find an answer to a "problem" I'm facing.

I have the following setup (using spring-boot 2.0.3.RELEASE).

application.yml

spring:
    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
      virtual-host: /
    cloud:
      stream:
        bindings:
          input:
            destination: foo
            group: fooGroup
          fooChannel:
            destination: foo

Service class

@Autowired
FoodOrderController foodOrderController;

@Bean
public CommandLineRunner runner() {
    return (String[] args) -> {
       IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
    };
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

FoodOrderController

public class FoodOrderController {

    @Autowired
    FoodOrderSource foodOrderSource;

    public String orderFood(){
        var foodOrder = new FoodOrder();
        foodOrder.setCustomerAddress(UUID.randomUUID().toString());
        foodOrder.setOrderDescription(UUID.randomUUID().toString());
        foodOrder.setRestaurant("foo");
        foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
       // System.out.println(foodOrder.toString());
        return "food ordered!";
    }
}

FoodOrderSource

public interface FoodOrderSource {
    String INPUT = "foo";
    String OUTPUT = "fooChannel";

    @Input("foo")
    SubscribableChannel foo();
    @Output("fooChannel")
    MessageChannel foodOrders();
}

FoodOrderPublisher

@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}

The setup is working, except that both StreamListeners receive the same messages, so everything gets logged twice. Reading the documentation, my understanding was that by specifying a group on the queue binding, both listeners would be registered inside that group and only one of them would receive a given message. I know the example above is not sensible, but I want to mimic a multi-node environment with multiple listeners set up.

Why is the message received by both listeners? And how can I make sure that a message is only received once within a configured group?

According to the documentation, messages should also be auto-acknowledged by default, but I can't find anything that indicates that the messages actually get acknowledged. Am I missing something here?

Here are some screenshots of the RabbitMQ admin console.


Answer:

Reading the documentation, my understanding was that by specifying a group on the queue binding, both listeners would be registered inside that group and only one of them would receive a given message.

That is true when the listeners are in different application instances. When there are multiple listeners in the same instance, they all get the same message. Multiple listeners on one input are typically used together with a condition, so that each listener can express which meals it is interested in. This is documented here.

Basically, the competing consumer is the binding itself which dispatches the message to the actual @StreamListeners in the application.

So, you can't "mimic a multi-node environment with multiple listeners setup" this way.
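
For completeness, this is roughly what the documented condition mechanism looks like (the mealType header is an assumption; the producer would have to set it when sending):

@StreamListener(target = FoodOrderSource.INPUT, condition = "headers['mealType']=='cheap'")
public void processCheapMeals(String meal) {
    System.out.println("This was a cheap meal!: " + meal);
}

@StreamListener(target = FoodOrderSource.INPUT, condition = "headers['mealType']=='expensive'")
public void processExpensiveMeals(String meal) {
    System.out.println("This was an expensive meal!: " + meal);
}

With conditions like these, each message is still delivered once to the binding and then dispatched only to the listener whose condition matches.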

but I can't find anything that indicates that the messages actually get acknowledged

What do you mean by that? If the message is processed successfully, the container acks the message and it is removed from the queue.

Question:

I am configuring an example to check whether RabbitMQ can solve my problem, and I am running into a problem.

The task: I have 3 machines behind a gateway and only one of them has the necessary data to process the message. I want to solve this with a condition check.

The problem: I tried to configure a few listeners to emulate a situation in which several machines consume the message.

@RabbitListener(queues = "spring-boot")
public void receiveMessage1(String message) {
    if(canProcess()){
        System.out.println("Received 1 <" + message + ">");
    }
}

@RabbitListener(queues = "spring-boot")
public void receiveMessage2(String message) {
    if(canProcess()){
        System.out.println("Received 2 <" + message + ">");
    }
}

However, only one randomly chosen listener processes the message; the others just don't get it. Is there a way for all listeners to consume it, so that I decide myself which one processes it via the "canProcess" method?


Answer:

The other listeners don't process your message because it is properly consumed by one of them. Once a receiveMessage method finishes without errors, the Rabbit listener internally acknowledges the message as processed; it doesn't know anything about your flow. A workaround could be to throw an exception when the message can't be processed, but that is an ugly fix and not a very efficient way of handling messages.

I found this article, which covers this type of architecture: https://derickbailey.com/2015/07/22/airport-baggage-claims-selective-consumers-and-rabbitmq-anti-patterns/
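
If you really do want every machine to see every message and decide for itself, one option is to give each consumer its own queue bound to a fanout exchange, so each one gets its own copy. A minimal sketch with Spring AMQP (the exchange and queue names are made up; note the linked article argues against filtering inside consumers):

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class BroadcastListeners {

    // Each listener declares its own queue bound to the same fanout exchange,
    // so every listener receives its own copy of each message.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("spring-boot-consumer-1"),
            exchange = @Exchange(value = "spring-boot-fanout", type = ExchangeTypes.FANOUT)))
    public void receiveMessage1(String message) {
        if (canProcess()) {
            System.out.println("Received 1 <" + message + ">");
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("spring-boot-consumer-2"),
            exchange = @Exchange(value = "spring-boot-fanout", type = ExchangeTypes.FANOUT)))
    public void receiveMessage2(String message) {
        if (canProcess()) {
            System.out.println("Received 2 <" + message + ">");
        }
    }

    private boolean canProcess() {
        // the question's own check; stubbed here
        return true;
    }
}

Note that the producer then has to publish to the fanout exchange instead of sending directly to the "spring-boot" queue.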

Question:

I am developing a multithreaded application, where several "processors" (Runnables in ThreadPools) send messages to each other. They communicate using the BlockingQueue interface: when processor A is done with task T1, it pushes it to queue Q1 (for example, BlockingQueue<MyTask> if T1 is represented by class MyTask); after that, processor B pulls the task from Q1, performs computations and pushes the result to Q2; and so on.

I use LinkedBlockingQueue, because my application is monolithic and all processors "live" in the same JVM. However, I want my application to become modular (Microservice Architecture), so I decided to use RabbitMQ as a message broker.

The problem is to migrate from the Java queue implementations to RabbitMQ with minimal changes to the client source code. Therefore, I am trying to find some kind of binding between the RabbitMQ abstractions and the BlockingQueue interface: when somebody sends a message to an AMQP queue, it should appear in a Java queue, and vice versa, when somebody pushes an object to the Java queue, it should be propagated to an AMQP exchange.

An example implementation of the polling side (from an AMQP queue, using Spring AMQP) is presented below.

<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) {
    LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    MessageConverter messageConverter = listenerContainer.getMessageConverter();
    listenerContainer.setupMessageListener((MessageListener) message -> {
        Object task = messageConverter.fromMessage(message);
        queue.offer(elementType.cast(task));
    });

    return queue;
}

So far I cannot find a framework that implements the BlockingQueue interface on top of RabbitMQ queues. If that kind of framework doesn't exist, is my idea architecturally wrong in some way, or has nobody implemented it yet?


Answer:

I am not sure you really want to do it the way you describe - the inbound messages will be delivered to the in-memory queue and sit there, not in RabbitMQ.

I think a simple BlockingQueue implementation that uses a RabbitTemplate underneath might be better: pull messages from the Rabbit queue (using receive() or receiveAndConvert()) for take/poll operations, which leaves the message in RabbitMQ until it is needed, and simply call RabbitTemplate.convertAndSend() for offer/put operations.

While pretty simple, it might be a useful addition to the framework; consider contributing.
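
A rough sketch of that idea, assuming a configured RabbitTemplate and a queue name (this is not a complete BlockingQueue, just the operations described above):

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class RabbitBackedQueue<T> {

    private final RabbitTemplate template;
    private final String queueName;

    public RabbitBackedQueue(RabbitTemplate template, String queueName) {
        this.template = template;
        this.queueName = queueName;
    }

    // offer/put: publish to the default exchange with the queue name as the routing key
    public void put(T element) {
        template.convertAndSend(queueName, element);
    }

    // poll with timeout: the message stays in RabbitMQ until it is actually received here
    @SuppressWarnings("unchecked")
    public T poll(long timeout, TimeUnit unit) {
        return (T) template.receiveAndConvert(queueName, unit.toMillis(timeout));
    }
}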

Question:

I'm new to RabbitMQ and want to implement the asynchronous messaging of a saga with RabbitMQ, so I used the RPC example from RabbitMQ to do the task. I have one orchestrator (RPCClient) and multiple microservices (RPCServer). The orchestrator uses unique queues to command the microservices, and each microservice uses a common queue (Reply_Queue) to reply to the orchestrator. To keep a log, I want to get notifications on the orchestrator side when any microservice is down for a configurable time. I read about consumer cancellation, but it only works when I delete the queue. How can I get such notifications in Java while keeping the queue messages? And is this a correct way to implement asynchronous saga messaging?


Answer:

Implementing reliable RPC is hard, and I can't give a detailed guide on how to do it. If we ignore some special failure situations, I can give a simple workaround:

First, we assume that the RPCClient never fails, while the RPCServer may fail at any time.

The RPCClient needs to know which requests have timed out, so it can send each request message with a TTL. After the RPCServer receives the request message and sends the response message, it should ACK the request message.

If the RPCServer:

  • has failed before consuming the request message, OR
  • has failed before sending the response message,

then the request message will be republished to the dead letter exchange, so the RPCClient can consume from a queue bound to that exchange and know which request has timed out.
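
A sketch of that setup with the plain RabbitMQ Java client (the exchange and queue names are made up; note that a per-message TTL only takes effect once the message reaches the head of the queue):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

public class RpcRequestQueues {

    public static void declare(Channel channel) throws IOException {
        // Dead letter exchange plus a queue the RPCClient consumes from to detect timeouts
        channel.exchangeDeclare("rpc.dlx", "fanout", true);
        channel.queueDeclare("rpc.dead-requests", true, false, false, null);
        channel.queueBind("rpc.dead-requests", "rpc.dlx", "");

        // Request queue: expired or rejected requests are republished to rpc.dlx
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "rpc.dlx");
        channel.queueDeclare("rpc.requests", true, false, false, args);
    }

    public static void sendRequest(Channel channel, byte[] body) throws IOException {
        // Per-message TTL of 5 seconds; an unconsumed request is dead-lettered afterwards
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .expiration("5000")
                .build();
        channel.basicPublish("", "rpc.requests", props, body);
    }
}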

Question:

I am working on a Spring/Hibernate project that uses Spring AMQP (RabbitMQ) for messaging. The RabbitMQ configuration is in a separate XML file which gets imported into the root application context. The RabbitMQ listener polls the queue every second. The important log messages get buried under truckloads of DEBUG-level polling messages being dumped into the log file.

2015-10-11 18:12:02.0031 DEBUG SimpleAsyncTaskExecutor-1 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer – Retrieving delivery for Consumer: tags=[[amq.ctag-p4K9s4EoXAbxKWufSzX_-w]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
2015-10-11 18:12:03.0032 DEBUG SimpleAsyncTaskExecutor-1 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer – Retrieving delivery for Consumer: tags=[[amq.ctag-p4K9s4EoXAbxKWufSzX_-w]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0

Is there a way to separate out this polling logging into a different file or stop it from polluting the logs?

I am using log4j for logging; the configuration is below.

# Direct log messages to a log file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=C:\\Users\\xxx\\xxxx.log
log4j.appender.file.MaxFileSize=2MB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSSS} %p %t %c \u2013 %m%n


# Root logger option
log4j.rootLogger=DEBUG, file, stdout

Answer:

Add these statements (I omitted the full configuration for the second file appender, file2):

log4j.appender.file2=org.apache.log4j.RollingFileAppender
log4j.appender.file2.File=C:\\Users\\xxx\\other.log
...

log4j.logger.org.springframework.amqp.rabbit.listener.BlockingQueueConsumer=DEBUG, file2
log4j.additivity.org.springframework.amqp.rabbit.listener.BlockingQueueConsumer=false

This will route the log entries from org.springframework.amqp.rabbit.listener.BlockingQueueConsumer to another log file.


If you do not want these logs at all, then use

log4j.logger.org.springframework.amqp.rabbit.listener.BlockingQueueConsumer=WARN

instead.

@see: log4j: Log output of a specific class to a specific appender

Question:

How do I send a file with RabbitMQ in Java, especially using a message converter?

I'm using the Spring Framework; I can send a String or an ArrayList but can't send a File. I only use convertAndSend and convertAndReceive to send the File, but I get:

org.springframework.amqp.AmqpIOException: java.io.FileNotFoundException

I don't know how to use a message converter. The code is from here, with some classes changed:

HelloWorldHandler.java

package org.springframework.amqp.helloworld.async;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;

import org.springframework.amqp.core.Message;

public class HelloWorldHandler {

    public void handleMessage(File message) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader(message));
        System.out.println(br.readLine());
    }
}

ProducerConfiguration.java

package org.springframework.amqp.helloworld.async;

import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;

@Configuration
public class ProducerConfiguration {

    protected final String helloWorldQueueName = "hello.world.queue";

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(this.helloWorldQueueName);
        return template;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("x.x.x.x");
        connectionFactory.setUsername("username");
        connectionFactory.setPassword("password");
        return connectionFactory;
    }

    @Bean
    public ScheduledProducer scheduledProducer() {
        return new ScheduledProducer();
    }

    @Bean
    public BeanPostProcessor postProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }


    static class ScheduledProducer {

        @Autowired
        private volatile RabbitTemplate rabbitTemplate;

        private final AtomicInteger counter = new AtomicInteger();

        @Scheduled(fixedRate = 3000)
        public void sendMessage() {
            rabbitTemplate.convertAndSend(new File("test.txt"));
        }
    }
}

Answer:

You can convert the file content into a byte array and send the byte[] as shown below.

// assuming 'file' is the java.io.File you want to send
byte[] fileData = Files.readAllBytes(file.toPath());      // file content as byte[]
String fileType = Files.probeContentType(file.toPath());  // file type (may be null)

Message message = MessageBuilder.withBody(fileData)
        .setHeader("ContentType", fileType)
        .build();

rabbitTemplate.send("exchange name", "routing key", message);
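
On the consumer side, a sketch of reading the body and header back (the queue name and output path are examples only):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FileReceiver {

    @RabbitListener(queues = "file.queue")
    public void receiveFile(Message message) throws IOException {
        // The ContentType header set by the producer
        String fileType = (String) message.getMessageProperties().getHeaders().get("ContentType");
        System.out.println("Received payload of type " + fileType);
        // Write the raw bytes back out to a file
        Files.write(Paths.get("received-copy"), message.getBody());
    }
}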

Question:

I need to write functional test flows that involve interaction with RabbitMQ, but once the tests have run I will have to clear any existing messages from the queue. Since RabbitMQ is persistent, I need some in-memory substitute for it, just like we have HSQL for databases.

I have tried using qpid broker but with no luck.

I am using the Spring Boot framework, so I just need to inject the bean for the in-memory queue instead of the actual RabbitMQ one.


Answer:

Take a look at testcontainers. Running a RabbitMQ Docker image in such a test is very easy. It will be restarted for every test class or method, depending on how you use it.

This will start a container running the rabbitmq:3.7 Docker image for the test class.

public class AmqpReceiveServiceIntegrationTest {

  @ClassRule
  public static GenericContainer rabbitmqContainer =
    new GenericContainer<>("rabbitmq:3.7").withExposedPorts(5672);

  static ConnectionFactory factory;
  static Connection connection;
  static Channel sendChannel;

  @BeforeClass
  public static void beforeClass() throws IOException, TimeoutException {
    factory = new ConnectionFactory();
    factory.setHost(rabbitmqContainer.getContainerIpAddress());
    factory.setPort(rabbitmqContainer.getFirstMappedPort());

    connection = factory.newConnection();

    sendChannel = connection.createChannel();
    sendChannel.queueDeclare("hello", false, false, false, null);
  }

  @Test
  public void sendIsOk() throws IOException {
    sendChannel.basicPublish("", "hello", null, "Hello World!".getBytes());

    // assertions ...
  }
}
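
Since the question mentions Spring Boot: if the application under test uses the auto-configured spring.rabbitmq.* properties, one way to point them at the container is an ApplicationContextInitializer. A sketch (it assumes the public static rabbitmqContainer field from the test class above):

import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;

// Pushes the container's dynamically mapped host and port into the environment
// so the auto-configured ConnectionFactory talks to the test broker.
public class RabbitContainerInitializer
        implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    public void initialize(ConfigurableApplicationContext context) {
        TestPropertyValues.of(
                "spring.rabbitmq.host=" + AmqpReceiveServiceIntegrationTest.rabbitmqContainer.getContainerIpAddress(),
                "spring.rabbitmq.port=" + AmqpReceiveServiceIntegrationTest.rabbitmqContainer.getFirstMappedPort()
        ).applyTo(context);
    }
}

Register it on the test class with @ContextConfiguration(initializers = RabbitContainerInitializer.class).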