Hot questions for using RabbitMQ in Spring JMS

Question:

I am reading the official Getting Started guide on how to build a spring-jms application:

https://spring.io/guides/gs/messaging-jms/

@EnableJms triggers the discovery of methods annotated with @JmsListener, creating the message listener container under the covers.

But my application detects @JmsListener methods even without the @EnableJms annotation.

Maybe something else forces Spring to search for the @JmsListener methods. I would like to know what it is.

Project structure:

Listener:
@Component
public class Listener {

    @JmsListener(destination = "my_queue_new")
    public void receive(Email email){
        System.out.println(email);
    }
    @JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
    public void receiveTopic(Email email){
        System.out.println(email);
    }
}
RabbitJmsApplication:
@SpringBootApplication
//@EnableJms  // commented out on purpose; the behaviour did not change
public class RabbitJmsApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitJmsApplication.class, args);
    }

    @Bean
    public RMQConnectionFactory connectionFactory() {
        return new RMQConnectionFactory();
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setPubSubDomain(true);
        return factory;
    }
}

Answer:

Thanks for the feedback. This is a little bit confusing indeed and I've created an issue to improve the sample.

@EnableJms is a framework signal to start processing listeners and it has to be explicit because the framework has no way to know that you want to use JMS.

Spring Boot, on the other hand, can take default decisions for you based on the context. If you have the necessary bits to create a ConnectionFactory it will do so. Down the road, if we detect that a ConnectionFactory is available, we'll automatically enable the processing of JMS listeners.
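
To make the distinction concrete, here is a minimal sketch of a plain Spring (non-Boot) configuration, where nothing enables listener processing automatically and @EnableJms has to be declared explicitly. The class name PlainSpringJmsConfig is hypothetical, and the javax.jms namespace is an assumption chosen to match the Spring versions used in these questions.

```java
import javax.jms.ConnectionFactory;

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

// Hypothetical configuration for an application that does NOT use Spring Boot.
@Configuration
@EnableJms // without Boot, this is what turns on processing of @JmsListener methods
public class PlainSpringJmsConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        // Uses the client's default connection settings.
        return new RMQConnectionFactory();
    }

    // "jmsListenerContainerFactory" is the bean name @JmsListener looks up
    // when no containerFactory attribute is given.
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}
```

In a Spring Boot application the equivalent wiring is contributed by auto-configuration once a ConnectionFactory is available, which is why removing @EnableJms from the sample changes nothing.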

Question:

I am reading the following article SPRING_BOOT_JMS_GETTING_STARTED

This example explains how to get started with an embedded ActiveMQ message broker, but I already have RabbitMQ installed on my PC and I want to use that instead.

First of all, I enabled the RabbitMQ JMS plugin.

But I don't see additional exchanges in the management console:

I expected to see it because of this answer

Honestly, I don't understand what I should do now.

I have the code from the Getting Started guide and I have switched on the RabbitMQ JMS plugin.

Please advise me on the next steps.

Small remark:

Garry's answer works if I use the following Gradle dependencies:

dependencies {
    compile("org.springframework.boot:spring-boot-starter")
    compile group: 'org.springframework', name: 'spring-jms', version: '4.3.10.RELEASE'
    compile group: 'com.rabbitmq.jms', name: 'rabbitmq-jms', version: '1.7.0'
}

Answer:

The exchange won't show up until you actually use it. I just wrote a quick boot app and it worked fine for me...

@SpringBootApplication
public class RabbitJmsApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitJmsApplication.class, args);
    }

    @Autowired
    private JmsTemplate template;

    @Override
    public void run(String... arg0) throws Exception {
        template.convertAndSend("foo", "bar");
        template.setReceiveTimeout(10_000);
        System.out.println(template.receiveAndConvert("foo"));
    }

    @Bean
    public RMQConnectionFactory connectionFactory() {
        return new RMQConnectionFactory();
    }

}

Result:

bar

Question:

I have 2 Spring RabbitMq configurations, one using the RabbitTemplate, one using the JmsTemplate.


The configuration with the RabbitTemplate:

Class AmqpMailIntegrationPerfTestConfig:

@Configuration
@ComponentScan(basePackages = {
    "com.test.perf.amqp.receiver",
    "com.test.perf.amqp.sender"
})
@EnableRabbit
public class AmqpMailIntegrationPerfTestConfig {

    @Bean
    public DefaultClassMapper classMapper() {
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("mail", MailMessage.class);
        classMapper.setIdClassMapping(idClassMapping);
        return classMapper;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        jsonConverter.setClassMapper(classMapper());
        return jsonConverter;
    }

    @Bean
    public RabbitTemplate myRabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public ConnectionFactory createConnectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    Queue queue() {
        return new Queue(AmqpMailSenderImpl.QUEUE_NAME, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(AmqpMailSenderImpl.TOPIC_EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(AmqpMailSenderImpl.ROUTING_KEY);
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(createConnectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(createConnectionFactory());
        factory.setMaxConcurrentConsumers(5);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }

}

The AmqpMailSenderPerfImpl class in com.test.perf.amqp.sender package:

@Component
public class AmqpMailSenderPerfImpl implements MailSender {

    public static final String TOPIC_EXCHANGE_NAME = "mails-exchange";
    public static final String ROUTING_KEY = "mails";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public boolean sendMail(MailMessage message) {
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY, message);
        return true;
    }
}

The AmqpMailReceiverPerfImpl class in com.test.perf.amqp.receiver package:

@Component
public class AmqpMailReceiverPerfImpl implements ReceivedDatesKeeper {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Map<String,Date> datesReceived = new HashMap<String, Date>();

    @RabbitListener(containerFactory = "myRabbitListenerContainerFactory", queues = AmqpMailSenderImpl.QUEUE_NAME)
    public void receiveMessage(MailMessage message) {
        logger.info("------ Received mail! ------\nmessage:" + message.getSubject());
        datesReceived.put(message.getSubject(), new Date());
    }

    public Map<String, Date> getDatesReceived() {
        return datesReceived;
    }

}

The configuration with the JmsTemplate:

Class JmsMailIntegrationPerfTestConfig:

@Configuration
@EnableJms
@ComponentScan(basePackages = {
        "com.test.perf.jms.receiver",
        "com.test.jms.sender"
})
public class JmsMailIntegrationPerfTestConfig {

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

        Map<String,Class<?>> typeIdMappings = new HashMap<String,Class<?>>();
        typeIdMappings.put("mail", MailMessage.class);
        converter.setTypeIdMappings(typeIdMappings);

        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");

        return converter;
    }

    @Bean
    public ConnectionFactory createConnectionFactory(){
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);

        return connectionFactory;
    }

    @Bean(name = "myJmsFactory")
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("10-50");
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public Destination jmsDestination() {
        RMQDestination jmsDestination = new RMQDestination();
        jmsDestination.setDestinationName("myQueue");
        jmsDestination.setAmqp(false);
        jmsDestination.setAmqpQueueName("mails");
        return jmsDestination;
    }

    @Bean
    public JmsTemplate myJmsTemplate(ConnectionFactory connectionFactory) {
        final JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        return jmsTemplate;
    }

}

The JmsMailSenderImpl class in package com.test.jms.sender:

@Component
public class JmsMailSenderImpl implements MailSender {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public boolean sendMail(MailMessage message) {
        logger.info("Sending message!");
        jmsTemplate.convertAndSend("mailbox", message);

        return false;
    }

}

The JmsMailReceiverPerfImpl class in package com.test.perf.jms.receiver:

@Component
public class JmsMailReceiverPerfImpl implements ReceivedDatesKeeper {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Map<String,Date> datesReceived = new HashMap<String, Date>();

    @JmsListener(destination = "mailbox", containerFactory = "myJmsFactory", concurrency = "10")
    public void receiveMail(MailMessage message) {
        datesReceived.put(message.getSubject(), new Date());
        logger.info("Received <" + message.getSubject() + ">");
    }

    public Map<String, Date> getDatesReceived() {
        return datesReceived;
    }

}

I test the above configurations by starting 10 threads and making the respective MailSenders send 1000 mails each.

For the config with the RabbitTemplate I get:
* Total throughput time of all messages: 3687ms
* Time to process one message: 817ms

For the config with the JmsTemplate I get:
* Total throughput time of all messages: 41653ms
* Time to process one message: 67ms

This seems to indicate that the version with the JmsTemplate is not working in parallel, or at least, does not use resources optimally.

Does anybody know what could be causing this? I played around with different transaction and concurrency parameters but to no avail.

What we want is to get the same throughput time with the JmsTemplate as with the RabbitTemplate, so we can use JMS as an abstraction layer.


Answer:

I can see why the consumer side is slower - the Consumer.receive() uses a synchronous basicGet() for each message whereas the @RabbitListener container uses basicConsume with a prefetch count of 250.

On the JMS sending side, you need to use a CachingConnectionFactory there as well; otherwise a new session/producer/channel is created for each send.

It's still quite a bit slower, though, even with that; I suggest you ask on the rabbitmq-users Google group where the RabbitMQ engineers hang out. They maintain the JMS client.
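
The sending-side suggestion could look roughly like the sketch below, which wraps the RabbitMQ JMS factory in Spring's org.springframework.jms.connection.CachingConnectionFactory and hands the wrapper to the JmsTemplate. The class name CachedJmsSenderConfig and the session cache size of 10 are illustrative assumptions, not values from the question, and the message converter is left out for brevity.

```java
import com.rabbitmq.jms.admin.RMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

// Hypothetical sender-only configuration.
@Configuration
public class CachedJmsSenderConfig {

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        RMQConnectionFactory target = new RMQConnectionFactory();
        target.setHost("localhost");
        target.setPort(5672);

        // The wrapper keeps one shared connection and caches sessions and
        // producers, so a send does not have to create them from scratch.
        CachingConnectionFactory caching = new CachingConnectionFactory(target);
        caching.setSessionCacheSize(10); // illustrative value, tune under load
        return caching;
    }

    @Bean
    public JmsTemplate myJmsTemplate(CachingConnectionFactory cachingConnectionFactory) {
        return new JmsTemplate(cachingConnectionFactory);
    }
}
```

This only addresses the per-send setup cost on the producer side; the consumer-side difference between basicGet() and basicConsume described above is unaffected by it.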

Question:

My application currently uses IBM MQ, and its queue configuration is set up and working fine with JMS, e.g.:

@EnableJms
@Configuration
public class IBMQueueConfig {

    @Bean("defaultContainer")
    public JmsListenerContainerFactory containerFactory(final ConnectionFactory connectionFactory,
                                                        final ErrorHandler errorHandler) {
        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(errorHandler);
        return factory;
    }
}

I can receive and process messages as follows:

@Service
public class ProcessMessageReceive {

    @JmsListener(destination = "${queue}", concurrency = "${threads}", containerFactory = "defaultContainer")
    public Message processMessage(@Payload final String message) {
        //do stuff
    }
}

I need to use RabbitMQ for testing, which requires additional configuration. I have the following class:

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
@EnableRabbit
public class RabbitMQConfiguration {

    private String host;
    private int port;
    private String username;
    private String password;
    private String virtualHost;


    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchange);
    }


    @Bean("defaultContainer")
    public JmsListenerContainerFactory containerFactory(@Qualifier("rabbit-connection-factory") final ConnectionFactory connectionFactory) {
        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory); //ERROR: expects javax.jms.ConnectionFactory
        return factory;
    }


    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("rabbit-connection-factory") final ConnectionFactory connectionFactory,
                                                                               @Value("${spring.rabbitmq.listener.simple.concurrency}") final int concurrency,
                                                                               @Value("${spring.rabbitmq.listener.simple.max-concurrency}") final int maxConcurrency) {
        final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setConcurrentConsumers(concurrency);
        containerFactory.setMaxConcurrentConsumers(maxConcurrency);
        containerFactory.setDefaultRequeueRejected(false);

        return containerFactory;
    }

    @Bean(name = "rabbit-connection-factory")
    public ConnectionFactory connectionFactory() {
        final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    @Bean
    public Queue inboundQueue() {
        return new Queue(fixInboundQueue, true);
    }

    @Bean
    public Binding inboundQueueBinding() {
        return bind(inboundQueue())
                .to(exchange())
                .with(routingKey);
    }
}

I get an error on the line factory.setConnectionFactory(connectionFactory); because it expects a javax.jms.ConnectionFactory, but the one provided is the RabbitMQ one.

Is there a way I can wire in the RabbitMQ ConnectionFactory? I know it is possible if I use RMQConnectionFactory, but I would like to know whether I can achieve it with the Spring Rabbit dependency.

The objective is to avoid writing another processMessage() specifically for RabbitMQ and to reuse what I already have.

Alternatively, can I use both annotations? In that case I would use a Spring profile to enable the one I need, depending on prod or test.

    @RabbitListener(queues = "${app.rabbitmq.queue}")
    @JmsListener(destination = "${queue}", concurrency = "${threads}", containerFactory = "defaultContainer")
    public Message processMessage(@Payload final String message) {
        //do stuff
    }

Answer:

You have to use @RabbitListener instead of @JmsListener if you want to talk to RabbitMQ over AMQP.

You can add both annotations if you want to use JMS in production and RabbitMQ in tests.
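
One way the two-annotation approach might be wired is sketched below. It assumes a plain Spring setup in which listener processing is switched on only by an explicit @EnableJms or @EnableRabbit; under Spring Boot, auto-configuration may enable either one whenever the library and a connection factory are present, so that would need checking. The profile names "prod" and "test" and the two configuration class names are hypothetical. The method returns void here to keep the sketch free of reply handling.

```java
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

// Hypothetical: active in production, turns on @JmsListener processing only.
@Configuration
@Profile("prod")
@EnableJms
class ProdJmsListenerConfig {
}

// Hypothetical: active in tests, turns on @RabbitListener processing only.
@Configuration
@Profile("test")
@EnableRabbit
class TestRabbitListenerConfig {
}

@Service
public class ProcessMessageReceive {

    // Each annotation is inert unless its matching @Enable... configuration is active,
    // so the same method body serves both brokers.
    @RabbitListener(queues = "${app.rabbitmq.queue}")
    @JmsListener(destination = "${queue}", concurrency = "${threads}", containerFactory = "defaultContainer")
    public void processMessage(@Payload final String message) {
        // do stuff
    }
}
```

The container factory beans from the question ("defaultContainer" for JMS, "rabbitListenerContainerFactory" for Rabbit) would then sit in the configuration class for the corresponding profile.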

Question:

I am trying to send a message to RabbitMQ installed on my localhost using Spring 4 but for some reason the message is not getting sent and I do not get any error either. It looks to me that my Spring config (beans.xml) is not correct.

Please guide.

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>4.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-expression</artifactId>
        <version>4.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>4.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.0.2.RELEASE</version>
    </dependency>
    <!-- RabbitMQ -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.5.5.RELEASE</version>
    </dependency>
</dependencies>

beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:c="http://www.springframework.org/schema/c"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:flow="http://www.springframework.org/schema/webflow-config"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
          http://www.springframework.org/schema/webflow-config http://www.springframework.org/schema/webflow-config/spring-webflow-config.xsd
          http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
          http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
          http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang.xsd
          http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
          http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
          http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd">

    <bean id="helloWorld" class="com.study.jms.HelloWorld">
        <property name="message" value="Hello World (Spring-RabbitMQ)!" />
    </bean>

    <rabbit:connection-factory id="rabbitConnectionFactory" host="localhost" username="guest" password="guest" port="5672"/>
    <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>
    <rabbit:queue name="helloQueue"/>

</beans>

Main.java

public class Main {

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("beans.xml");
        HelloWorld helloWorld = (HelloWorld) context.getBean("helloWorld");

        //send message
        System.out.println("Sending message....");
        AmqpTemplate template = context.getBean(AmqpTemplate.class);
        template.convertAndSend(helloWorld.getMessage());
    }

}

HelloWorld.java

public class HelloWorld {

    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

}

Answer:

You need to tell RabbitMQ how to route the message.

Since you are not creating an exchange or binding the queue to an exchange, you can route to the default exchange ("") using the queue name.

You either need to set the routingKey property (routing-key attribute) on the template to the queue name, or you need to include it in your convertAndSend call:

template.convertAndSend("", "helloQueue", helloWorld.getMessage());

RabbitMQ simply drops messages that are successfully delivered to an exchange but not routed to any queue.

See the RabbitMQ Tutorials to understand exchanges, routing keys etc.

The Spring AMQP samples repo has Spring Boot versions of the tutorials.
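
The routing rule can be illustrated with a small in-memory toy model. This is not RabbitMQ client code, and the class DefaultExchangeModel is hypothetical; it only mimics the behaviour described above, where the default exchange delivers a message to the queue whose name equals the routing key and silently drops anything else.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the default exchange (""): the routing key must equal a queue name.
final class DefaultExchangeModel {

    private final Map<String, List<String>> queues = new HashMap<>();

    public void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayList<>());
    }

    // Returns true if the message reached a queue, false if it was dropped.
    public boolean publish(String routingKey, String body) {
        List<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // unroutable: no queue with that name, and no error is raised
        }
        queue.add(body);
        return true;
    }

    public List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        DefaultExchangeModel broker = new DefaultExchangeModel();
        broker.declareQueue("helloQueue");
        System.out.println(broker.publish("", "lost"));           // empty routing key: prints false
        System.out.println(broker.publish("helloQueue", "kept")); // key = queue name: prints true
        System.out.println(broker.messagesIn("helloQueue"));      // prints [kept]
    }
}
```

The first publish corresponds to the call in the question, where no routing key is supplied; the second corresponds to passing the queue name as the routing key.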

Question:

I have two different apps for sender and receiver.

sender:

@SpringBootApplication
public class RabbitJmsApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitJmsApplication.class, args);
    }

    @Autowired
    private JmsTemplate template;
    @Autowired
    private JmsTemplate topicTemplate;

    @Override
    public void run(String... arg0) throws Exception {
        for (int i = 0; i < 10; i++) {
            template.convertAndSend("my_queue", "msg_" + i);
            Thread.sleep(100);
        }
        for (int i = 0; i < 10; i++) {
            topicTemplate.convertAndSend("my_topic", "topic_msg_" + i);
            Thread.sleep(100);
        }
    }

    @Bean
    public RMQConnectionFactory connectionFactory() {
        return new RMQConnectionFactory();
    }

    @Bean
    public JmsTemplate template() {
        return new JmsTemplate(connectionFactory());
    }

    @Bean
    public JmsTemplate topicTemplate() {
        final JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }
}

and receiver:

@Component
public class Listener {

    @JmsListener(destination = "my_queue")
    public void receive(String str){
        System.out.println(str);
    }
    @JmsListener(destination = "my_topic")
    public void receiveTopic(String str){
        System.out.println(str);
    }
}

I see

msg_1
msg_2
...

on the receiver but I don't see the topic messages.

What am I doing wrong?

P.S.

management console:


Answer:

Subscriptions to topics are not durable by default - you are probably sending the messages before the listener has started.

Try adding a Thread.sleep() before sending the messages to the topic.
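
The timing problem can be shown with a toy in-memory model of a non-durable topic. This is not JMS or RabbitMQ code, and the class NonDurableTopicModel is hypothetical; it simply delivers each message to whoever is subscribed at the moment of publishing and keeps nothing for later subscribers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of a topic with non-durable subscriptions.
final class NonDurableTopicModel {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    public void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers to the current subscribers only; returns how many received the message.
    public int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        NonDurableTopicModel topic = new NonDurableTopicModel();
        List<String> received = new ArrayList<>();

        topic.publish("topic_msg_0");   // sent before the listener exists: nobody gets it
        topic.subscribe(received::add); // listener starts late
        topic.publish("topic_msg_1");   // delivered

        System.out.println(received);   // prints [topic_msg_1]
    }
}
```

A queue behaves differently because the broker stores messages until a consumer arrives, which matches the observation that the queue messages show up on the receiver while the topic messages do not.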