Hot questions for Using RabbitMQ in client

Question:

I'm building a web chat with Spring Boot, RabbitMQ and WebSocket as a POC, but I'm stuck at the last point: WebSockets. I want my WS clients to connect to a specific endpoint, like /room/{id}, and when a new message arrives, I want the server to send the response to the clients. I searched for something similar but didn't find anything.

Currently, when the message arrives, I process it with RabbitMQ, like

container.setMessageListener(new MessageListenerAdapter(){
            @Override
            public void onMessage(org.springframework.amqp.core.Message message, Channel channel) throws Exception {
                log.info(message);
                log.info("Got: "+ new String(message.getBody()));
            }
        });

What I would like is, instead of logging it, to send it to the client, for example: websocketManager.sendMessage(new String(message.getBody()))


Answer:

OK, I think I got it. For everyone who needs it, here is the answer.

First, add the WebSocket dependencies to the pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
</dependency>

create a WS endpoint

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // the endpoint for websocket connections
        registry.addEndpoint("/stomp").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/");

        // use the /app prefix for others
        config.setApplicationDestinationPrefixes("/app");
    }

}

Note: I'm using STOMP, so the clients should connect like this

<script type="text/javascript">
    $(document).ready(function() {
        var messageList = $("#messages");
        // defined a connection to a new socket endpoint
        var socket = new SockJS('/stomp');
        var stompClient = Stomp.over(socket);
        stompClient.connect({ }, function(frame) {
            // subscribe to the /room.2 destination
            stompClient.subscribe("/room.2", function(data) {
                var message = data.body;
                messageList.append("<li>" + message + "</li>");
            });

        });
    });
</script>

Then, you can simply wire the ws messenger on your components with

@Autowired
private SimpMessagingTemplate webSocket;

and send the message with

webSocket.convertAndSend(channel, new String(message.getBody()));
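
Putting the two pieces together, the listener from the question only has to turn the message body into text and pick a destination. The following is a minimal sketch, not the author's code. It assumes one destination per room of the form /room.{id}, as in the subscription above. RoomForwarder and its sender callback are hypothetical names; the callback stands in for a call such as (dest, text) -> webSocket.convertAndSend(dest, text).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

// Hypothetical helper: forwards a RabbitMQ message body to the STOMP destination of a chat room.
final class RoomForwarder {

    // Stand-in for SimpMessagingTemplate.convertAndSend(destination, payload).
    private final BiConsumer<String, String> sender;

    RoomForwarder(BiConsumer<String, String> sender) {
        this.sender = sender;
    }

    // Assumed naming scheme: room "2" maps to the destination "/room.2".
    static String destinationFor(String roomId) {
        return "/room." + roomId;
    }

    // Decode the body explicitly as UTF-8 instead of relying on the platform default charset.
    void forward(String roomId, byte[] body) {
        sender.accept(destinationFor(roomId), new String(body, StandardCharsets.UTF_8));
    }
}
```

Inside onMessage you would then call something like forwarder.forward(roomId, message.getBody()), where roomId comes from wherever your messages carry it (a header or the routing key, for instance).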

Question:

I have a generic standalone JMS application which works with the following JMS providers: WebSphere, HornetQ and ActiveMQ. I pass Context.INITIAL_CONTEXT_FACTORY and Context.PROVIDER_URL as parameters to my application and create a naming context out of them by doing something like this:

Properties environmentParameters = new Properties();
environmentParameters.put(Context.INITIAL_CONTEXT_FACTORY, property.context);
environmentParameters.put(Context.PROVIDER_URL, property.provider);
namingContext = new InitialContext(environmentParameters);

And use this context for object lookup.

I understand RabbitMQ isn't a JMS provider, so it doesn't have an InitialContext class or a provider URL, but it provides a JMS client, which is an abstraction of its Java client conforming to the JMS specification. RabbitMQ's JMS client documentation has an example of defining objects in JNDI as a resource configuration as part of a web application. However, I couldn't quite figure out how to do something similar for my standalone application, which creates a naming context based on a JNDI provider, using the JMS client's dependencies, or how to create an InitialContext out of the available dependencies.

So can someone throw some light on how this can be done? Hope my question is clear.


Answer:

To get JMS working with RabbitMQ, you have to enable the rabbitmq_jms_topic_exchange plugin. You can download it by following the directions on this site (you'll need to log in): https://my.vmware.com/web/vmware/details?downloadGroup=VFRMQ_JMS_105&productId=349

  1. After extraction, put the file rjms-topic-selector-1.0.5.ez inside the folder $RABBITMQ_SERVER\plugins.
  2. Enable the plugin with the command: rabbitmq-plugins enable rabbitmq_jms_topic_exchange
  3. Check that the plugin is running with the command: rabbitmq-plugins list
  4. Restart RabbitMQ. I'm not sure if it's really necessary, but just in case ;-)
  5. In the RabbitMQ web management UI (http://localhost:15672/#/exchanges) you can check the new exchange that is now available.
  6. Now, in theory :-), you're able to connect to your RabbitMQ server using the standard Java JMS API.

Bear in mind that you'll have to create a .bindings file so that JNDI can find your registered objects. This is an example of its content:


    ConnectionFactory/ClassName=com.rabbitmq.jms.admin.RMQConnectionFactory
    ConnectionFactory/FactoryName=com.rabbitmq.jms.admin.RMQObjectFactory
    ConnectionFactory/RefAddr/0/Content=jms/ConnectionFactory
    ConnectionFactory/RefAddr/0/Type=name
    ConnectionFactory/RefAddr/0/Encoding=String
    ConnectionFactory/RefAddr/1/Content=javax.jms.ConnectionFactory
    ConnectionFactory/RefAddr/1/Type=type
    ConnectionFactory/RefAddr/1/Encoding=String
    ConnectionFactory/RefAddr/2/Content=com.rabbitmq.jms.admin.RMQObjectFactory
    ConnectionFactory/RefAddr/2/Type=factory
    ConnectionFactory/RefAddr/2/Encoding=String
    # Change this line accordingly if the broker is not at localhost
    ConnectionFactory/RefAddr/3/Content=localhost
    ConnectionFactory/RefAddr/3/Type=host
    ConnectionFactory/RefAddr/3/Encoding=String
    # HELLO Queue 
    HELLO/ClassName=com.rabbitmq.jms.admin.RMQDestination
    HELLO/FactoryName=com.rabbitmq.jms.admin.RMQObjectFactory
    HELLO/RefAddr/0/Content=jms/Queue
    HELLO/RefAddr/0/Type=name
    HELLO/RefAddr/0/Encoding=String
    HELLO/RefAddr/1/Content=javax.jms.Queue
    HELLO/RefAddr/1/Type=type
    HELLO/RefAddr/1/Encoding=String
    HELLO/RefAddr/2/Content=com.rabbitmq.jms.admin.RMQObjectFactory
    HELLO/RefAddr/2/Type=factory
    HELLO/RefAddr/2/Encoding=String
    HELLO/RefAddr/3/Content=HELLO
    HELLO/RefAddr/3/Type=destinationName
    HELLO/RefAddr/3/Encoding=String

And then... finally... the code:


    Properties environmentParameters = new Properties();
    environmentParameters.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");
    environmentParameters.put(Context.PROVIDER_URL, "file:/C:/rabbitmq-bindings");
    namingContext = new InitialContext(environmentParameters);

    ConnectionFactory connFactory = (ConnectionFactory) namingContext.lookup("ConnectionFactory");
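
Since the application in the question receives the context factory and provider URL as parameters, the RabbitMQ case can reuse the same path as the other providers. Here is a small sketch of that idea; JndiEnvironment is a hypothetical helper name, and the two values are the ones used in the snippet above.

```java
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper: builds the JNDI environment from two externally supplied values,
// so the same code path serves WebSphere, HornetQ, ActiveMQ and the file-based RabbitMQ setup.
final class JndiEnvironment {

    static Properties forProvider(String contextFactory, String providerUrl) {
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, contextFactory);
        env.put(Context.PROVIDER_URL, providerUrl);
        return env;
    }
}
```

For RabbitMQ you would pass "com.sun.jndi.fscontext.RefFSContextFactory" and the URL of the directory holding the .bindings file, then hand the result to new InitialContext(...). As far as I know, the file system context provider is not bundled with the JDK, so its jar has to be on the classpath.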

Question:

My application consumes some messages from RabbitMQ and processes them. I have about 10 queues and each queue has up to ten consumers (threads). I have a prefetch of 5. I'm running my setup in Heroku using the CloudAMQP plugin (RabbitMQ as a service).

I'm running with the default heartbeat and connection timeout settings (60 seconds).

My java application is a spring boot application using the spring-rabbit library.

Versions:

RabbitMQ 3.5.3 
Erlang 17.5.3
Java 1.8
Spring boot 1.3.2.RELEASE
Spring rabbit 1.5.3.RELEASE

The problem is that the consumers of one particular queue stop consuming messages after some time. When I restart my Java application everything works fine. The other queues are being consumed normally, though. There are no errors on the application's side. In the log stream on Rabbit's side I see some entries like

= REPORT==== 2016-08-02 15:53:32 UTC ===
closing AMQP connection <SOMETHING> (SOMETHING_ELSE -> SOMETHING_ELSE_ELSE):
{heartbeat_timeout,running}

I can't reproduce locally or in a testing environment in Heroku.

Update

The code below can be found in AMQConnection.class

int heartbeat = negotiatedMaxValue(this.requestedHeartbeat,
                                   connTune.getHeartbeat());


private static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0) ?
            Math.max(clientValue, serverValue) :
            Math.min(clientValue, serverValue);
}

I can't increase the value of the heartbeat above 60 seconds (which is what I'm getting from the server).
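
To make the effect of that rule concrete, here is a stand-alone copy of the negotiation logic quoted above that can be run with different values. The class name HeartbeatNegotiation is made up for this illustration; the method body is the one shown in the question.

```java
// Stand-alone copy of the negotiation rule quoted above, for experimenting with values.
final class HeartbeatNegotiation {

    // If either side passes 0, the other side's value is used; otherwise the smaller value wins.
    static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0)
                ? Math.max(clientValue, serverValue)
                : Math.min(clientValue, serverValue);
    }
}
```

With a server value of 60, a client request of 120 yields 60 and a request of 30 yields 30, which matches the observation that the client can lower the heartbeat but not raise it above the server's value.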


Answer:

Unfortunately, this seems like a networking issue. This could be due to a few things:

  • The CloudAMQP service is having some issues and is killing your connection (not likely since your other consumers are working OK).
  • Your CloudAMQP plan doesn't allow for as many concurrent connections as you want. Have you checked that your plan is high enough to support all of your consumers? https://elements.heroku.com/addons/cloudamqp
  • Your Heroku dyno with the consumer in question is restarting, which is dropping your connection. Heroku dynos periodically restart. If your dynos can't gracefully restart, you might want to investigate why.
  • One of your Heroku dynos is having network issues (in which case it will likely be restarted on its own without your intervention).

One way to force all of your dynos to restart is to run $ heroku ps:restart. This will force Heroku to restart your dynos, which frequently means moving them to a new EC2 host. This may help if it is a one-off issue.

Question:

I want to fetch several messages, handle them, and ack them all together after that. So basically I receive a message, put it in some queue and continue receiving messages from Rabbit. A different thread will monitor this queue of received messages and process them when the amount is sufficient. All I've been able to find about acks contains examples only for one message which is processed on the same thread. Like this (from the official docs):

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};

And also documentation says this:

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire.

So I'm confused here. If I'm acking some message and at the same time the channel is receiving another message from Rabbit, is that considered to be two operations at the same time? It seems to me that it is.

I've tried to acknowledge a message on the same channel from a different thread and it seems to work, but the documentation says that I should not share channels between threads. So I've tried to do the acknowledgment on a different thread with a different channel, but it fails, because the delivery tag is unknown to that channel.

Is it possible to acknowledge a message on a thread other than the one it was received on?

UPD Example piece of code of what I want. It's in scala, but I think it's straightforward.

 case class AmqpMessage(envelope: Envelope, msgBody: String)

    val queue = new ArrayBlockingQueue[AmqpMessage](100)

    val consumeChannel = connection.createChannel()
    consumeChannel.queueDeclare(queueName, true, false, true, null)
    consumeChannel.basicConsume(queueName, false, new DefaultConsumer(consumeChannel) {
      override def handleDelivery(consumerTag: String,
                                  envelope: Envelope,
                                  properties: BasicProperties,
                                  body: Array[Byte]): Unit = {
        queue.put(new AmqpMessage(envelope, new String(body)))
      }
    })

    Future {
      // this is different thread
      val channel = connection.createChannel()
      while (true) {
        try {
          val amqpMessage = queue.take()
          channel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // doesn't work
          consumeChannel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // works, but seems like not thread safe
        } catch {
          case e: Exception => e.printStackTrace()
        }
      }
    }

Answer:

Although the documentation is pretty restrictive, some operations on channels are safe to invoke concurrently. You may ack messages on a different thread as long as consuming and acking are the only actions you perform on the channel.

See this SO question, which deals with the same thing:

RabbitMQ and channels Java thread safety
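
For the batching part of the question, the processing thread can collect delivery tags and acknowledge a whole batch with a single call by passing multiple=true. The following is only a sketch under stated assumptions. BatchAcker and Acker are hypothetical names; Acker stands in for Channel.basicAck(long deliveryTag, boolean multiple) on the consuming channel. It also assumes that the consumer callback enqueues tags in delivery order and that this is the only code acking on that channel, so every tag below the highest one belongs to this or an earlier batch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper run by the processing thread.
final class BatchAcker {

    // Stand-in for Channel.basicAck(long deliveryTag, boolean multiple).
    interface Acker {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
    }

    // Drains up to batchSize pending delivery tags and acknowledges them with one call.
    // Returns the number of deliveries in the batch, or 0 if nothing was pending.
    static int ackBatch(BlockingQueue<Long> pendingTags, int batchSize, Acker acker) {
        List<Long> batch = new ArrayList<>();
        pendingTags.drainTo(batch, batchSize);
        if (batch.isEmpty()) {
            return 0;
        }
        // ... process the messages belonging to this batch here, before acknowledging ...
        long highestTag = Collections.max(batch);
        try {
            // multiple=true acknowledges every unacked delivery up to and including highestTag
            acker.basicAck(highestTag, true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return batch.size();
    }
}
```

In the Scala example from the question, the acker would be something like (tag, multiple) => consumeChannel.basicAck(tag, multiple), i.e. the same channel the deliveries arrived on, because delivery tags are scoped to a channel.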

Question:

I'm curious if there were no unexpected error, will the channel stay alive while waiting for the client to acknowledge a message (using channel.basicAck())? Does a channel have a timeout parameter?

For example, will this code be problematic if xxx is very large?:

@RabbitListener(queues = DURABLE_QUEUE)
  public void listenAddAndDelete(@Payload Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException {
    log.info("receive user msg: {}", message);
    // sleep for a very long time, then ack; does the channel have a timeout?
    Thread.sleep(xxx);
    try {
      channel.basicAck(tag,false);
    } catch (IOException e) {
      //
    }
  }

In addition, when will the channel be closed under normal circumstances?


Answer:

It generally won't timeout as long as heartbeats are enabled (which is the default), but keeping a message in that state for a long time is an anti-pattern, as I suggested in a comment to the answer referred to in the comment above.

Question:

Scenario

I'm writing a Java application that imports products into a database from several different sources and then exports the updated products to other systems like search engines and RSS feeds. Multiple imports, multiple exports. As it's crucial that some of the exporters get the updates ASAP, I'm letting them stay running and listen for updates via queues in a RabbitMQ instance. Some importers will process files in bulk (meaning there will be many updates in close proximity), some importers will get updates sporadically (a few updates an hour, like from an admin). Each importer will have an instance of UpdateNotifier.

Example

This is the (somewhat simplified) class for adding IDs of the updated products to the RabbitMQ exchange:

public class UpdateNotifier
{
    private Connection conn;

    public UpdateNotifier(Connection alreadyOpenConnection)
    {
        conn = alreadyOpenConnection;
    }

    public void productIsUpdated(String id)
    {
        Channel chan = conn.createChannel();

        publishTheMessageToExchange(chan, id);

        chan.close();
    }
}

Question

Is it advisable to open a new channel for each publish and then close it, or would it be better to cache the Channel within each instance of the UpdateNotifier?

Maybe two different notifiers, one for bulk updates that keep a channel in the instance and one for sporadic updates that open and close the channel with each update?

So it boils down to: How costly is it to open and close Channels?


Answer:

Creating and destroying a channel is a very easy and fast operation for RabbitMQ.

But if you need high throughput, creating and destroying a channel for each publish can impact performance.

From my point of view, you don't need to cache channels etc.; just use one channel per thread and you are safe.

I'd suggest reading https://www.rabbitmq.com/production-checklist.html and https://www.rabbitmq.com/networking.html.

These links can help you tune RabbitMQ.
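
One way to get "one channel per thread" without opening a channel on every publish is a ThreadLocal that creates the channel lazily the first time a thread asks for it. This is just a sketch; PerThread is a hypothetical name and it is generic so that it does not depend on the RabbitMQ classes. In UpdateNotifier the factory would wrap conn.createChannel(), which throws a checked IOException and therefore needs a try/catch inside the lambda.

```java
import java.util.function.Supplier;

// Hypothetical holder: each thread lazily gets its own instance (e.g. its own Channel)
// and keeps reusing it on later calls.
final class PerThread<C> {

    private final ThreadLocal<C> local;

    PerThread(Supplier<C> factory) {
        this.local = ThreadLocal.withInitial(factory);
    }

    // Returns the calling thread's instance, creating it on first use.
    C get() {
        return local.get();
    }
}
```

productIsUpdated would then call publishTheMessageToExchange(channels.get(), id) and skip the close. The channels stay open until they are closed explicitly or the connection is closed, so this fits long-lived worker threads better than short-lived ones.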

Question:

I'd like a quick confirmation of what I suspect this part of the RabbitMQ documentation says:

Callbacks to Consumers are dispatched on a thread separate from the thread managed by the Connection. This means that Consumers can safely call blocking methods on the Connection or Channel, such as queueDeclare, txCommit, basicCancel or basicPublish.

Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

I have various commands (messages) coming in through a single inbound queue and channel which has a DefaultConsumer attached to it. Is it correct to assume that there is a threadpool in DefaultConsumer that lets me run application logic straight off the consumer callback method, and I'm not blocking the processing of later commands? And that if it seems like there's a bottleneck, I can just give RMQ a bigger threadpool?

In addition, occasionally there is a basicPublish to the same channel from other threads. I take it that this does hold up the consumers? I guess I should make use of a new channel when doing this?


Answer:

The thread pool you mentioned is not a part of DefaultConsumer but rather a part of the Connection, shared between its Channels and DefaultConsumers. It allows different consumers to be invoked in parallel. See this part of the guide.

So you would expect that by increasing the size of the thread pool you can reach a higher level of parallelism. However, that's not the only factor that influences it.

There's a big caveat: incoming messages flowing through a single channel are processed serially, no matter how many threads you have in the thread pool. That's just how ConsumerWorkService is implemented.

So to be able to consume incoming messages concurrently, you have to either manage multiple channels or hand those messages off to a separate thread pool.

Publishes do not use threads from the Connection's thread pool, so they do not hold up consumers.

For more details you may check this post.
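
The "separate thread pool" option can be sketched as follows: the consumer callback does nothing but hand the message body to an Executor, so the channel's dispatch thread is free again immediately. OffloadingHandler is a hypothetical name and the class deliberately avoids RabbitMQ types; you would call onDelivery from DefaultConsumer.handleDelivery.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical helper: moves application logic off the channel's dispatch thread.
final class OffloadingHandler {

    private final Executor workers;
    private final Consumer<byte[]> logic;

    OffloadingHandler(Executor workers, Consumer<byte[]> logic) {
        this.workers = workers;
        this.logic = logic;
    }

    // Call this from the consumer callback; it returns as soon as the task is handed over.
    void onDelivery(byte[] body) {
        workers.execute(() -> logic.accept(body));
    }
}
```

With a pool such as Executors.newFixedThreadPool(8) as the executor, commands from a single channel are processed in parallel, at the price of losing the processing order. If you use manual acknowledgements, the ack should happen after the work has finished rather than in the callback.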

Question:

I've been working on transactional services that are used as part of a core banking project. In many services, before inserting any records into my database, I have to run several validations on the records. The same is true for editing records that already exist in the database. But sometimes, in order to update a record in a specific table, we have to change records in other tables that are related to that table. Consequently, we need a user confirmation to change records in those other tables. The problem is that I don't know how to get a user confirmation on the client while a transaction is running on the server. Is it possible to tackle this problem by sending messages between client and server through RabbitMQ? I would appreciate it if anyone could explain a solution with a clear sample.


Answer:

If I had to implement this, I would do it another way (I do not know if this works for your scenario).

  • I would first let the user input their data,
  • then I would do a trial run and check which additional confirmations are needed. I would also save (in the user session) all relevant constraints that are determined during the trial run,
  • then I would ask the user for the additional confirmations determined during the trial run,
  • then I would do the real run, and use the saved constraint checks to be sure that nothing relevant has changed in the meantime.
  • (If a relevant change in the data between the trial run and the real run is detected, I would apologise and start the process again from step 2.)

But this only works if you do not have too many "meanwhile changes".

Question:

I am running a spring-boot application in ec2 on a c3.large machine. It initializes a spring-rabbit client, which starts up its own thread.

After profiling my application using YourKit, I see that a lot of time is spent inside the rabbit client thread, inside com.rabbitmq.client.impl.AMQConnection$MainLoop.run(), specifically down in java.io.DataInputStream.readUnsignedByte().

To me this looks like there is a while loop that continuously blocks on getting some input on a socket from the RabbitMQ server.

Has anyone run into this? Am I reading the profiling results correctly? Is there a way to make the amqp client be non-blocking?


Answer:

That code (com.rabbitmq.client) is in the underlying amqp-client (RabbitMQ Java client) code used by Spring AMQP.

"To me this looks like there is a while loop that continuously blocks on getting some input on a socket from the RabbitMQ server."

Yes, but when it blocks waiting for data, it does not use CPU - only when data is available does that method return. It's not spinning the cpu waiting for data.

Question:

I am currently experimenting with failure scenarios that might happen when communicating via the message broker RabbitMQ. The goal is to evaluate how such communication can be made more resilient.

In particular, I want to trigger a nack (not-acknowledge) confirm when sending messages in producer-commit mode. To do so, I send a message to a non-existent exchange via Spring AMQP's RabbitTemplate.send. In the callback provided via RabbitTemplate.setConfirmCallback, I then handle ack=false confirms by resending the message to an existing exchange (simulating that I took care of the nack cause).

A sample class and the related test are provided below, the complete sample project can be found in my github repository. I use RabbitMQ 3.6 and Spring Boot/AMQP 2.0.2.

When running the test, the callback is called with ack=false as expected. However, re-sending the message hangs while re-creating a channel (with a timeout exception after 10 minutes). A dump of the call stack and logs are provided below.

A solution to the problem seems to be to send the message in a different thread as proposed here. If you uncomment the line service.runInSeparateThread = true; in the test, things work!

However, I neither truly understand why things (don't) work, nor have I read about this practice anywhere except in the above-mentioned post. Is this expected behavior or a bug? Can someone explain the details?

Thanks a lot for your advice!

A call stack snapshot:

 "AMQP Connection 127.0.0.1:5672@3968" prio=5 tid=0xe nid=NA waiting
 java.lang.Thread.State: WAITING
  at java.lang.Object.wait(Object.java:-1)
  at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
  at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
  at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
  at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
  at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
  at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
  at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
  at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176)
  at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:542)
  at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:57)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1156)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1144)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:585)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:568)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:538)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:520)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94)
  at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771)
  at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:859)
  ...

The logs:

...
10:21:24.613 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin - declaring Exchange 'ExistentExchange'
10:21:24.630 [main] INFO com.example.rabbitmq.ProducerService - sending `initial Message`
10:21:24.648 [main] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate$MockitoMock$952329793@562c877a
10:21:24.648 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@3013909b Shared Rabbit Connection: SimpleConnection@12db3386 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 1341] to map, size now 1
10:21:24.649 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$175/1694519286 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@3013909b Shared Rabbit Connection: SimpleConnection@12db3386 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 1341]
10:21:24.649 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'[B@67001148(byte[15])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [nonExistentExchange], routingKey = [nonExistentQueue]
10:21:24.659 [main] INFO com.example.rabbitmq.ProducerService - done with sending message
10:21:24.675 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) PC:Nack:(close):1
10:21:24.677 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - Sending confirm PendingConfirm [correlationData=null cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'nonExistentExchange' in vhost '/', class-id=60, method-id=40)]
10:21:24.677 [AMQP Connection 127.0.0.1:5672] INFO com.example.rabbitmq.ProducerService - In confirm callback, ack=false, cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'nonExistentExchange' in vhost '/', class-id=60, method-id=40), correlationData=null
10:21:24.677 [AMQP Connection 127.0.0.1:5672] INFO com.example.rabbitmq.ProducerService - sending `resend Message`
10:21:24.678 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) PC:Nack:(close):1
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - AMQChannel(amqp://guest@127.0.0.1:5672/,1) No listener for seq:1
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) from map, size now 0
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) from map, size now 0
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PendingConfirms cleared 

ProducerService:

@Service
public class ProducerService {

    static final String EXISTENT_EXCHANGE = "ExistentExchange";
    private static final String NON_EXISTENT_EXCHANGE = "nonExistentExchange";
    private static final String QUEUE_NAME = "nonExistentQueue";
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final RabbitTemplate rabbitTemplate;
    private final Executor executor = Executors.newCachedThreadPool();
    boolean runInSeparateThread = false;

    public ProducerService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this::confirmCallback);
    }

    private void confirmCallback(CorrelationData correlationData, boolean ack, String cause) {
        logger.info("In confirm callback, ack={}, cause={}, correlationData={}", ack, cause, correlationData);
        if (!ack) {
            if (runInSeparateThread) {
                executor.execute(() -> sendMessage("resend Message", EXISTENT_EXCHANGE));
            } else {
                sendMessage("resend Message", EXISTENT_EXCHANGE);
            }
        } else {
            logger.info("sending was acknowledged");
        }
    }

    public void produceMessage() {
        sendMessage("initial Message", NON_EXISTENT_EXCHANGE);
    }

    private void sendMessage(String messageBody, String exchangeName) {
        logger.info("sending `{}`", messageBody);
        rabbitTemplate.send(exchangeName, QUEUE_NAME, new Message(messageBody.getBytes(), new MessageProperties()));
        logger.info("done with sending message");
    }

}

ProducerServiceTest:

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {RabbitAutoConfiguration.class, ProducerService.class})
@DirtiesContext
public class ProducerServiceTest {

    @Autowired
    private ProducerService service;
    @SpyBean
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private AmqpAdmin amqpAdmin;
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Before
    public void setup() {
        cachingConnectionFactory.setPublisherConfirms(true);
        amqpAdmin.declareExchange(new DirectExchange(ProducerService.EXISTENT_EXCHANGE));
    }

    @After
    public void cleanup() {
        amqpAdmin.deleteExchange(ProducerService.EXISTENT_EXCHANGE);
    }

    @Test
    public void sendMessageToNonexistentExchange() throws InterruptedException {
        final CountDownLatch sentMessagesLatch = new CountDownLatch(2);
        final List<Message> sentMessages = new ArrayList<>();
        doAnswer(invocation -> {
            invocation.callRealMethod();
            sentMessages.add(invocation.getArgument(2));
            sentMessagesLatch.countDown();
            return null;
        }).when(rabbitTemplate).send(anyString(), anyString(), any(Message.class));

//        service.runInSeparateThread = true;
        service.produceMessage();
        sentMessagesLatch.await();

        List<String> messageBodies = sentMessages.stream().map(message -> new String(message.getBody())).collect(toList());
        assertThat(messageBodies, equalTo(Arrays.asList("initial Message", "resend Message")));
    }

}

Answer:

It could be considered a bug, I suppose, but it's an artifact of the way we cache channels to improve performance. The problem is that attempting to publish on a channel on the same thread that's delivering an ack for the same channel causes a deadlock in the client library.

We have an open issue to look into a solution (for a different reason); we just haven't gotten around to it. AFAIK, you are only the second user to hit this in more than 6 years since we added support for confirms and returns.

EDIT

Actually, this is a different situation; it's not reusing the channel since the channel is closed. It is trying to create a new channel and that is what is deadlocked. I don't see how we (Spring AMQP) can do anything; it's a limitation of the java client; you cannot perform operations on the ack thread.

Question:

I use Java's rabbitmq-client (https://mvnrepository.com/artifact/com.rabbitmq/amqp-client) and I need to implement the following scenario:

  • While receiving RabbitMQ messages, I may need to pause consumption from particular queues if I suspect that all awaiting data will not fit in memory.
  • After I have processed some messages, I need to resume consumption for the next set of messages.
  • Repeat as needed.

What would be the best way to implement pause/resume of listening from a RabbitMQ queue using the amqp-client Java library?


Answer:

You don't mention which method you're using to consume messages, so I assume you are using basicConsume to subscribe to messages from a queue.

As that document mentions, you can use basicCancel to stop consuming from a queue. You would then use basicConsume when you wish to start again.

Be sure to use basicQos to set a reasonable prefetch count.
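A minimal sketch of the bookkeeping this implies, assuming you keep the consumer tag returned by basicConsume so it can be passed to basicCancel later. PausableSubscription and its two small interfaces are hypothetical names; they only stand in for the real Channel calls so that the sketch compiles without the client library.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class PausableSubscription {

    /** Stand-in for a call such as channel.basicConsume(queue, false, consumer). */
    public interface Subscribe {
        String start() throws IOException;
    }

    /** Stand-in for channel.basicCancel(consumerTag). */
    public interface Cancel {
        void stop(String consumerTag) throws IOException;
    }

    private final Subscribe subscribe;
    private final Cancel cancel;
    private String consumerTag; // null while paused

    public PausableSubscription(Subscribe subscribe, Cancel cancel) {
        this.subscribe = subscribe;
        this.cancel = cancel;
    }

    /** Starts consuming if currently paused; does nothing otherwise. */
    public synchronized void resume() {
        if (consumerTag != null) {
            return;
        }
        try {
            consumerTag = subscribe.start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Cancels the current consumer if there is one; does nothing otherwise. */
    public synchronized void pause() {
        if (consumerTag == null) {
            return;
        }
        try {
            cancel.stop(consumerTag);
            consumerTag = null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public synchronized boolean isConsuming() {
        return consumerTag != null;
    }
}
```

With the real client, the two arguments would be lambdas around channel.basicConsume(...) and channel.basicCancel(...); calling pause() when memory is tight and resume() after a batch has been processed gives the repeatable cycle described in the question.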

Question:

I'm listening for connection changes through the event exchange plugin ("amq.rabbitmq.event", "connection.#").

It works properly, so on the Java side I'm adding two additional parameters as client properties, to get the identity of the user that connects or disconnects.

However, on the C# side I can only access these properties as a list of byte[], and I'm not sure how to convert it to a Dictionary or similar.

If I print all entries

if (args.BasicProperties.Headers.TryGetValue("client_properties", out object value))
{
    var items = value as List<object>;
    foreach (var item in items)
    {
        Console.WriteLine($"{item.GetType().ToString()}");
        var bytes = item as byte[];
        result.Add(Encoding.UTF8.GetString(bytes));
    }
}

I can see this:

{<<"platform">>,longstr,<<"Java">>}
{<<"capabilities">>,table,[{<<"connection.blocked">>,bool,true},{<<"basic.nack">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"authentication_failure_close">>,bool,true},{<<"publisher_confirms">>,bool,true},{<<"consumer_cancel_notify">>,bool,true}]}
{<<"groupId">>,longstr,<<"1e6e935f0d4d9ec446d67dadc85cbafd10d1a095">>}
{<<"information">>,longstr,<<"Licensed under the MPL. See http://www.rabbitmq.com/">>}
{<<"version">>,longstr,<<"4.8.1">>}
{<<"copyright">>,longstr,<<"Copyright (c) 2007-2018 Pivotal Software, Inc.">>}
{<<"product">>,longstr,<<"RabbitMQ">>}

What kind of format is this, and how can I parse it?

{<<id>>,type,<<value>>}

Answer:

Apparently (according to an answer I got on the RabbitMQ client Google group for this question), client_properties is not intended to be read by the receiving party.

However, it is a really good way to get something like LWT (Last Will and Testament), so for now I am using it and doing the parsing myself.

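if (args.BasicProperties.Headers.TryGetValue("client_properties", out object value))
{
    var items = value as List<object>;
    foreach (var item in items)
    {
        var bytes = item as byte[];
        // Each entry looks like {<<"id">>,type,<<"value">>}
        string itemStr = Encoding.UTF8.GetString(bytes);
        // Split into at most 3 parts so commas inside the value are preserved
        var parts = itemStr.Split(new[] { ',' }, 3);
        if (parts.Length < 3) continue;
        var key = CleanErlangString(parts[0]);
        // Named propValue because "value" is already declared by the out parameter above
        var propValue = CleanErlangString(parts[2]);

        // Do things with key/propValue

    }
}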

The CleanErlangString helper:

private static string CleanErlangString(string toClean)
{
    return toClean
        .Replace("{", "").Replace("}", "")
        .Replace("\"", "")
        .Replace("<<", "").Replace(">>", "");
}

To use it as an LWT, I set a custom property on the client side and then read it back while processing events from "amq.rabbitmq.event", "connection.#". With that I know who has disconnected and can even process something like an LWT on my core server.
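For readers on the Java side, here is a minimal sketch of the same parsing idea, using a regular expression rather than string splitting so that commas inside a value (as in the copyright entry above) are kept. ClientPropertyParser is a hypothetical name, and the sketch assumes only longstr entries are of interest; anything else, such as the capabilities table, is skipped.

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ClientPropertyParser {

    // Matches {<<"key">>,longstr,<<"value">>}. The value group is greedy,
    // so it may itself contain commas or quotes.
    private static final Pattern LONGSTR =
            Pattern.compile("\\{<<\"([^\"]*)\">>,longstr,<<\"(.*)\">>\\}", Pattern.DOTALL);

    /** Returns the key/value pair of a longstr entry, or empty for any other entry. */
    public static Optional<Map.Entry<String, String>> parse(String item) {
        Matcher m = LONGSTR.matcher(item.trim());
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(Map.entry(m.group(1), m.group(2)));
    }
}
```

Each decoded byte[] entry would be passed to parse as a string, and a custom property such as groupId can then be looked up by its key.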

I hope this helps someone :)

Question:

Here is the error I get when I run my main. I do not really understand why it is having an issue with line 44: channel.basicConsume(Q,true,consumer); My goal here is to store the messages received in a variable that I can use in other files.

The error is: Exception in thread "main" java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1255)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:471)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:461)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:456)
at Recv.recv(Recv.java:44)
at mainLaptop.main(mainLaptop.java:11)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'Leonardo' in vhost '/', class-id=60, method-id=20)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1253)
... 5 more

Here is my code for the Recv file

public class Recv
{
    public static String recv(String ip, String Q) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(ip);
        factory.setUsername("test");
        factory.setPassword("test");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        MyConsumer consumer = new MyConsumer(channel);
        channel.basicConsume(Q, true, consumer);

        return consumer.getStoredMessage();
    }

    public static class MyConsumer extends DefaultConsumer
    {
        private String storedMessage;

        public MyConsumer(Channel channel)
        {
            super(channel);
        }

        public String getStoredMessage()
        {
            return storedMessage;
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
            throws IOException
        {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            storedMessage = message; // store message here
        }
    }
}

Answer:

Many exceptions contain helpful pieces of information to tell you what is wrong, and what you can do to solve them.

In this case, your question is "I do not really understand why it is having an issue with line 44: channel.basicConsume(Q,true,consumer);"

Even though the stack trace is ugly, you need to read it, because the exception contains the following text:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'Leonardo' in vhost '/', class-id=60, method-id=20)

The error is very clear: no queue 'Leonardo' in vhost '/'. The queue whose name is passed in as the string Q has not been declared on your RabbitMQ server, and attempting to consume from a queue that doesn't exist raises an exception. Declare the queue first (for example with channel.queueDeclare), and your problem will go away.

Question:

I'm studying RabbitMQ and I was wondering how to return the data it processes to the client (website user).

The process I have in mind consists of a REST endpoint where the user requests something to be processed, which takes about 5 seconds to finish. The user sends the request, and the web service forwards it to a RabbitMQ queue that has up to 10 consumers in Docker containers listening and ready to process it. So far so good, I can do this. The question is, how can I return the data to the user after the consumer has finished processing it? The image below shows the design I have in mind:

So in other words, it'll be something like:

1 - The producer (REST) receives the request and sends the message to RabbitMQ.

2 - RabbitMQ forwards the message to any consumer that is listening.

3 - The consumer processes the data.

4 - This is where I'm lost. How can I return the generated data to the client (website user)? One more thing: the user will be waiting for the request to finish, so there is no need to send it later.

For more detail, I'm using Java, and the REST part is Spring Boot.

Image:


Answer:

See Request Reply Messaging.

Use one of the RabbitTemplate's sendAndReceive() methods.

On the consumer side simply return a result from your @RabbitListener method.

@RabbitListener(queues = "foo")
public String process(String in) {
    return in.toUpperCase();
}

EDIT

@SpringBootApplication
public class So56025184Application {

    public static void main(String[] args) {
        SpringApplication.run(So56025184Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            Scanner scanner = new Scanner(System.in);
            String toSend = scanner.nextLine();
            while (!"quit".equals(toSend)) {
                System.out.println(template.convertSendAndReceive("q1", toSend));
                toSend = scanner.nextLine();
            }
            scanner.close();
        };
    }

    @RabbitListener(queues = "q1")
    public String listen(String in) {
        return in.toUpperCase();
    }

    @Bean
    public Queue queue() { // RabbitAdmin will add this to the broker
        return new Queue("q1");
    }

}

You can send/receive rich objects (rather than simple strings) using Java serialization (with the default SimpleMessageConverter), or conversion to JSON using the Jackson2JsonMessageConverter.

Question:

I want to know what happens when we receive an ACK. Are ACKs received on a single thread or on many threads? Are the handleAck and handleNack methods called by a single thread or by many? If they are called by a single thread, that is fine. But if they are called by several threads, we have to write our code in a thread-safe manner.


Answer:

You shouldn't need to make your ConfirmListener code thread-safe. That is not because the ack and nack methods are never called from multiple threads, but because you shouldn't share a Channel between threads to begin with.

The documentation specifically calls this out:

While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire. Sharing channels between threads will also interfere with Publisher Confirms.

When you are publishing to the broker just don't share the Channel. Channels are lightweight and not that expensive to create. That way you don't need to worry about the confirms either.

If you do share the Channel your confirms will be interfered with as per the above quote.

Question:

Using com.rabbitmq.client with AMQP version 0.9.1 I am doing the following to declare a durable headers exchange, declare a durable queue, and bind the queue to the exchange with headers.

channel.exchangeDeclare("myExchange", "headers", true);
channel.queueDeclare("myQueue", true, false, false, null);

Map<String, Object> bindingArgs = new HashMap<String, Object>();
bindingArgs.put("x-match", "any"); //any or all
bindingArgs.put("headerName1", "headerValue1");

channel.queueBind("myQueue", "myExchange", "", bindingArgs);

If I run the same code again, but with a different header name/value I am effectively adding another header to the queue on the broker (not replacing the previous one).

i.e.

...
bindingArgs.put("headerName2", "headerValue2");
...

Is there a way with the java rabbitmq client to get all of the bound headers for a queue from the broker?

This would return something like:

"headerName1" : "headerValue1"
"headerName2" : "headerValue2"

Answer:

This question is a duplicate of

List bindings for an exchange with rabbitmq java client API.

While this functionality doesn't appear to be in the java client, it is possible to view bindings (including header arguments) via command line on the broker.

rabbitmqctl list_bindings

See the RabbitMQ documentation for more options https://www.rabbitmq.com/rabbitmqctl.8.html#list_bindings

Question:

I have simple producer and receiver classes that work with RabbitMQ.

Producer:

public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }

And receiver :

public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }

The question is: is it possible for the Producer to know whether the Receiver took the message? For example:

 channel.sendAndReceiveMessage(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

Answer:

Short answer: there is no direct way for the producer to know that the consumer acked the message.

Long answer: the second parameter in channel.basicConsume(queueName, true, consumer); is the auto-ack flag. Setting it to false forces you to call channel.basicAck(deliveryTag, false); to acknowledge receipt of the message (you can get the delivery tag from the envelope), or channel.basicNack to signal that something failed.

What does that mean? If the message is never acked, for example because the consumer's channel closes first or the message is nacked with requeue set, the broker returns it to the queue, where it waits to be consumed again. This way you can ensure that all messages are consumed.

Question:

I'm attempting to use the Java STOMP client library Stampy to connect to RabbitMQ-Web-STOMP.

The Stampy examples have two reference implementations - Mina and Netty.

Is either of these suitable for use with RabbitMQ, or do I have to implement another RI for RabbitMQ?


Answer:

As @marko-topolnik commented, a reference implementation is required, but there are two choices. Both appear to function with my MQ server RabbitMQ. Even though they are different implementations, they behave identically in relation to the protocol.

I still don't understand why they can't be included with Stampy, though.