Hot questions for Using RabbitMQ in configuration

Question:

How can I acknowledge messages manually instead of relying on auto-acknowledgement? Is there a way to do this with the @RabbitListener and @EnableRabbit style of configuration? Most of the documentation tells us to use SimpleMessageListenerContainer together with ChannelAwareMessageListener, but then we lose the flexibility that the annotations provide. I have configured my service as below:

@Service
public class EventReceiver {

    @Autowired
    private MessageSender messageSender;

    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order) throws Exception {
        // code for processing order
    }
}

My RabbitConfiguration is as below:

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMaxConcurrentConsumers(5);
        factory.setMessageConverter((MessageConverter) jackson2Converter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        return connectionFactory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(myRabbitListenerContainerFactory());
    }

    @Autowired
    private EventReceiver receiver;
}

Any help on how to adopt manual channel acknowledgement with the above style of configuration will be appreciated. If we implement ChannelAwareMessageListener, the onMessage signature will change. Can we implement ChannelAwareMessageListener on a service?


Answer:

Add the Channel to the @RabbitListener method...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

and use the tag in the calls to basicAck or basicReject on the channel.

EDIT

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

application.properties:

spring.rabbitmq.listener.acknowledge-mode=manual
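
The acknowledgement decision in a listener like the one above usually comes down to: ack when processing succeeds, reject (optionally with requeue) when it fails. The following is a minimal, library-free sketch of that pattern, not Spring AMQP code. AckChannel is a hypothetical stand-in that copies only the basicAck(long, boolean) and basicReject(long, boolean) signatures of com.rabbitmq.client.Channel; RecordingChannel, ManualAckSketch and handle are likewise names made up for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Library-free sketch of ack-on-success / reject-on-failure; all names are hypothetical.
class ManualAckSketch {

    // Stand-in for com.rabbitmq.client.Channel: only the two
    // acknowledgement methods used below, with the same signatures.
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicReject(long deliveryTag, boolean requeue) throws IOException;
    }

    // Test double that records what would have been sent to the broker.
    static final class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicReject(long deliveryTag, boolean requeue) {
            calls.add("reject:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    // Runs the processing step, then acks on success or rejects without requeue on failure.
    static void handle(String payload, AckChannel channel, long tag,
                       Consumer<String> processor) throws IOException {
        try {
            processor.accept(payload);
            channel.basicAck(tag, false);    // false: acknowledge only this delivery
        } catch (RuntimeException e) {
            channel.basicReject(tag, false); // false: do not requeue the message
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel channel = new RecordingChannel();
        handle("ok", channel, 1L, p -> { });
        handle("bad", channel, 2L, p -> { throw new IllegalStateException("boom"); });
        System.out.println(channel.calls); // [ack:1, reject:2:requeue=false]
    }
}
```

In a real @RabbitListener method the same two calls would go to the injected Channel, using the delivery tag taken from the AmqpHeaders.DELIVERY_TAG header. Whether a failed message should be requeued is an application decision; the sketch assumes it should not.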

Question:

There is an issue I've noticed when configuring InOut capable routes in Camel with the camel-rabbitmq extension. When I set the main queue configuration to autoAck=false the same configuration is then replicated also for the temporary reply queue (it even uses the same prefetch(5) settings, easy to see in the RabbitMQ console). This causes the messages in the temp queue to sit there indefinitely until a server restart.

Virtual host  Name                            Features  State  Ready  Unacked  Total  incoming  deliver / get  ack
/test         amq.gen-Hkdx9mckIfMc6JhDI6d-JA  AD Excl   idle   2      5        7      0.00/s    0.00/s         0.00/s
/test         amq.gen-eUU7BRI3Ooo4F8Me7HrPnA  AD Excl   idle   2      5        7      0.00/s    0.00/s         0.00/s

In the logs I can clearly see that the reply message is received, but the ack doesn't appear to be sent to RabbitMQ to clear the message out of the temp queue. I've also checked in the console that both temp queues have consumers on them, so I would have expected Camel to send the ack.

o.a.c.c.r.RabbitMQMessagePublisher  - Sending message to exchange: emailfeedbackExchange with CorrelationId = Camel-ID-VMS-1534332570964-0-11
o.a.c.c.r.r.ReplyManagerSupport  - Received reply message with correlationID [Camel-ID-VMS-1534332570964-0-11]

The question is, how can I prevent this scenario while still keeping my autoAck=false and InOut capable route? Probably I should mention here that there are no errors or the like, the flow works as expected and email processing works perfectly, the only issue is the stale messages on the temp queue.

Our Camel version is 2.20.2. This is the relevant Gradle config for all the Camel components we have:

compile ("org.apache.camel:camel-spring-boot-starter:${camelVersion}")
compile ("org.apache.camel:camel-rabbitmq:${camelVersion}")
compile ("org.apache.camel:camel-amqp:${camelVersion}")

The queue and route configurations:

restentrypointroute:
    restEndpoint: /app
    postEndpoint: /email
    outputEmailEndpoint: rabbitmq://vms:5672/emailExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailrouteQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&channelPoolMaxSize=3&prefetchCount=5&prefetchEnabled=true

emailroutebuilder:
    serviceName: emailroutebuilder
    inputEndpoint: rabbitmq://vms:5672/emailExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailrouteQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&channelPoolMaxSize=3&prefetchCount=5&prefetchEnabled=true
    emailProcessor: bean:emailProcessor
    maximumRedeliveries: 5
    redeliveryDelay: 30000

Here is the relevant bit from the RestRouteBuilder implementation:

@Override
public void configure() throws Exception {

    restConfiguration().component("restlet").bindingMode(RestBindingMode.json);

    rest(restEndpoint).post(postEndpoint)
      .type(MyRequest.class)
      .route()
      .startupOrder(Integer.MAX_VALUE - 2)
      .process(this::process)
      .choice()
      .when(header(DELIVERYSTATUS_HEADER)
          .isEqualTo(Status.GENERATED)).to(outputEmailEndpoint)
      .when(header(DELIVERYSTATUS_HEADER)
          .isEqualTo(Status.COMPLETED)).to(outputEmailEndpoint, outputArchiveEndpoint).end()
      .endRest();
}

The process() method adds the DELIVERYSTATUS_HEADER header to the Camel exchange and validates the payload.

The EmailRouteBuilder looks like this:

public void configure() throws Exception {
    super.configure();

    from("direct:" + getServiceName())
            .to(emailProcessor)
            .process(ex -> {
                ex.setOut(ex.getIn());

            });

}

Where the super.configure() call configures exception handling and dead-lettering, startup order, retry counts, max re-deliveries etc. It's quite a bit of code there but if you think something in there might be the cause of this issue I'll post it up. Also, if you need me to add any other configuration please let me know.

From the above it's kind of clear why we need an InOut route with autoAck=false as losing emails is bad from a business standpoint and the REST client would need a response based on how the EmailProcessor got on. Just how to get rid of the stale messages in the temp queues?

EDIT Actually the route only works fine until the prefetch count is exhausted, after that it starts throwing exceptions and the REST client is getting HTTP 500 responses back.

org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis due reply message with correlationID: Camel-ID-VMSYS119-1534407032085-0-284 not received on destination: amq.gen-eUU7BRI3Ooo4F8Me7HrPnA.

Answer:

As per the comments, this turned out to be a bug in the camel-rabbitmq component, and a fix has now been applied to the master branch.

Jira ticket here: https://issues.apache.org/jira/browse/CAMEL-12746

The fix will be available in versions 2.21.3, 2.22.1, 2.23.0 and above.

Edit:

Including the code change in the answer.

TemporaryQueueReplyManager line 139 - always start the consumer of temporary queues with an auto acknowledge mode of true.

Changing this:

private void start() throws IOException {         
    tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);     
}

To this:

private void start() throws IOException {
     tag = channel.basicConsume(getReplyTo(), true, this);
 }
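
To see why the missing ack eventually stalls the reply queue, recall that with manual acknowledgement the broker stops delivering to a consumer once its unacknowledged deliveries reach the prefetch count. The sketch below is a simplified in-memory model of that rule, not Camel or RabbitMQ client code; PrefetchSketch, State and deliver are names invented for illustration. With 7 replies, a prefetch of 5 and no acks, it reproduces the Ready 2 / Unacked 5 split from the console listing in the question.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified in-memory model of broker-side prefetch; all names are hypothetical.
class PrefetchSketch {

    // Result of a delivery round: messages still queued and messages awaiting an ack.
    static final class State {
        final int ready;
        final int unacked;

        State(int ready, int unacked) {
            this.ready = ready;
            this.unacked = unacked;
        }

        @Override
        public String toString() {
            return "ready=" + ready + ", unacked=" + unacked;
        }
    }

    // Delivers queued messages to a single consumer that never sends an explicit ack.
    static State deliver(int queued, int prefetch, boolean autoAck) {
        Deque<Integer> ready = new ArrayDeque<>();
        for (int i = 0; i < queued; i++) {
            ready.add(i);
        }
        int unacked = 0;
        // With autoAck each delivery counts as acknowledged immediately,
        // so the prefetch window never fills up.
        while (!ready.isEmpty() && (autoAck || unacked < prefetch)) {
            ready.poll();
            if (!autoAck) {
                unacked++;
            }
        }
        return new State(ready.size(), unacked);
    }

    public static void main(String[] args) {
        System.out.println(deliver(7, 5, false)); // ready=2, unacked=5
        System.out.println(deliver(7, 5, true));  // ready=0, unacked=0
    }
}
```

The second call corresponds to the fix above: once the temporary reply queue is consumed with auto acknowledge set to true, nothing accumulates as unacked and later replies keep flowing.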

Question:

By following this link

http://www.rabbitmq.com/access-control.html

I changed /etc/rabbitmq/rabbitmq.config and added the line

[{loopback_users, []}]

Now the config file looks like

[{kernel, [{inet_dist_use_interface, {127,0,0,1}}]},
{rabbit, [{loopback_users, []}, {tcp_listeners, [{"127.0.0.1", 5672}]}]},
{rabbitmq_mochiweb, [{listeners, [{mgmt, [{ip, "127.0.0.1"},
                                       {port, 55672}]}]}]}].

Then I restarted my rabbitmq server by executing in following order:

service rabbitmq-server stop
service rabbitmq-server start

Still I am unable to connect to this server remotely (using both Python and Java clients). It gives connection refused error.


Answer:

In my configuration I only have:

[{rabbit, [{loopback_users, []}]}]

and everything works fine.

Try removing the tcp_listeners option: as written, it binds the broker to 127.0.0.1 only, so remote clients get connection refused.

Question:

I'm trying to integrate a Log4J2 Appender to RabbitMQ in an already working and logging java application.

The application is built as a Gradle project. Before the integration of spring-rabbit, the build.gradle file looked like this:

group 'Name'
version '1.18.7'

apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    [...]
    compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.10.0'
    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.10.0'
    compile group: 'com.lmax', name: 'disruptor', version: '3.3.7'
    [...]
}

jar {
    manifest {
        attributes 'Main-Class': 'the.main.Clazz'
    }
}


buildscript {
    repositories {
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }
    dependencies {
        classpath group: 'com.github.jengelman.gradle.plugins', name: 'shadow', version: '2.0.2'
    }
}

The log4j2.xml file, positioned in the src/resources folder, contains this (before rabbitMQ):

<?xml version="1.0" encoding="UTF-8"?>

<Configuration monitorInterval="15">
    <Appenders>
        <Console name="STDOUT">
            <PatternLayout>
                <Pattern>%d [%highlight{%-6p}{STYLE=DEFAULT, noConsoleNoAnsi=true}] %C{1}.%M(%F:%L) - %m%n%throwable</Pattern>
            </PatternLayout>
        </Console>
        <RandomAccessFile name="ASYNC_FILE" fileName="logs/app.log" immediateFlush="false" append="true">
            <PatternLayout>
                <Pattern>%d [%-6p] %C{1}.%M(%F:%L) - %m%n%throwable</Pattern>
            </PatternLayout>
        </RandomAccessFile>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="STDOUT"/>
        </Root>
        <Logger name="my.package" level="info" additivity="false">
            <AppenderRef ref="ASYNC_FILE"/>
        </Logger>
    </Loggers>
</Configuration>

The application runs nicely, both from the IDE and from the JAR built with the shadowJar plugin.

Now, the mystery begins. Simply by adding the spring-rabbit dependency...

compile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '2.0.2.RELEASE'

...logging starts to behave very strangely. Started from the IDE, everything still works nicely. Adding the following appender to the log4j2.xml works fine:

    <RabbitMQ name="RABBIT_MQ"
              host="my.host.name" port="5672" user="logger" password="logger" virtualHost="loggerhost"
              exchange="logs" exchangeType="fanout" declareExchange="false"
              applicationId="app-xyz" routingKeyPattern="%X{applicationId}.%c.%p"
              contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
              charset="UTF-8"
              senderPoolSize="3" maxSenderRetries="5">
        <PatternLayout>
            <Pattern>%d [%-6p] %C{1}.%M(%F:%L) - %m%n%throwable</Pattern>
        </PatternLayout>
    </RabbitMQ>

I see the log messages delivered via the RabbitMQ server.

But when I build and run the JAR file built with :shadowJar, logging stops working. On STDOUT, I see the following:

ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'log4j2.debug' to show Log4j2 internal initialization logging.
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.

When I start the application JAR with the -Dlog4j2.debug option, I see a lot of messages, and some seem to say that the configuration cannot be loaded (although it is still in the same place). Here is an excerpt:

DEBUG StatusLogger Using configurationFactory org.apache.logging.log4j.core.config.ConfigurationFactory$Factory@2a33fae0
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'log4j2.debug' to show Log4j2 internal initialization logging.

I even used the -Dlog4j.configurationFile option, pointing directly to the XML in the file system, but the result stays the same.

Again, starting the application from the IDE and not via java -jar ... works fine. It seems to me as if the spring-rabbitmq dependency brings some extra log4j stuff that interferes with my configuration. I'm totally fishing in murky waters.


Answer:

So, after a lot of research I can answer my question with the following.

Simply adding the org.springframework.amqp:spring-rabbit dependency is not enough to make the Log4J2 configuration fail. It is the combination with another dependency that is not listed in my example: com.fasterxml.jackson.core:jackson-databind.

To sum it up. This does not work:

compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.3'
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.10.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.10.0'
compile group: 'com.lmax', name: 'disruptor', version: '3.3.7'
compile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '2.0.2.RELEASE'

But this works (without Spring):

compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.3'
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.10.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.10.0'
compile group: 'com.lmax', name: 'disruptor', version: '3.3.7'

And this works (without Jackson):

compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.10.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.10.0'
compile group: 'com.lmax', name: 'disruptor', version: '3.3.7'
compile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '2.0.2.RELEASE'

So the combination of Jackson and Spring breaks Log4J2. Interestingly, my program still works without the explicit Jackson dependency, even though Jackson is under heavy use. Let's look at what gradle dependencies has to say:

compile - Dependencies for source set 'main' (deprecated, use 'implementation ' instead).
+--- org.apache.logging.log4j:log4j-api:2.10.0
+--- org.apache.logging.log4j:log4j-core:2.10.0
|    \--- org.apache.logging.log4j:log4j-api:2.10.0
+--- com.lmax:disruptor:3.3.7
\--- org.springframework.amqp:spring-rabbit:2.0.2.RELEASE
     +--- org.springframework.amqp:spring-amqp:2.0.2.RELEASE
     |    \--- org.springframework:spring-core:5.0.3.RELEASE
     |         \--- org.springframework:spring-jcl:5.0.3.RELEASE
     +--- com.rabbitmq:amqp-client:5.1.2
     |    \--- org.slf4j:slf4j-api:1.7.25 -> 1.8.0-alpha2
     +--- com.rabbitmq:http-client:1.3.1.RELEASE
     |    +--- org.apache.httpcomponents:httpclient:4.5.3
     |    |    +--- org.apache.httpcomponents:httpcore:4.4.6
     |    |    +--- commons-logging:commons-logging:1.2
     |    |    \--- commons-codec:commons-codec:1.9 -> 1.11
     |    \--- com.fasterxml.jackson.core:jackson-databind:2.9.2
     |         +--- com.fasterxml.jackson.core:jackson-annotations:2.9.0
     |         \--- com.fasterxml.jackson.core:jackson-core:2.9.2
     +--- org.springframework:spring-context:5.0.3.RELEASE
     |    +--- org.springframework:spring-aop:5.0.3.RELEASE
     |    |    +--- org.springframework:spring-beans:5.0.3.RELEASE
     |    |    |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     |    |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     |    +--- org.springframework:spring-beans:5.0.3.RELEASE (*)
     |    +--- org.springframework:spring-core:5.0.3.RELEASE (*)
     |    \--- org.springframework:spring-expression:5.0.3.RELEASE
     |         \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     +--- org.springframework:spring-messaging:5.0.3.RELEASE
     |    +--- org.springframework:spring-beans:5.0.3.RELEASE (*)
     |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     +--- org.springframework:spring-tx:5.0.3.RELEASE
     |    +--- org.springframework:spring-beans:5.0.3.RELEASE (*)
     |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     +--- org.springframework:spring-web:5.0.3.RELEASE
     |    +--- org.springframework:spring-beans:5.0.3.RELEASE (*)
     |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     \--- org.springframework.retry:spring-retry:1.2.1.RELEASE
          \--- org.springframework:spring-core:4.3.9.RELEASE -> 5.0.3.RELEASE (*)

Oh, wait, there's also a Jackson within Spring AMQP. Well, that solves the riddle of why I can still use Jackson after removing the dependency. But this is not very transparent to me, and it relies on the Spring AMQP package to deliver a core dependency for me. So what I finally did is this, and it seems to work:

compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.3'
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.10.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.10.0'
compile group: 'com.lmax', name: 'disruptor', version: '3.3.7'
compile (group: 'org.springframework.amqp', name: 'spring-rabbit', version: '2.0.2.RELEASE') {
    exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind'
}

The dependency tree looks like the following now:

compile - Dependencies for source set 'main' (deprecated, use 'implementation ' instead).
+--- com.fasterxml.jackson.core:jackson-databind:2.9.3
|    +--- com.fasterxml.jackson.core:jackson-annotations:2.9.0
|    \--- com.fasterxml.jackson.core:jackson-core:2.9.3
+--- org.apache.logging.log4j:log4j-api:2.10.0
+--- org.apache.logging.log4j:log4j-core:2.10.0
|    \--- org.apache.logging.log4j:log4j-api:2.10.0
+--- com.lmax:disruptor:3.3.7
\--- org.springframework.amqp:spring-rabbit:2.0.2.RELEASE
     +--- org.springframework.amqp:spring-amqp:2.0.2.RELEASE
     |    \--- org.springframework:spring-core:5.0.3.RELEASE
     |         \--- org.springframework:spring-jcl:5.0.3.RELEASE
     +--- com.rabbitmq:amqp-client:5.1.2
     |    \--- org.slf4j:slf4j-api:1.7.25 -> 1.8.0-alpha2
     +--- com.rabbitmq:http-client:1.3.1.RELEASE
     |    \--- org.apache.httpcomponents:httpclient:4.5.3
     |         +--- org.apache.httpcomponents:httpcore:4.4.6
     |         +--- commons-logging:commons-logging:1.2
     |         \--- commons-codec:commons-codec:1.9 -> 1.11
     +--- org.springframework:spring-context:5.0.3.RELEASE
     |    +--- org.springframework:spring-aop:5.0.3.RELEASE
     |    |    +--- org.springframework:spring-beans:5.0.3.RELEASE
     |    |    |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     |    |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     |    +--- org.springframework:spring-beans:5.0.3.RELEASE (*)
     |    +--- org.springframework:spring-core:5.0.3.RELEASE (*)
     |    \--- org.springframework:spring-expression:5.0.3.RELEASE
     |         \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     +--- org.springframework:spring-messaging:5.0.3.RELEASE
     |    +--- org.springframework:spring-beans:5.0.3.RELEASE (*)
     |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     +--- org.springframework:spring-tx:5.0.3.RELEASE
     |    +--- org.springframework:spring-beans:5.0.3.RELEASE (*)
     |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     +--- org.springframework:spring-web:5.0.3.RELEASE
     |    +--- org.springframework:spring-beans:5.0.3.RELEASE (*)
     |    \--- org.springframework:spring-core:5.0.3.RELEASE (*)
     \--- org.springframework.retry:spring-retry:1.2.1.RELEASE
          \--- org.springframework:spring-core:4.3.9.RELEASE -> 5.0.3.RELEASE (*)

I must admit that I do not fully understand why the combination of two Jackson dependencies brings the Log4J2 configuration down, but it works now with this strategy:

  • Declare the Jackson dependency explicitly (making it transparent that the application uses Jackson)
  • Exclude Jackson from Spring dependency

And what I really don't get: Why do all combinations work from within the IDE, but later fail when running the JAR?

Question:

I am trying to find a way to declare queues etc. so that they are automatically created when the application starts up. I know that this can be done by creating the queues in Java code, but ideally Spring Boot would configure my RabbitMQ environment based on an XML configuration. I tried creating a resources.xml file, with no luck so far. So I am wondering if this is even possible?

Any hints on how to proceed or some example how this could be done?


Answer:

To let Spring AMQP declare AMQP objects on application startup, you must declare them as beans; the <rabbit:> XML namespace makes this simpler.

Something like this:

<rabbit:queue name="my.queue" />

<rabbit:direct-exchange name="my.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="my.queue" key="my.routingKey" />
    </rabbit:bindings>
</rabbit:direct-exchange>

When you have that config in the resources.xml, you will be able to import it to the main Boot config:

@Configuration
@EnableAutoConfiguration
@ImportResource("classpath:com/my/proj/configs/resources.xml")
public class MyConfiguration {
}

Thanks to Spring Boot auto-configuration, a RabbitAdmin bean is created for us, and it declares those AMQP objects on startup.

Question:

I have a project configured this way:

spring:
    cloud:
        stream:
            rabbit:
                bindings:
                    myChannel:
                        consumer:
                            prefetch: 3
                            maxPriority: 10
                            exchangeType: headers
            bindings:
                myChannel:
                    destination: MyChannel
                    group: my-channel-readers
                    consumer:
                        concurrency: 4
                        max-attempts: 3
            binders:
                rabbit:
                    type: rabbit

This does create the correct exchange (as headers) and queue, but it doesn't bind them.

Before I added exchangeType: headers, it created the exchange as topic and also bound the queue correctly. Also, if I switch it to exchangeType: topic, it creates the binding correctly.

I want the queue to be bound, and messages to be delivered only if they have (or do not have) a certain header. How can I do this?


Answer:

Auto binding to a headers exchange is not currently supported; see the documentation.

exchangeType

The exchange type: direct, fanout or topic for non-partitioned destinations and direct or topic for partitioned destinations.

Default: topic.

Of course, you can bind it manually.

It should be possible to autobind to this type of exchange. Feel free to open a new feature issue on GitHub.
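
For context on what a manual binding to a headers exchange would express: such a binding carries a set of header arguments plus an x-match argument, where all requires every listed header to match and any requires at least one. The following is a small in-memory sketch of that matching rule only, not binder or Spring AMQP code. HeadersMatchSketch and matches are hypothetical names, the header keys are made-up examples, and the sketch leaves out details such as presence-only matching.

```java
import java.util.Map;
import java.util.Objects;

// In-memory illustration of headers-exchange matching; names are hypothetical.
class HeadersMatchSketch {

    // Returns true if a message with the given headers would be routed through
    // a binding with the given arguments ("x-match" is "all" or "any", default "all").
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        boolean any = "any".equals(bindingArgs.get("x-match"));
        int compared = 0;
        int matched = 0;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // arguments starting with "x-" are not used for matching
            }
            compared++;
            if (Objects.equals(arg.getValue(), headers.get(arg.getKey()))) {
                matched++;
            }
        }
        return any ? matched > 0 : matched == compared;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = Map.of("x-match", "all", "type", "report", "format", "pdf");
        System.out.println(matches(binding, Map.of("type", "report", "format", "pdf"))); // true
        System.out.println(matches(binding, Map.of("type", "report", "format", "csv"))); // false
    }
}
```

Routing on the absence of a header cannot be expressed by this rule directly, so the "not have" case from the question would need a different arrangement.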

Question:

I am trying to understand RabbitMQ with spring boot and java based configurations. I came across a code in github, where 2 queues are being configured. Please look at the code snippet below:

@Bean
Queue queueFoo() {
    return new Queue("queue.foo", false);
}

@Bean
Queue queueBar() {
    return new Queue("queue.bar", false);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange("exchange");
}

@Bean
Binding bindingExchangeFoo(Queue queueFoo, TopicExchange exchange) {
    return BindingBuilder.bind(queueFoo).to(exchange).with("queue.foo");
}

@Bean
Binding bindingExchangeBar(Queue queueBar, TopicExchange exchange) {
    return BindingBuilder.bind(queueBar).to(exchange).with("queue.bar");
}

There are two Queue bean definitions, queueFoo and queueBar. Is the binding configuration correct? In the line

Binding bindingExchangeFoo(Queue queueFoo, TopicExchange exchange) {

does the name of the argument, queueFoo, have to match the bean name of the Queue? Can anyone please clear up my doubt?


Answer:

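The argument name should be the same as the method name (the method name is used as the bean name by default) so that Spring can autowire the dependencies. This matters here because there are two Queue beans, so Spring cannot choose between them by type alone. If that does not work, you can call the bean methods directly: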

@Bean
Binding bindingExchangeFoo() {
    return BindingBuilder.bind(queueFoo()).to(exchange()).with("queue.foo");
}

@Bean
Binding bindingExchangeBar() {
    return BindingBuilder.bind(queueBar()).to(exchange()).with("queue.bar");
}
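
The lookup order behind this can be sketched in plain Java: resolve by type first, and fall back to the parameter name only when several beans share that type. This is a deliberately simplified model, not Spring's actual implementation (it ignores @Primary and @Qualifier, for example). BeanResolutionSketch, FakeQueue and resolve are hypothetical names, and a String stands in for the single TopicExchange bean.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of "by type, then by parameter name" injection; names are hypothetical.
class BeanResolutionSketch {

    // Minimal stand-in for a queue bean.
    static final class FakeQueue {
        final String name;

        FakeQueue(String name) {
            this.name = name;
        }
    }

    // Picks the bean of the requested type; uses the parameter name as a tie-breaker.
    static <T> T resolve(Map<String, Object> beans, Class<T> type, String parameterName) {
        Map<String, T> candidates = new LinkedHashMap<>();
        for (Map.Entry<String, Object> bean : beans.entrySet()) {
            if (type.isInstance(bean.getValue())) {
                candidates.put(bean.getKey(), type.cast(bean.getValue()));
            }
        }
        if (candidates.size() == 1) {
            return candidates.values().iterator().next(); // unique by type: name is irrelevant
        }
        T byName = candidates.get(parameterName);
        if (byName == null) {
            throw new IllegalStateException(
                    "no unique bean of type " + type.getSimpleName() + ": " + candidates.keySet());
        }
        return byName;
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("queueFoo", new FakeQueue("queue.foo"));
        beans.put("queueBar", new FakeQueue("queue.bar"));
        beans.put("exchange", "exchange"); // the only bean of its type

        System.out.println(resolve(beans, FakeQueue.class, "queueFoo").name); // queue.foo
        System.out.println(resolve(beans, String.class, "anyName"));          // exchange
    }
}
```

In this model the exchange parameter could have any name, while a Queue parameter named anything other than queueFoo or queueBar fails to resolve.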

Question:

All the examples I have read about ActiveMQ and Spring Boot use a special property to change the broker URL:

spring.activemq.broker-url=<SOME_URL>

Without it, the default URL and port are used. But I use RabbitMQ, and I want to know how to change the broker URL.

I've read this one

I've added application.properties to src/main/resources with the following content (the host is deliberately wrong; I expected to see an error):

spring.rabbitmq.host=olololo
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

But it doesn't affect the application. It looks like Spring (Boot) doesn't read these properties.


Answer:

Spring Boot does not have auto configuration support for rabbitmq-jms (the link you referenced is the native RabbitMQ AMQP auto configuration).

For the JMS connection factory, you will have to do the configuration yourself...

@Bean
public RMQConnectionFactory connectionFactory(@Value("${spring.rabbitmq.host}") String host,
        @Value("${spring.rabbitmq.port}") int port) {
    RMQConnectionFactory cf = new RMQConnectionFactory();
    cf.setHost(host);
    cf.setPort(port);
    return cf;
}

Question:

I have defined configuration for rabbit in spring:

<rabbit:connection-factory id="amqpConnectionFactory" addresses="${amqp.host}:${amqp.port}"
                           thread-factory="rabbitThreadFactory"
                           cache-mode="CHANNEL"
                           channel-cache-size="25"
                           username="${amqp.user}"
                           password="${amqp.pass}"
                           virtual-host="${amqp.vhost}"/>

<rabbit:admin connection-factory="amqpConnectionFactory" id="rabbitAdmin"/>

<rabbit:topic-exchange id="motoTopicExchange" name="moto.ex.topic" >
    <rabbit:bindings>
        <rabbit:binding pattern="moto.*.speed" queue="motoQueue8"/>
        <rabbit:binding pattern="moto.*.tour" queue="motoQueue9"/>
        <rabbit:binding pattern="moto.*.naked" queue="motoQueue10"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:queue id="motoQueue8" name="moto.queue.8"/>
<rabbit:queue id="motoQueue9" name="moto.queue.9"/>
<rabbit:queue id="motoQueue10" name="moto.queue.10"/>

<rabbit:template id="rabbitTemplate"
                 connection-factory="amqpConnectionFactory"
                 retry-template="retryTemplate"
                 message-converter="rabbitJsonConverter"/>

<bean id="rabbitJsonConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

<rabbit:listener-container connection-factory="amqpConnectionFactory" message-converter="rabbitJsonConverter"
                           max-concurrency="10" acknowledge="auto">
    <rabbit:listener ref="amqpService8" method="handleSimple" queues="motoQueue8"/>
    <rabbit:listener ref="amqpService9" method="handleSimple" queues="motoQueue9"/>
    <rabbit:listener ref="amqpService10" method="handleSimple" queues="motoQueue10"/>
</rabbit:listener-container>

Here the handleSimple method in each listener consumes an object, for example Motorcycle (there is also JSON conversion when sending through AMQP).

  1. How can I manually ack a message that was passed to a listener?
  2. Is it possible to get the MessageHeader alongside the object (Motorcycle)?

I don't want to configure listeners through annotations.

Thanks


Answer:

What is the desire for manual acks? It's quite unusual to need them; the container will take care of acking for you.

To use manual acks, you need a ChannelAwareMessageListener implementation.

You would also have to invoke the message converter yourself.

Question:

I have a project that uses Spring Cloud Streams - RabbitMQ to exchange messages within micro-services. One thing that is critical for my project is that I must not lose any message.

In order to minimize failures, I planned the following:

  • Use the default retry method for messages in queue
  • Configure a dead-letter queue that puts messages back on the queue after some time
  • To avoid an infinite loop, limit how many times (say, 5) a message can be republished from the dead-letter queue to the regular queue.

I believe the configuration below covers the first two items:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=300000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=

#input
spring.cloud.stream.bindings.myInput.destination=my-queue
spring.cloud.stream.bindings.myInput.group=my-group

However, I could not find in the reference guide how to do what I want (mainly, how to configure a maximum number of republishes from the dead-letter queue). I'm not completely sure I'm on the right path. Maybe I should manually create a second queue and code what I want, and leave the dead-letter queue only for messages that have failed completely (which I must check regularly and handle manually, since my system should not lose any messages).

I'm new to these frameworks, and I would like your help to configure mine...


Answer:

This documentation for the rabbit binder shows how to publish a dead-letter to some parking-lot queue after some number of retries have failed.

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

The second example shows how to use the delayed exchange plugin to delay between retries.

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
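
One detail of the second example is easy to miss: the delay is computed from the retry count before it is incremented, so the first republish carries an x-delay of 0. The sketch below restates that arithmetic in plain Java with no Spring dependency. The class and method names are hypothetical, and -1 is just an illustrative marker for "send to the parking lot".

```java
// Sketch only: RetryPlan and nextDelayMillis are hypothetical names.
final class RetryPlan {

    static final int MAX_RETRIES = 3;      // same limit as in the examples above
    static final int DELAY_STEP_MS = 5000; // same multiplier as in the second example

    // Returns the x-delay for the next republish, or -1 when the message
    // has used up its retries and should go to the parking lot instead.
    static int nextDelayMillis(Integer retriesHeader) {
        int retries = (retriesHeader == null) ? 0 : retriesHeader;
        if (retries < MAX_RETRIES) {
            return DELAY_STEP_MS * retries;
        }
        return -1;
    }
}
```

If a non-zero delay is wanted on the first retry as well, multiplying by the incremented count instead would be one way to get it.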

Question:

I want to configure the RabbitMQ host, port, username and password directly in WSO2, instead of passing them through a Java program.

I have a Java web service (packaged as a WAR file). At the moment the details are set in workflow.properties under Main/resources/.

Now, whenever the RabbitMQ details change, I have to update the WAR file. Instead, I want to handle this on the WSO2 side.


Answer:

You can't do this directly from the ESB, since your service is an external web service. There are two ways you can do this.

  1. Read the configuration from an external file and use it in your webapp. Then you only need to change this file and not the whole WAR file.
  2. Save the necessary configuration in the Registry (which comes with WSO2 ESB) and then access the Registry resources from there. You can refer to this.
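
The first option could look roughly like the sketch below, which loads the settings from a properties file kept outside the WAR. The class name and the property keys (rabbitmq.host, rabbitmq.port) are assumptions for illustration, not names taken from the original project.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Sketch only: the class name and property keys are hypothetical.
final class ExternalRabbitConfig {

    // Parses key=value settings from any reader.
    static Properties parse(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Loads the settings from a file that lives outside the WAR.
    static Properties load(Path file) {
        try (Reader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            return parse(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The file location itself would typically come from a system property or environment variable, so that changing the RabbitMQ details only means editing the file and restarting the webapp.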

Question:

I'm having some trouble configuring a fixed reply queue for my RabbitMQ messages.

I have the producer and consumer as follows:

public class Producer {

    private static ApplicationContext context;
    private static RabbitTemplate template;

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("mq-producer-context.xml");
        context = new ClassPathXmlApplicationContext("context.xml");
        template = context.getBean(RabbitTemplate.class);
        //Queue replyQueue = new Queue("ub.replyqueue");
        //template.setReplyQueue(replyQueue);
    }

    @Scheduled(fixedRate = 1000)
    public void execute() {
        System.out.println("execute...");
        template.convertAndSend("helloooo");
        //template.convertSendAndReceive("helloooo");
    }
}

Consumer:

public class Consumer implements MessageListener {

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("mq-consumer-context.xml");
    }

    public void onMessage(Message message) { 
        System.out.println("message received" + message);
    }
    /**public String handleMessage(String msg) {
        return "RECEIVED!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
    }**/
}

Producer xml config: (in beans... tag)

<beans>
    <import resource="context.xml" />

    <task:scheduler id="myScheduler" pool-size="10" />
    <task:annotation-driven scheduler="myScheduler" />

    <bean id="producer" class="com.urbanbuz.mq.Producer"></bean>
</beans>

consumer xml config:

<beans>
    <import resource="context.xml"/>

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="consumer" queue-names="ub.queue" />
    </rabbit:listener-container>

    <context:annotation-config />
    <context:component-scan base-package="com.urbanbuz.mq" />
    <aop:aspectj-autoproxy />

    <bean id="consumer" class="com.urbanbuz.mq.Consumer"></bean>
</beans>

context.xml:

<beans>
    <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="farahvhost" username="farah" password="farah" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="ub.queue" />

    <rabbit:direct-exchange name="ub.exchange">
        <rabbit:bindings>
            <rabbit:binding queue="ub.queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="ub.exchange" queue="ub.queue" reply-timeout="15000" />
</beans>

The documentation was not really clear. I tried using convertSendAndReceive on the producer side and a message handler on the consumer side (the code is commented out above), but that did not work. I got this error: No default listener method specified: Either specify a non-null value for the 'defaultListenerMethod' property or override the 'getListenerMethodName' method.


Answer:

When using a fixed reply queue, you have to configure a listener container on the rabbit template.

The simplest configuration is to just add a <rabbit:reply-listener /> child element to the template.

See the reference documentation for complete details.
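
Applied to the context.xml from the question, that might look like the fragment below. It is a sketch under assumptions: the queue name ub.replyqueue is taken from the commented-out producer code, and the other template attributes are copied unchanged from the question.

```xml
<!-- Sketch only: ub.replyqueue is the name from the commented-out producer code -->
<rabbit:queue name="ub.replyqueue" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                 exchange="ub.exchange" queue="ub.queue"
                 reply-queue="ub.replyqueue" reply-timeout="15000">
    <!-- listener container on the template that receives the replies -->
    <rabbit:reply-listener />
</rabbit:template>
```

The consumer still has to return a value for convertSendAndReceive to receive anything before the reply timeout expires.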