Hot questions for using RabbitMQ in ActiveMQ

Question:

Hello, I am writing a simple test scenario in which I execute the following code:

Here is my send() method:

public void send() throws JMSException {

    Session session = null;
    MessageProducer producer = null;

    try {
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue("TEST.FOO");

        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        byte[] uselessData = new byte[1024];

        BytesMessage message = session.createBytesMessage();
        message.writeBytes(uselessData);

        producer.send(message);

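    } finally {
        // Guard against a NullPointerException if creation failed part-way
        if (producer != null) {
            producer.close();
        }
        if (session != null) {
            session.close();
        }
    }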
}

Here is my receive() method:

public void receive() throws JMSException {

    Session session = null;
    MessageConsumer consumer = null;

    try {
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("TEST.FOO");
        consumer = session.createConsumer(destination);

        Message hugeMessage = consumer.receiveNoWait();

        if (hugeMessage == null) {
            System.out.println("Message was not received");
            unsucsesfullCount++;
        } else {
            if (hugeMessage instanceof BytesMessage) {
                System.out.println("Message received");
            }
        }
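    } finally {
        // Guard against a NullPointerException if creation failed part-way
        if (consumer != null) {
            consumer.close();
        }
        if (session != null) {
            session.close();
        }
    }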

}

I execute:

send();
receive();

The value returned by receiveNoWait() is always null.

My question is: does receiveNoWait() guarantee message delivery when there are messages in the broker? send() completes successfully, so there is at least one message in the destination.

I've searched the specification, but it does not clearly define whether a message that is available on the broker side must be returned by receiveNoWait() on the client side.

I also want to ask: if receiveNoWait() has no message available, should it trigger some kind of consumer refresh in the broker, so that the next receiveNoWait() call receives the message?

The example code I provided runs on ActiveMQ, but my question is more conceptual than provider-specific, because I observe the same behavior with other JMS providers.


Answer:

No, the specification does not guarantee that any call to receiveNoWait will return a message; it might, and then again it might not. When using receiveNoWait you must always check for a null return and act accordingly.

In the case of ActiveMQ, the client returns a message only if the broker has already dispatched one to it and it is immediately available in the consumer's prefetch buffer; otherwise it just returns null.

Other implementations may indeed send a poll request to the broker. Qpid JMS, for instance, uses an AMQP link drain request to ask the broker to send any messages that are available for dispatch; the broker either sends them or signals that the link is drained and no messages are ready.

In short, it's completely up to the client and broker how they implement receiveNoWait, but no matter what, you always need to account for the chance that the method returns no message.
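
One way to act on the null return is to retry the non-blocking call a bounded number of times. The following is a minimal sketch, not part of the original answer: PollingSketch, Poll, and pollWithRetry are hypothetical names, and the Poll interface merely stands in for a call such as consumer::receiveNoWait so that the example compiles without a JMS provider on the classpath.

```java
import java.util.Optional;

/** Hypothetical helper: retries a non-blocking poll a bounded number of times. */
final class PollingSketch {

    /** Stand-in for a call such as consumer::receiveNoWait, which may return null. */
    @FunctionalInterface
    interface Poll<T> {
        T poll() throws Exception;
    }

    /**
     * Calls source.poll() up to maxAttempts times, pausing between attempts,
     * and returns the first non-null result, or Optional.empty() if none arrived.
     */
    static <T> Optional<T> pollWithRetry(Poll<T> source, int maxAttempts, long pauseMillis)
            throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T message = source.poll();
            if (message != null) {
                return Optional.of(message);
            }
            if (attempt < maxAttempts) {
                Thread.sleep(pauseMillis); // give the broker time to dispatch
            }
        }
        return Optional.empty();
    }
}
```

With a real consumer this could be called as PollingSketch.pollWithRetry(consumer::receiveNoWait, 10, 50). JMS also offers consumer.receive(timeoutMillis), which blocks for up to the given time and still returns null if nothing arrives, so the null check remains necessary either way.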

Question:

I have been trying to develop a project that uses both ActiveMQ and RabbitMQ at the same time. The dependencies I added to pom.xml are listed below:

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.4.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>4.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-broker</artifactId>
        <version>5.13.0</version>
    </dependency>

I also run apache-activemq-5.13.0 and rabbitmq-server-3.5.6 at the same time. Unfortunately, I get the following AMQP-related error:

java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:350)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:648)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:678)
    at org.hafiz.framework.common.rabbit.filter.ReceiveRabbitMessageFilter.init(ReceiveRabbitMessageFilter.java:33)
    at org.hafiz.common.filter.PrmTarrifTypeMessageFilter.init(PrmTarrifTypeMessageFilter.java:21)
    at org.apache.catalina.core.ApplicationFilterConfig.initFilter(ApplicationFilterConfig.java:279)
    at org.apache.catalina.core.ApplicationFilterConfig.getFilter(ApplicationFilterConfig.java:260)
    at org.apache.catalina.core.ApplicationFilterConfig.<init>(ApplicationFilterConfig.java:105)
    at org.apache.catalina.core.StandardContext.filterStart(StandardContext.java:4854)
    at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5546)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
    at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:901)
    at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:877)
    at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:652)
    at org.apache.catalina.startup.HostConfig.deployDirectory(HostConfig.java:1263)
    at org.apache.catalina.startup.HostConfig$DeployDirectory.run(HostConfig.java:1948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:367)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:293)
    ... 20 more
Caused by: com.rabbitmq.client.MalformedFrameException: AMQP protocol version mismatch; we are version 0-9-1, server sent signature 0,1,0,0
    at com.rabbitmq.client.impl.Frame.protocolVersionMismatch(Frame.java:174)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:111)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
    ... 1 more
java.lang.NullPointerException
    at org.hafiz.framework.common.rabbit.filter.ReceiveRabbitMessageFilter.receiveMessage(ReceiveRabbitMessageFilter.java:61)
    at org.hafiz.common.filter.PrmTarrifTypeMessageFilter$1.run(PrmTarrifTypeMessageFilter.java:29)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I would appreciate it if anyone could help me tackle this error.


Answer:

The solution is to change the port number of one of the two message brokers. Both of them listen for AMQP on the same port (5672), so you simply need to change the configuration of one of them.

For RabbitMQ, you can follow the instructions explained here; as @Kenney said, it is as simple as setting an environment variable.

Note: Do not forget to reinstall the RabbitMQ service when setting the variable.

On Windows, you can do this by running the commands below in the command prompt:

  1. cd into the sbin folder under RabbitMQ server installation directory and run rabbitmq-service.bat remove
  2. set RABBITMQ_NODE_PORT=xxxx
  3. rabbitmq-service.bat install

If that doesn't work, you can instead change ActiveMQ's default port for AMQP. To do this, edit the activemq.xml file in the conf folder of the ActiveMQ server installation directory. I hope this works for you.
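
Before and after changing the configuration, it may help to confirm which ports actually have a listener. The sketch below is an illustration only: PortCheckSketch and isListening are hypothetical names, and port 5673 is an arbitrary example of an alternative port, not a value taken from the answer. It uses a plain TCP connect, so it can tell that something is listening but not which broker it is.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether anything accepts TCP connections on a port. */
final class PortCheckSketch {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unreachable
        }
    }

    public static void main(String[] args) {
        // 5672 is the port named in the answer; 5673 is an assumed alternative.
        for (int port : new int[] {5672, 5673}) {
            System.out.println("port " + port + " listening: "
                    + isListening("127.0.0.1", port, 500));
        }
    }
}
```

If both brokers are meant to be running and only one of the two ports reports a listener, one broker probably failed to bind or both are still configured for the same port.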

Question:

I have a Java RabbitMQ producer application, and I want to send some messages to ActiveMQ on Amazon MQ (I've already created the broker). I followed the steps from this page.

I built the Java example for RabbitMQ. At first it seemed fine, but when I create the connection to my endpoint (with username and password): amqps://x-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-x.xx.xx-xxxx-x.amazonaws.com:5671

in this line:

factory.newConnection();

I get these errors:

Error in the internal library:

[AMQP Connection xx.xx.xx.xx:5671] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured
com.rabbitmq.client.MalformedFrameException: AMQP protocol version mismatch; we are version 0-9-1, server sent signature 3,1,0,0
    at com.rabbitmq.client.impl.Frame.protocolVersionMismatch(Frame.java:170)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:107)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:184)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:645)
    at java.lang.Thread.run(Thread.java:745)

Error I caught:

null com.rabbitmq.client.ShutdownSignalException: connection error

I understand the main error is the protocol version mismatch, but is there a way to fix it by changing the protocol version, or is that not possible?

I ask because the RabbitMQ client in the example uses AMQP 0-9-1.


Answer:

ActiveMQ (both 5.x "classic" and Artemis) supports AMQP 1.0. If the RabbitMQ client speaks AMQP 1.0 then it should work. If it doesn't, then it won't.

The documentation you cited doesn't indicate that the RabbitMQ client will work with ActiveMQ. It just demonstrates common uses of the RabbitMQ client and then provides alternate JMS examples. In the "Conclusion" section the article states:

In this post, I reviewed how to get started with an Amazon MQ broker and walked you through several code examples that explored the differences between RabbitMQ and Apache ActiveMQ client integrations. If you are considering migrating to Amazon MQ, these examples should help you understand the changes that might be required.

In other words, you'll have to change your application. You can't use the RabbitMQ client with ActiveMQ.
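
The signature in the error message can be read as the four bytes that follow the literal "AMQP" in the protocol header the server sent back. As a rough sketch (AmqpSignatureSketch and describe are hypothetical names, and the mapping covers only a few well-known header values), those bytes could be decoded like this:

```java
/** Hypothetical helper: names the protocol behind an AMQP header signature. */
final class AmqpSignatureSketch {

    /** Describes the four bytes that follow the literal "AMQP" in a protocol header. */
    static String describe(int b0, int b1, int b2, int b3) {
        // AMQP 0-9-1 header: "AMQP" 0 0 9 1 (what the RabbitMQ client sends)
        if (b0 == 0 && b1 == 0 && b2 == 9 && b3 == 1) {
            return "AMQP 0-9-1";
        }
        // AMQP 1.0 headers: "AMQP" <protocol-id> 1 0 0
        if (b1 == 1 && b2 == 0 && b3 == 0) {
            switch (b0) {
                case 0: return "AMQP 1.0";
                case 2: return "AMQP 1.0 (TLS layer)";
                case 3: return "AMQP 1.0 (SASL layer)";
                default: break;
            }
        }
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(describe(3, 1, 0, 0)); // signature from the question
        System.out.println(describe(0, 0, 9, 1)); // what the client itself speaks
    }
}
```

For the signature 3,1,0,0 in the question this reports the SASL layer of AMQP 1.0, which is consistent with a broker that speaks AMQP 1.0 rather than the 0-9-1 protocol the RabbitMQ client expects.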

Question:

I am designing a component (component A) of a notification application that integrates with a REST API (component B), which can accept a maximum of 10,000 concurrent requests. Each record is less than 1 KB and is stored in a NoSQL DB; component A reads the records and sends them to component B with as much concurrency as possible. I would like to send 10,000 requests concurrently from component A to component B. I'm planning to put either RabbitMQ or ActiveMQ between components A and B to control the thread count and scale the concurrent processing.

Component A:

    -read all documents (less than 1 KB each) from the NoSQL database
    -create a JMS message and push the data to rabbitmq or activemq
    -the 10,000 rabbitmq or activemq listener/consumer threads concurrently consume,
     transform the NoSQL object graph into the request needed by component B, call component B,
     and repeat until all messages are sent to component B

Component B:

    -can't handle more than 10,000 concurrent requests
    -won't support a bulk API

The data set can grow into the millions, and a million messages will need to be processed and sent from component A to component B as fast as possible. I'm looking into batching the messages together.

Any design ideas regarding scaling this would be greatly appreciated.


Answer:

It sounds like you're on the right path already.

RabbitMQ can handle 100,000+ messages per minute, so it shouldn't have any trouble keeping up with your 10K concurrent requests to component B.

As a message consumer, you can use the consumer prefetch setting to tell RabbitMQ how many messages the consumer can handle at one time.

Set the prefetch to 10,000 (or less) for the code that reads from RabbitMQ, transforms the object graph, and posts to your REST API. With that in place, you should be good to go.
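
The effect of a prefetch limit is that no more than a fixed number of messages are being worked on at once. The same cap can be illustrated on the consumer side with a semaphore. This is a minimal sketch under stated assumptions, not RabbitMQ code: BoundedDispatchSketch and dispatch are hypothetical names, and the Runnable stands in for "transform the message and call component B". With the RabbitMQ Java client, the prefetch itself is normally set through channel.basicQos(prefetchCount).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: caps the number of calls in flight, much like a prefetch limit. */
final class BoundedDispatchSketch {

    /**
     * Runs total tasks on a thread pool while never allowing more than limit in flight.
     * Returns the highest number of tasks observed running at the same time.
     */
    static int dispatch(int total, int limit, Runnable callComponentB)
            throws InterruptedException {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < total; i++) {
                permits.acquire(); // blocks once limit calls are already in flight
                pool.execute(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        callComponentB.run();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // lets the next message through
                    }
                });
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
        return peak.get();
    }
}
```

Calling dispatch(1_000_000, 10_000, task) would keep at most 10,000 calls to component B running at once. A cached thread pool creates one thread per in-flight call, so for a limit that large an asynchronous HTTP client or a smaller limit per consumer process may be more practical.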