Hot questions for Using RabbitMQ in multithreading

Top Java Programmings / RabbitMQ / multithreading

Question:

in this guide https://www.rabbitmq.com/api-guide.html RabbitMQ guys state:

Channels and Concurrency Considerations (Thread Safety)

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. Sharing channels between threads will also interfere with * Publisher Confirms.

Thread safety is very important so I tried to be as diligent as possible, but here's the problem:

I have this application that receives messages from Rabbit. When a message is received, it processes it and then acks when it's done. The application can process just 2 items at the same time in a fixed thread pool with 2 threads. The QOS prefetch for Rabbit is set to 2, because I don't want to feed the app with more than it can handle in a time frame.

Now, my consumer's handleDelivery does the following:

Task run = new Task(JSON.parse(message));    
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));

At this point, you already figured out that TestWrapperThread does the channel.basicAck(deliveryTag, false); call as last operation.

By my understanding of the documentation, this is incorrect and potentially harmful because channel is not thread safe and this behavior could screw things up. But how I am supposed to do then? I mean, I have a few ideas but they would def make everything more complex and I'd like to figure it out if it's really necessary or not.

Thanks in advance


Answer:

I suppose you are using Channel only for your consumer and not for other operations like publish etc..

In your case the only potential problem is here:

channel.basicAck(deliveryTag, false);

because you call this across two thread, btw this operation is safe, if you see the java code:

the class ChannelN.java calls:

public void basicAck(long deliveryTag, boolean multiple)
   throws IOException
{
   transmit(new Basic.Ack(deliveryTag, multiple));
}

see github code for ChannelN.java

the transmit method inside AMQChannel uses:

public void transmit(Method m) throws IOException {
   synchronized (_channelMutex) {
       transmit(new AMQCommand(m));
   }
}

_channelMutex is a protected final Object _channelMutex = new Object();

created with the class. see github code for AMQChannel.java

EDIT

As you can read on the official documentation, "some" operations are thread-safe, now it is not clear which ones. I studied the code, an I think there are not problems to call the ACK across more threads.

Hope it helps.

EDIT2 I add also Nicolas's comment:

Note that consuming (basicConsume) and acking from more than one thread is a common rabbitmq pattern that is already used by the java client.

So you can use it safe.

Question:

I have a certain resource that can be used by two types of tasks: Normal Task which is run by many different threads simultaneously and Special Task which is run rarely by a single thread.

My objectives are:

  1. All the Normal Tasks should be able to access this resource normally except if the Special Task is running.
  2. The Special Task should also wait until this resource is free (from any Normal Task that might be running).
  3. If the Special Task is not running, Normal Tasks should be able to use this resource simultaneously (it is thread safe).

To be precise, I have one Rabbit-MQ queue that is accessed by this resource to pop messages. Any of the users calling my webservice can use this resource (pop a message) simultaneously. However, I have a special function that purges the queue and refills it with messages from the DB.

My challenge is locking the object only with respect to this special task but at the same time allowing normal tasks to use it concurrently.


Answer:

Your objectives sound exactly like the use case of a ReadWriteLock, which allows only one writer, but arbitrary readers if there is no writer. The Special Task can take a write lock, all other tasks a read lock.

Question:

I wrote a code that guarantees the delivery of messages and their processing. But it works in one thread. How to refactor code so that it works in parallel threads or asynchronously? In this case, messages must be guaranteed to be delivered even if the application crashes. They will be delivered after a new start of the application or with the help of other running instances of this application.

Producer:

@Async("threadPoolTaskExecutor")
@EventListener(condition = "#event.queue")
public void start(GenericSpringEvent<RenderQueueObject> event) {
    RenderQueueObject renderQueueObject = event.getWhat();
    send(RENDER_NAME, renderQueueObject);
}
private void send(String routingKey, Object queue) {
    try {
        log.info("SEND message");
        rabbitTemplate.convertAndSend(routingKey, objectMapper.writeValueAsString(queue));
    } catch (JsonProcessingException e) {
        log.warn("Can't send event!", e);
    }
}

Consumer

@Slf4j
@RequiredArgsConstructor
@Service
public class RenderRabbitEventListener extends RabbitEventListener {

