Hot questions for using RabbitMQ in Tomcat

Question:

I am creating a REST API that sends messages to RabbitMQ, and I am trying to understand the best practices for creating and closing channels. I am using the RabbitMQ Java client API.

Currently I have a class RabbitMQPublisherConnection into which Spring injects the RabbitMQ connection. This class is in turn injected into another class, RabbitMQPublisherChannel, which has the following function to create a channel:

public class RabbitMQPublisherChannel {

    public Channel createChannel(String amqpExchange,
                                 String exchangeType,
                                 String queue,
                                 String routingKey,
                                 boolean durableExchange,
                                 boolean durableQueue,
                                 boolean autoDelete,
                                 com.rabbitmq.client.Connection connection) throws IOException {
        Channel channel = connection.createChannel();

        if ((amqpExchange != null) && !"".equals(amqpExchange.trim())) {
            if (log.isLoggable(Level.FINEST)) {
                log.finest("exchange:" + amqpExchange + ", type: " + exchangeType + ", durableExchange: " + durableExchange);
            }
            channel.exchangeDeclare(amqpExchange, exchangeType, durableExchange);
            channel.queueDeclare(queue, durableQueue, false, autoDelete, null);
            channel.queueBind(queue, amqpExchange, routingKey);
        }
        return channel;
    }
}

Now I have a third class, RabbitMQPublisher, into which Spring injects the RabbitMQPublisherChannel class. My application context looks like this:

<bean id="rabbitMQPublisher" class="com.rabbitmq.RabbitMQPublisher">
    <property name="publisherChannel"     ref="rabbitMQPublisherChannel"/>
</bean>

<bean id="rabbitMQPublisherChannel" class="com.rabbitmq.RabbitMQPublisherChannel">
    <property name="publisherConnection"     ref="rabbitMQPublisherConnection"/>
</bean>

<bean id="rabbitMQPublisherConnection" class="com.rabbitmq.RabbitMQPublisherConnection">
    <property name="rabbitMQConnectionSettingMap">
        .. connection ..
    </property>
</bean>

The class RabbitMQPublisher has the function to publish a message to RabbitMQ:

public boolean publishMessage(String message, String queueName){

    try {
        // Validate queue name
        com.rabbitmq.client.Channel channel     = publisherChannel.getRabbitMQChannel(queueName);
        RabbitMQConnection          settings    = publisherChannel.getPublisherConnection().getRabbitMQConnectionSettingMap().get(queueName);

        if (channel != null) {
            channel.basicPublish(settings.getAmqpExchange(), settings.getAmqpRoutingKey(), null, message.getBytes());
            publisherChannel.closeChannel(channel);
        }
    } catch (AlreadyClosedException e) {
        return FAILURE_RESPONSE;
    } catch (IOException e) {
        return FAILURE_RESPONSE;
    }
    return SUCCESS_RESPONSE;
}

This application runs in Tomcat, and I noticed with AppDynamics that closing the channel takes about 47% of the total time taken to publish a message. When I remove the call that closes the channel, I save that 47%, which is about 32 ms, but then the RabbitMQ management console shows the number of channels on that connection growing continuously.

So my questions are:

  1. Is it good practice to open and close a channel for every publish, assuming that Tomcat will receive multiple requests per second?
  2. Is it good practice to have a channel pool shared among multiple threads? RabbitMQ recommends this but also says "Even so, applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads." Does this mean creating a new channel for every thread?
  3. Would it be good practice to not close the channel and instead clean up idle channels through the RabbitMQ HTTP API? (Please don't recommend this.)
  4. Is it worth saving 32 ms?

Thanks


Answer:

Since you are a Spring Framework user, consider using Spring AMQP. Its RabbitTemplate uses cached channels over a single connection; a channel is checked out of the cache for each operation and returned afterwards. The default cache size is 1 (it was raised to 25 in Spring AMQP 1.6 and later), so on older versions it generally needs to be increased for an environment like yours.
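To make the check-out/return idea concrete, here is a minimal sketch of a channel cache using only the JDK. It is not Spring AMQP's implementation: ChannelCache, its methods, and the type parameter C (standing in for com.rabbitmq.client.Channel) are all hypothetical names chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Hypothetical channel cache: a channel is checked out for one operation
 * and returned afterwards instead of being closed.
 * C stands in for com.rabbitmq.client.Channel.
 */
final class ChannelCache<C> {
    private final BlockingQueue<C> idle;      // channels waiting to be reused
    private final Supplier<C> opener;         // e.g. () -> connection.createChannel()
    private final Consumer<C> closer;         // e.g. channel -> channel.close()
    private final AtomicInteger opened = new AtomicInteger();

    ChannelCache(int cacheSize, Supplier<C> opener, Consumer<C> closer) {
        this.idle = new ArrayBlockingQueue<>(cacheSize);
        this.opener = opener;
        this.closer = closer;
    }

    /** Runs one operation on a cached channel, opening a new one only if none is idle. */
    <R> R execute(Function<C, R> operation) {
        C channel = idle.poll();
        if (channel == null) {
            channel = opener.get();
            opened.incrementAndGet();
        }
        boolean healthy = false;
        try {
            R result = operation.apply(channel);
            healthy = true;
            return result;
        } finally {
            // Close the channel if the operation failed or the cache is already full;
            // otherwise it stays open for the next caller.
            if (!healthy || !idle.offer(channel)) {
                closer.accept(channel);
            }
        }
    }

    int openedCount() { return opened.get(); }

    int idleCount() { return idle.size(); }
}
```

With a cache like this, each request thread pays the channel-open cost only when the cache is empty, and a channel is never used by two threads at once because it is removed from the queue while in use. The cache size bounds how many idle channels stay open, which addresses the ever-growing channel count seen when channels are simply never closed.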

Question:

Here is the implementation I have.

Consume:

GetResponse resp = channel.basicGet(qName, false);

Acknowledge:

channel.basicAck(dTag, false);

or

channel.basicNack(dTag, false, true);

The problem I am facing is that if I use a different channel object for the acknowledgement, the acknowledgement does not happen. I can only make it work if both steps (consume and acknowledge) are done from the same JVM (Tomcat or any other app server).

So if my application runs in clustered mode (multiple application servers), I am in trouble.

Can anyone please help me acknowledge, from a different machine, a message that has already been consumed, i.e. using a different channel object?

Thanks!


Answer:

"Can anyone please help me acknowledge, from a different machine, a message that has already been consumed, i.e. using a different channel object?"

You can't.

An acknowledgement must be sent on the same channel that delivered the message, because delivery tags are scoped to a channel.
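The reason can be illustrated with a toy model of per-channel delivery tags. This is only a sketch using the JDK, not the RabbitMQ client API: ToyChannel and its methods are hypothetical, and a real broker reacts to an unknown tag by closing the channel with a PRECONDITION_FAILED error rather than by throwing a Java exception.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of one channel's delivery-tag bookkeeping (hypothetical, not the RabbitMQ API). */
final class ToyChannel {
    private long nextTag = 1;                            // the counter is kept per channel
    private final Set<Long> unacked = new HashSet<>();   // tags delivered but not yet acknowledged

    /** Simulates a delivery in manual-ack mode and returns its delivery tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Acknowledges a tag; fails if this channel never delivered it or it was already acked. */
    void ack(long deliveryTag) {
        if (!unacked.remove(deliveryTag)) {
            throw new IllegalStateException("unknown delivery tag " + deliveryTag);
        }
    }

    int unackedCount() { return unacked.size(); }
}
```

A tag obtained from one ToyChannel means nothing to another instance, which mirrors why a second application server cannot acknowledge a message fetched by the first. In a cluster, the node that called basicGet has to keep that channel open and send the basicAck or basicNack itself.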

Question:

I'm setting up a DirectMessageListenerContainer for my messaging application. I am working with Tomcat, Spring, and AMQP.

The docs have this warning: https://docs.spring.io/spring-amqp/reference/#threading

"With the DirectMessageListenerContainer, you need to ensure that the connection factory is configured with a task executor that has sufficient threads to support your desired concurrency across all listener containers that use that factory. The default pool size is only five."

In my application, I haven't configured any executor, so I am using the default. I have not done so because I am running on Tomcat, which is set to 10000 max threads. Am I going to end up bottlenecked with a pool size of five?


Answer:

The listener threads have nothing to do with Tomcat threads. I just looked at the latest amqp-client (the RabbitMQ Java client that Spring AMQP uses) and the default number of threads is now:

DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;

Whether that is enough depends entirely on your application: how long it takes to process a message and how much concurrency you configure (consumersPerQueue).
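As a rough way to reason about the sizing, the sketch below compares the client default with the number of consumers that could be busy at once. It uses only the JDK; ConsumerPoolSizing and its methods are hypothetical helpers, and treating "queues times consumersPerQueue, summed over all containers" as the required thread count is an assumption about the worst case, not a rule from the Spring AMQP documentation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper for estimating the consumer thread pool size. */
final class ConsumerPoolSizing {

    /** Worst case: every consumer of every container is handling a message at the same time. */
    static int requiredThreads(int[] queuesPerContainer, int consumersPerQueue) {
        int total = 0;
        for (int queues : queuesPerContainer) {
            total += queues * consumersPerQueue;
        }
        return total;
    }

    /** The amqp-client default quoted above. */
    static int clientDefaultThreads() {
        return Runtime.getRuntime().availableProcessors() * 2;
    }

    /** Builds a fixed pool that could be handed to the connection factory as its executor. */
    static ExecutorService newConsumerExecutor(int threads) {
        return Executors.newFixedThreadPool(threads);
    }
}
```

For example, two containers listening on 2 and 3 queues with consumersPerQueue set to 4 give requiredThreads(new int[] {2, 3}, 4) == 20. If that exceeds clientDefaultThreads() on your machine, an executor built with newConsumerExecutor(20) could be supplied through the connection factory's setExecutor method.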