Hot questions for Using RabbitMQ in python

Question:

I'm trying to get notifications from a RabbitMQ residing on a server. I've been told to use this code, which should print progress notifications. But when running the code and submitting a job to the queue, I'm not seeing anything. The code doesn't print anything:

import pika

rabbitMqHost = 'host'
rabbitMqUser = 'user'
rabbitMqPass = 'password'
exchangeName = 'ProgressNotification'


credentials = pika.PlainCredentials(rabbitMqUser, rabbitMqPass)

connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitMqHost, 5672, '/', credentials))
channel = connection.channel()

# channel.exchange_delete(exchange=exchangeName)
# exit(3)

channel.exchange_declare(exchange=exchangeName, exchange_type='fanout')

result = channel.queue_declare()
queue_name = result.method.queue

channel.queue_bind(exchange=exchangeName,
                   queue=queue_name)


def callback(ch, method, properties, body):
    print("> %r" % (body,))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Sorry, I'm very new to RabbitMQ. But is there another step or something missing?! Why it doesn't show anything?


Answer:

Your script works fine. I pushed a message to a queue called simple_queue using the exchange ProgressNotification and your script printed.

b'Hello World!'

I used this script, based on my own RabbitMQ library, but you can just use this pika example as a reference.

from amqpstorm import Connection
from amqpstorm import Message

with Connection('127.0.0.1', 'guest', 'guest') as connection:
    with connection.channel() as channel:
        # Declare the Queue, 'simple_queue'.
        channel.queue.declare('simple_queue')

        # Create the message.
        message = Message.create(channel, 'Hello World!')

        # Publish the message to a queue called, 'simple_queue'.
        message.publish('simple_queue', exchange='ProgressNotification')

In Java you would need to publish your message like this.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    private final static String QUEUE_NAME = "simple_queue";
    private final static String EXCHANGE_NAME = "ProgressNotification";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

Question:

I have rabbitmq setup on my machine and it has 3 different queues. One java code is listening to a queue and other queues are sending messages to python codes. Now python codes are working fine but java code seems to have a problem with AMQ. Following error is coming:

Exception in thread "main" com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:341)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:590)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:612)
    at com.elki.test.Worker.main(Worker.java:73)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:202)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:326)
    ... 3 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:532)
    at java.lang.Thread.run(Thread.java:744)

How come there could be an AuthenticationFailure with java but not with python. Any help appreciated.

CODE:

public static void main(String[] argv)
                          throws java.io.IOException,
                          java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        com.rabbitmq.client.Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          String message = new String(delivery.getBody());

          System.out.println(" [x] Received '" + message + "'");



          "do some work"
        System.out.println(" [x] Done" );

          int prefetchCount = 1;
          channel.basicQos(prefetchCount);
          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
      }

Answer:

I suspect it's because you haven't set the password nor the username on the ConnectionFactory object, and so it can't authenticate with RabbitMQ. (Perhaps your Python code is passing those in, and so therefore can authenticate.)

Try adding this code before calling factory.newConnection:

factory.setUsername(userName);
factory.setPassword(password);

replacing userName and password as needed for your code.

Question:

I'm sending messages from Java Spring Boot application to consumer which is Python application.

Everything works fine except when I enter command rabbitmqctl list_queues it shows that video_queue 0 which means there are no messages in the queue.

Consumer is receiving messages and doing some long process; so if I send multiple messages in a row there should be some messages that waiting on the queue. Am I right?

Producer:

@Component
public class VideoProducer {
    private Logger logger = LoggerFactory.getLogger(VideoProducer.class);
    private final static String BROKER_EXCHANGE_NAME = "video_exchange";
    private final static String ROUTING_KEY = "video_routing_key";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private VideoService videoService;

    @Autowired
    private Gson gson;

    public void produceVideo(VideoDTO video) {
            rabbitTemplate.convertAndSend(BROKER_EXCHANGE_NAME, ROUTING_KEY, gson.toJson(video));

        }
    }
}

Consumer

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channelConsumer = connection.channel()

# Video Consumer Settings
channelConsumer.exchange_declare(exchange='video_exchange',
                                 exchange_type='direct')
channelConsumer.queue_declare(queue="video_queue")
channelConsumer.queue_bind(queue="video_queue",
                           exchange="video_exchange",
                           routing_key="video_routing_key")


# Consumer Listener
def callback(ch, method, properties, body): 
   video_dto = eval(json.loads(body))  
   ##Something long process here
   print("Done..  ")    


channelConsumer.basic_consume(queue='video_queue',
                              auto_ack=True,
                              on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channelConsumer.start_consuming()

Where can I see the messages on the queue that I declared? Because although I know there are messages on the queue I can't see them with the above command.

I am also using RabbitMQ management portal.


Answer:

You can use RMQ management console to view the the messages and other information of RMQ. Consumers may consume more than one message and have them in the internal queue.To avoid this, set QOS to 1 and ack required to true. More on QOS : https://www.rabbitmq.com/consumer-prefetch.html

Question:

I am having trouble understanding what the advantage of using Celery is. I realize you can use Celery with Redis, RabbitMQ etc, but why wouldn't I just get the client for those message queue services directly rather than sitting Celery in front of it?


Answer:

The advantage of using Celery is that we mainly need to write the task processing code and handling of task delivery delivery to the task processors is taken care of by the Celery framework. Scaling out task processing is also easy by just running more Celery workers with higher concurrency (more of processing threads/processes). We don't even need to write code for submitting tasks to queues and consuming tasksfrom the queues. Also, it has built in facility to add/removing consumers for any of the task queues. The framework supports retry of tasks, failure handling, results accumulating etc. It has many many features which helps us to concentrate on implementing the task processing logic only.

Just for an analogy, implementing a map-reduce program to run on Hadoop is not a very complex task. If data is small, we can write a simple Python script to implement the map-reduce logic which will outperform a Hadoop map-reduce Job processing the same data. But when data is very huge, we have to divide the data across machines, we will need to run multiple processes across machines and co-ordinate their executions. The complexity lies in running multiple instances of mappers and then reducers tasks across multiple machines, collecting inputs and distributing the inputs to mappers, transferring the outputs of mappers to appropriate reducers, monitoring progress, relaunching failed tasks, detecting job completion etc. But because we have Hadoop, we don't need to care much about the underlying complexity of executing a distribute job. Same way Celery also helps us to concentrate mainly on task execution logic.