    private final ApplicationEventPublisher eventPublisher;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
                                             exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
                                             key = "render.#")
    )
    public void onMessage(Message message, Channel channel) {
        String routingKey = parseRoutingKey(message);

        log.debug(String.format("Event %s", routingKey));

        RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
        handleMessage(queueObject);
    }
    public void handleMessage(RenderQueueObject render) {
        GenericSpringEvent<RenderQueueObject> springEvent = new GenericSpringEvent<>(render);
        springEvent.setRender(true);
        eventPublisher.publishEvent(springEvent);
    }
}

public class Exchanges {
    public static final String EXC_RENDER_NAME = "render.exchange.topic";
    public static final TopicExchange EXC_RENDER = new TopicExchange(EXC_RENDER_NAME, true, false);
}

public class Queues {
    public static final String RENDER_NAME = "render.queue.topic";
    public static final Queue RENDER = new Queue(RENDER_NAME);
}

And so my message is processed. If I add @Async, then there will be parallel processing, but if the application crashes, then at a new start, messages will not be sent again.

@EventListener(condition = "#event.render")
public void startRender(GenericSpringEvent<RenderQueueObject> event) {
    RenderQueueObject render = event.getWhat();
    storageService.updateDocument(
            render.getGuid(),
            new Document("$set", new Document("dateStartRendering", new Date()).append("status", State.rendering.toString()))
    );
    Future<RenderWorkObject> submit = taskExecutor.submit(new RenderExecutor(render));
    try {
        completeResult(submit);
    } catch (IOException | ExecutionException | InterruptedException e) {
        log.info("Error when complete results after invoke executors");
    }
}

private void completeResult(Future<RenderWorkObject> renderFuture) throws IOException, ExecutionException, InterruptedException {
    RenderWorkObject renderWorkObject = renderFuture.get();
    State currentState = renderWorkObject.getState();
    if (Stream.of(result, error, cancel).anyMatch(isEqual(currentState))) {
       storageService.updateDocument(renderWorkObject.getGuidJob(), new Document("$set", toUpdate));
    }
}

I tried to customize the configuration to fit my needs. But it didn’t work:

@Bean
Queue queue() {
    return Queues.RENDER;
}

@Bean
TopicExchange exchange() {
    return Exchanges.EXC_RENDER;
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(Queues.RENDER_NAME);
}

@Bean
public RabbitTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    return template;
}
@Bean
public SimpleMessageListenerContainer container(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory, RabbitEventListener listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames(Queues.RENDER_NAME);
    container.setQueues(Queues.RENDER);
    container.setExposeListenerChannel(true);
    container.setMaxConcurrentConsumers(20);
    container.setConcurrentConsumers(10);
    container.setPrefetchCount(1000);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}
@Bean
public ConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses("127.0.0.1:5672");
    cf.setUsername("guest");
    cf.setPassword("guest");
    cf.setVirtualHost("/");
    cf.setPublisherConfirms(true);
    cf.setPublisherReturns(true);
    cf.setChannelCacheSize(25);

    ExecutorService es = Executors.newFixedThreadPool(20);
    cf.setExecutor(es);
    return cf;
}

I would be grateful for any idea


Answer:

I think I found a solution. I changed the RenderRabbitEventListener so that it again sent the message to the queue if the message was received from Rabbit in case of crash. Thanks to this, my consumer will always work in parallel. This will work in parallel in the event of a failure of all nodes, as well as in the event of a failure of one node.

Here are the changes I made:

@RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
        exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
        key = "render.#")
)
public void onMessage(Message message, Channel channel,
                      @Header(AmqpHeaders.DELIVERY_TAG) long tag
) {
    RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
    if (message.getMessageProperties().isRedelivered()) {
        log.info("Message Redelivered, try also");
        try {
            channel.basicAck(tag, false);
            MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
            String valueAsString = parseBody(message);
            Message copyMessage = messageConverter.toMessage(valueAsString, new MessageProperties());
            rabbitTemplate.convertAndSend(
                    message.getMessageProperties().getReceivedRoutingKey(),
                    copyMessage);
            return;
        } catch (IOException e) {
            log.info("basicAck exception");
        }
    }
    log.info("message not redelievered");
    String routingKey = parseRoutingKey(message);
    log.debug(String.format("Event %s", routingKey));
    handleMessage(queueObject);
}