Hot questions for Using RabbitMQ in jms

Question:

I am continuously receiving data feed from third party and want to process around 100k messages in less than a minute. So thinking to implement a messaging queue through which I can offload the processing part and will push the message to a queue, where one worker out of 100 (or whatever number) picks the job and process it.

I've read about JMS and Redis based messaging, but I am not sure how to run multiple listeners. The single listener is already configured.


Answer:

Spring JMS allows you to specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple upper limit String, e.g. "10" (the lower limit will be 1 in this case).

The listener container will always hold on to the minimum number of consumers (setConcurrentConsumers(int)) and will slowly scale up to the maximum number of consumers

See:

[1] https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/config/DefaultJmsListenerContainerFactory.html#setConcurrency-java.lang.String-

[2] https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrency-java.lang.String-

An example in a Spring boot configuration bean:

@Configuration
@EnableJms
public class AppConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory 
          = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("3-10");
        return factory;
    }
}

Question:

I am new to RabbitMQ and trying to send a message to RabbitMQ using JMS. I have the below standard JMS producer program:

public void sendMessage() {
    Context context = null;
    ConnectionFactory factory = null;
    Destination destination = null;
    Connection connection = null;
    Session session = null;
    MessageProducer producer = null;

    Properties initialProperties = new Properties();
    initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY, "org.wildfly.naming.client.WildFlyInitialContextFactory");
    initialProperties.put(InitialContext.PROVIDER_URL, "http-remoting://IP:PORT");
    initialProperties.put(Context.SECURITY_PRINCIPAL, "mquser");
    initialProperties.put(Context.SECURITY_CREDENTIALS, "Mquser@123");
    try {
        context = new InitialContext(initialProperties);
        factory = (QueueConnectionFactory) context.lookup("jms/RemoteConnectionFactory");

        System.out.println("Lookup Success");
        destination = (Destination) context.lookup("jms/queue/TestQueue");
        System.out.println("Queue lookup success");
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(destination);

        String text = "8=FIX.4.49=7735=349=A56=B34=352=20200115-13:18:26.000 45=322222=D100208103222223=40558=Invalid FIX2ITFMsgType10=226";
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText(text);
        connection.start();
        System.out.println("Going to send");
        producer.send(textMessage);
        System.out.println(this.getClass().getName() + "has sent a message : " + text);
    } catch (NamingException e) {
        e.printStackTrace();
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        if (context != null) {
            try {
                context.close();
            } catch (NamingException ex) {
                ex.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}

Which I am using to send message to ActiveMQ Artemis embedded in JBoss EAP 7.2. As I have found in this link I will not be able to use the above program to send message to RabbitMQ, but will of the below parameters:

  1. Username
  2. Password
  3. VirtualHost
  4. Host
  5. Port
  6. DestinationName
  7. ExchangeName
  8. QueueName
  9. RoutingKey

Can anyone give me an example of a standard program for sending message to RabbitMQ. I am using Spring Boot and can also use its features.


Answer:

Spring Boot provides a RabbitTemplate that makes it easy to send messages:

@Component
public class Example {

  private final RabbitTemplate rabbitTemplate;

  public Example(RabbitTemplate rabbitTemplate) {
      this.rabbitTemplate = rabbitTemplate;
  }

  @Override
  public void run(String... args) throws Exception {
       rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
  }

Please checkout the tutorial https://spring.io/guides/gs/messaging-rabbitmq/

Please also read the Spring Boot documentation: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#boot-features-amqp

Question:

I have a list of objects that I put in Spring AMQP. Objects come from the controller. There is a service that processes these objects. And this service may crash with an OutOfMemoryException. Therefore, I run several instances of the application.

There is a problem: when the service crashes, I lose the received messages. I read about NACK. And could use it in case of Exception or RuntimeException. But my service crashes in Error. Therefore, I cannot send NACK. Is it possible to set a timeout in AMQP, after which I would be sent a message again if I had not confirmed the messages that had arrived earlier?

Here is the code I wrote:

public class Exchanges {
    public static final String EXC_RENDER_NAME = "render.exchange.topic";
    public static final TopicExchange EXC_RENDER = new TopicExchange(EXC_RENDER_NAME, true, false);
}

public class Queues {
    public static final String RENDER_NAME = "render.queue.topic";
    public static final Queue RENDER = new Queue(RENDER_NAME);
}

@RequiredArgsConstructor
@Service
public class RenderRabbitEventListener extends RabbitEventListener {
    private final ApplicationEventPublisher eventPublisher;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(Queues.RENDER_NAME),
                                             exchange = @Exchange(value = Exchanges.EXC_RENDER_NAME, type = "topic"),
                                             key = "render.#")
    )
    public void onMessage(Message message, Channel channel) {
        String routingKey = parseRoutingKey(message);
        log.debug(String.format("Event %s", routingKey));
        RenderQueueObject queueObject = parseRender(message, RenderQueueObject.class);
        handleMessage(queueObject);
    }
    public void handleMessage(RenderQueueObject render) {
        GenericSpringEvent<RenderQueueObject> springEvent = new GenericSpringEvent<>(render);
        springEvent.setRender(true);
        eventPublisher.publishEvent(springEvent);
    }
}

And this is the method that sends messages:

    @Async ("threadPoolTaskExecutor")
    @EventListener (condition = "# event.queue")
    public void start (GenericSpringEvent <RenderQueueObject> event) {
        RenderQueueObject renderQueueObject = event.getWhat ();
        send (RENDER_NAME, renderQueueObject);
}
private void send(String routingKey, Object queue) {
    try {
        rabbitTemplate.convertAndSend(routingKey, objectMapper.writeValueAsString(queue));
    } catch (JsonProcessingException e) {
        log.warn("Can't send event!", e);
    }
}

Answer:

You need to close the connection to get the message re-queued.

It's best to terminate the application after an OOME (which, of course, will close the connection).

Question:

I am new to Rabbit MQ. Read couple of Rabbit MQ tutorial

In the mentioned link my question is about below ReceiveLogs

     channel.basicConsume(queueName, true, consumer)

As single queue can hold messages routed with different routingKey. Say if consumer want to consume message with specific routing key. Is it possible ? Do i need to publish meesage with single routing key only on specific queue in this case ?

I do not see any method under Channel API


Answer:

In RabbitMQ producer publishes to an exchange not to a queue - this decouples producer from the consumer. Yes, as described in the official article you can have messages routed with different routing key in the same queue, you just need to create more than one binding for that queue for this to happen.

Now to directly answer your question: if you want to have one routing key per queue you need to do the following:

for each routing key you want consumer to listen to:

  • Create a queue
  • Call channel.queueBind(queueName, EXCHANGE_NAME, <your_routing_key>); only once for that queue.

Question:

I have few applications which are consuming JMS messages and AMQP (Rabbit MQ) messages. I am moving my applications to Pivotal Cloud Foundry. However I want retain the messaging services outside cloud in the servers where they are currently hosted. Can Java apps deployed in PCF consume messages from topics / queues that are hosted outside the cloud env ?


Answer:

yes, surely they can. You need to take care of few things like firewall opening from cloud environment to the new external to cf server; making a connection successfully to the external server from your cf app.