Hot questions for Using RabbitMQ in spring integration

Top Java Programmings / RabbitMQ / spring integration

Question:

I'm receiving messages from a service (S) that publishes each individual property change to an entity as a separate message. A contrived example would be an entity like this:

Person {
    id: 123
    name: "Something",
    address: {...}
}

If name and address are updated in the same transaction then (S) will publish two messages, PersonNameCorrected and PersonMoved. The problem is on the receiving side where I'm storing a projection of this Person entity and each property change causes a write to the database. So in this example there would be two writes to the database but if I could batch messages for a short period of time and group them by id then I would only have to make a single write to the database.

How does one typically handle this in RabbitMQ? Does Spring AMQP provide an easier abstraction?

Note that I have looked briefly at prefetch but I'm not sure if this is the way to go. Also prefetch, if I understand it correctly, is per connection basis. I'm trying to achieve this on a per-queue basis, because if batching (and thus added latency) is the way to go I wouldn't like to add this latency to ALL queues consumed by my service (but only to those that need the "group-by-id" features).


Answer:

Prefetch won't help for a case like this.

Consider using Spring Integration which has adapters that sit on top of Spring AMQP; it also provides an aggregrator which can be used to group messages together before sending them on to the next stage in the pipeline.

EDIT

Here's a quick boot app to demostrate...

@SpringBootApplication
public class So42969130Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42969130Application.class, args)
            .close();
    }

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Handler handler;

    @Override
    public void run(String... args) throws Exception {
        this.template.convertAndSend("so9130", new PersonNameChanged(123));
        this.template.convertAndSend("so9130", new PersonMoved(123));
        this.handler.latch.await(10, TimeUnit.SECONDS);
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
                        .messageConverter(converter()))
                .aggregate(a -> a
                        .correlationExpression("payload.id")
                        .releaseExpression("false") // open-ended release, timeout only
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(2000))
                .handle(handler())
                .get();
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Handler handler() {
        return new Handler();
    }

    @Bean
    public Queue queue() {
        return new Queue("so9130", false, false, true);
    }

    public static class Handler {

        private final CountDownLatch latch = new CountDownLatch(1);

        @ServiceActivator
        public void handle(Collection<?> aggregatedData) {
            System.out.println(aggregatedData);
            this.latch.countDown();
        }

    }

    public static class PersonNameChanged {

        private int id;

        PersonNameChanged() {
        }

        PersonNameChanged(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonNameChanged [id=" + this.id + "]";
        }

    }

    public static class PersonMoved {

        private int id;

        PersonMoved() {
        }

        PersonMoved(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonMoved [id=" + this.id + "]";
        }

    }

}

Pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so42969130</artifactId>
    <version>2.0.0-BUILD-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so42969130</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

Result:

2017-03-23 09:56:57.501  INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler : 
    Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]

Question:

I am working on a MicroBlog spring mvc hibernate application. I need to implement a publish subscribe functionality like twitter.

I am using RabbitMQ for messaging with Spring AMQP abstraction.

Everywhere I see on web the pubsub examples are given involving

Spring Integration

Spring AMQP & RabbitMQ

I researched a little more on Spring-Integration & found that a publish subscribe can be implemented with it even without using RabbitMQ.

Now my question is

Why do I need to use Spring Integration with [Spring AMQP & RabbitMQ] to implement a pubsub functionality. Why can't I just use Spring AMQP with Rabbit to do that?

Does Spring integration provide any additional features?

My Spring AMQP & RabbitMQ configuration

<rabbit:connection-factory id="connectionFactory" virtual-host="/" host="localhost" 
username="guest" password="guest"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="UserPostpublishQueue" />

<fanout-exchange name="broadcastUserPosts" xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="UserPostpublishQueue"/>
    </bindings>
</fanout-exchange>

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="broadcastUserPosts" 
queue="UserPostpublishQueue"/>

</beans>

Test code in my controller

@Autowire
private AmqpTemplate amqpTemplate;

try{
        amqpTemplate.convertAndSend(post);
        Post receivedPost = (Post)amqpTemplate.receiveAndConvert();
        System.out.println("received Post "+receivedPost);
        }catch(AmqpException e){
            //deal with exception
        }

Answer:

Spring Integration implements the patterns from http://www.enterpriseintegrationpatterns.com/books1.html while using AMQP/RabbitMQ as one of its many transports.

I understand that spring-amqp is more the AMQP client functionality. If you don't want to use spring. Then we have a plain java client: https://www.rabbitmq.com/java-client.html

Question:

What would be the easiest way to construct this at runtime?

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "providedAtRuntime", durable = "true"),
    exchange = @Exchange(value = "providedAtRuntime", ignoreDeclarationExceptions = "true"),
    key = "providedAtRuntime"), containerFactory = "cFac")
public class RabbitProcessor {
    @RabbitHandler
    public void receive (String smth){
        System.out.println(smth);
    }
}

I would like to define the listener, but provide exchange, queue name and binding at runtime. Also this listener should not start automatically, but when called by start() method. At same time it should auto-declare bindings and queues etc. When called stop(), it should just stop consuming.


Answer:

I think it's not possible with annotations, but you can create a custom SimpleMessageListenerContainer.

here is a simple solution:

public static AbstractMessageListenerContainer startListening(RabbitAdmin rabbitAdmin, Queue queue, Exchange exchange, String key, MessageListener messageListener) {
    rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(key).noargs());
    SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(rabbitAdmin.getRabbitTemplate().getConnectionFactory());
    listener.addQueues(queue);
    listener.setMessageListener(messageListener);
    listener.start();

    return listener;
}

and you can call it as:

ConfigurableApplicationContext ctx = SpringApplication.run(DemoApplication.class, args);

 ConnectionFactory connectionFactory = ctx.getBean(ConnectionFactory.class);
 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
 AbstractMessageListenerContainer container = startListening(rabbitAdmin, rabbitAdmin.declareQueue(),
         new DirectExchange("amq.direct"), "testRoute", message -> {
             System.out.println(new String(message.getBody()));
         });

And you can

AbstractMessageListenerContainer.destroy() or AbstractMessageListenerContainer.stop() it.

Tested with spring boot 1.5.8.RELEASE and RabbitMQ 3.6.10

Question:

I am writing an application that

  • Polls the directory (Sprint Integration DSL flow)
  • Once the file is available it will put in the rabbitmq queue
  • Microservice listen to the stream RabbitMQ and process the file (this is written in Spring Cloud stream)

Now, I cannot figure out what is the best way to send the message to the RabbitMQ from Spring Integration flow service. How can I use spring-cloud stream in the Spring-Integration to publish the message to the RabbitMQ


Answer:

You just need to implement there a Source binding and use RabbitMQ Binder to produce from your source. So, the result of polling files from the directory is going to be published to the Source.OUTPUT (or your custom binding) and everything else will be done by the RabbitMQ Binder: https://docs.spring.io/spring-cloud-stream/docs/Fishtown.M3/reference/htmlsingle/#spring-cloud-stream-overview-producing-consuming-messages

Of course you can do something similar with the plain Spring Integration using an AmqpOutboundEndpoint to publish a message to the appropriate exchange on the RabbitMQ: https://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/amqp.html#amqp-outbound-channel-adapter

Question:

I'm using the RabbitMQ 3.6.10 UI to publish a message that is received by my Java application that uses Spring Integration AMQP 4.3.11. The message is a reply to an earlier message that was created using a Splitter, so it had a sequenceNumber and sequenceSize headers. I copy these headers to the reply and set them to the type Number in the RabbitMQ UI. However, on the Java side, I'm getting an exception:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Message conversion failed
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:223)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The 'sequenceNumber' header value must be an Integer.
    at org.springframework.util.Assert.isTrue(Assert.java:92)
    at org.springframework.integration.IntegrationMessageHeaderAccessor.verifyType(IntegrationMessageHeaderAccessor.java:143)
    at org.springframework.messaging.support.MessageHeaderAccessor.setHeader(MessageHeaderAccessor.java:298)
    at org.springframework.messaging.support.MessageHeaderAccessor.copyHeaders(MessageHeaderAccessor.java:389)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:177)
    at org.springframework.integration.support.MessageBuilder.copyHeaders(MessageBuilder.java:47)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:243)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
    ... 14 more

I checked that the types of the sequenceNumber and sequenceSize headers on the Java side are Long, instead of Integer. There is however no option to make this difference in the RabbitMQ UI. The messages will be sent by a non-Java application, so how do I make sure the headers get recognised as Integer by Spring Integration?

When I publish the reply using a Java client and set the header values to Integer, then the consumer accepts them. So this is probably a limitation of the RabbitMQ UI not having the header types specific enough (eg. 32-bit vs. 64-bit number) or Java client being too strict about the expected value type. Can anyone confirm one or the other?


Answer:

Add a MessagePostProcessor to the adapter's listener container...

@Bean
public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
    adapter.setOutputChannelName("someChannel");
    return adapter;
}

@Bean
public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
    container.setQueueNames("foo");
    container.setAfterReceivePostProcessors(m -> {
        if (m.getMessageProperties().getHeaders()
                .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER) instanceof Long) {
            Integer sequenceNumber = ((Long) m.getMessageProperties().getHeaders()
                    .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)).intValue();
            m.getMessageProperties().getHeaders().put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER,
                    sequenceNumber);
        }
        return m;
    });
    return container;
}

Please open a JIRA Issue - we should probably be more lenient, especially if the value is < Integer.MAX_VALUE.

Question:

From the docs, I want to use consume from queues by dynamically changing the consumers without restarting the application.

I do see that Spring RabbitMQ latest version supports the same, but no clue/example/explanation to change the same. I couldn't see proper source code for the same or how to pass params like maxConcurrentConsumers

I am using XML based configuration of Spring RabbitMQ along with Spring integration

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="concurrentConsumers" value="3"/>
    <property name="maxConcurrentConsumers" value="10"/>
    <property name="acknowledgeMode" value="AUTO" />
</bean>

<int-amqp:inbound-channel-adapter channel="lowInboundChannel" queue-names="lowLoadQueue" advice-chain="retryInterceptor" acknowledge-mode="AUTO" listener-container="rabbitListenerContainerFactory" />
<int-amqp:inbound-channel-adapter channel="highInboundChannel" queue-names="highLoadQueue" advice-chain="retryInterceptor" acknowledge-mode="AUTO" listener-container="rabbitListenerContainerFactory" />

Can anyone guide me how to dynamically configure the consumers?


Answer:

First of all you shouldn't share the same rabbitListenerContainerFactory for different <int-amqp:inbound-channel-adapter>s, because they do this:

protected void onInit() {
    this.messageListenerContainer.setMessageListener(new ChannelAwareMessageListener() { 

So, only last adapter wins. From other side there is even no reason to have several adapters. You can specify queue-names="highLoadQueue,lowLoadQueue" for a single adapter. Although in case of listener-container you must specify queues on the SimpleRabbitListenerContainerFactory.

If you want to change some rabbitListenerContainerFactory options at runtime, you can just inject it to some service and invoke its setters.

Let me know if I have missed anything.

Question:

I'm new to spring integration and am confused about how to send error messages to a designated error queue. I want the error message to be a header on the original message and end up in a separate queue. I read that this can be done with a header enricher, which I tried to implement but nothing is showing up in the error queue.

Also, do I need a separate exception handling class in order for the error messages to make it to the error queue or can I just throw exceptions in my transforming methods?

Here is my xml config:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
   xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:context="http://www.springframework.org/schema/context"
   xsi:schemaLocation="http://www.springframework.org/schema/beans    
                        http://www.springframework.org/schema/beans/spring-beans.xsd    
                        http://www.springframework.org/schema/integration    
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/integration/amqp
                        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
                        http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

   <rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
   <rabbit:admin connection-factory="connectionFactory" />
   <rabbit:queue name="first" auto-delete="false" durable="true" />
   <rabbit:queue name="second" auto-delete="false" durable="true" />
   <rabbit:queue name="errorQueue" auto-delete="false" durable="true" />

   <int:poller default="true" fixed-rate="100"/>

   <rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
     <rabbit:bindings>
        <rabbit:binding queue="second" />
     </rabbit:bindings>
   </rabbit:fanout-exchange>

   <rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true">
      <rabbit:bindings>
         <rabbit:binding queue="errorQueue" />
      </rabbit:bindings>
   </rabbit:fanout-exchange>

  <int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange"  amqp-template="amqpTemplate" />

   <int-amqp:inbound-channel-adapter channel="messageInputChannel" error-channel="errorInputChannel" queue-names="first" connection-factory="connectionFactory" concurrent-consumers="20" />

   <int-amqp:outbound-channel-adapter channel="errorOutputChannel" exchange-name="error-exchange"  amqp-template="amqpTemplate" />

   <int:channel id="messageInputChannel" />
   <int:channel id="messageOutputChannel"/>
   <int:channel id="errorInputChannel"/>

<int:service-activator input-channel="errorInputChannel" output-channel= "errorOutputChannel" method = "handleError" >
   <bean class="firstAttempt.MessageErrorHandler"/>

   <int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel">
     <int:header-enricher>
    <int:error-channel ref="errorInputChannel" />
       </int:header-enricher>
            <int:transformer method = "convert" >
                <bean class="firstAttempt.JsonObjectConverter" />
            </int:transformer>
        <int:service-activator method="transform">
             <bean class="firstAttempt.Transformer" />
        </int:service-activator>
     <int:object-to-string-transformer />
   </int:chain>

</beans>

Error Class:

public class ErrorHandler {
    public String errorHandle(MessageHandlingException exception) {
        return exception.getMessage();

QualityScorer class (called by transformer):

public class QualityScorer {
    private Hashtable<String, String> table;
    private final static String csvFile = "C:\\Users\\john\\Test.csv";

public QualityScorer() throws Exception {
    table = new Hashtable<String, String>();
    initializeTable();
}

private void initializeTable() throws Exception {
     BufferedReader br = null;
      String line = "";
    String cvsSplitBy = ",";
   try {
       br = new BufferedReader(new FileReader(csvFile));
        while ((line = br.readLine()) != null) {
               String[] data = line.split(cvsSplitBy);
            if(data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1"))
                table.putIfAbsent(data[3], data[1]);
        }
   } catch (FileNotFoundException e) {
       throw new Exception("No file found");
  } catch (IOException e) {
            e.printStackTrace();
  } finally {
        if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public float getScore(JSONObject object) throws Exception {
        float score;
        if (object == null) {
            throw new IllegalArgumentException("object");
        }
        if (!object.has("source")) {
            throw new Exception("Object does not have a source");
        }
         if (!object.has("employer")) {
            throw new Exception("Object does not have an employer");
        }
        String source = object.getString("Source");
        String employer = object.getString("employer");
            if (table.containsKey(employer) && !source.equals("packageOne")) {
               score = 1;
          } else {
                score = -1;
        }
        return score;
    }
}

Right now, the message being loaded has no source, so the program should be throwing the MessagingException to the MessageErrorHandler.

Transformer code:

public class Transformer {
 private QualityScorer qualityScorer;

 public Transformer() throws Exception {
  qualityScorer = new QualityScorer();
 }

 public JSONObject transform(JSONObject object) throws Exception {

     float score = qualityScorer.getScore(object);
     object.put("score", score);
    return object;
    }
}

All together, the program should receive a pre-loaded message from a queue, transform it and send it on to a second queue, which it does successfully if the source is provided in the pre-loaded message. I'm trying to handle errors and make it so they are sent to an error queue as a message header. This issue has been frustrating me for awhile, so help is greatly appreciated!

The error currently being shown in the stacktrace is:

java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.lang.Thread.run(Thread.java:748)

But nothing is going to the error queue.


Answer:

When the exception is thrown, it is wrapped together with the requestMessage to the MessagingException. Your own business exception is in the cause and you can get access to the requestMessage from the MessagingException.failedMessage property.

So, it looks like you have everything you need for your use-case. Only the problem that before sending to the error-exchange you really should have some <transformer> in the error flow to properly convert that MessagingException to the proper message to send to the AMQP.

Question:

I am new in Spring Integration and maybe this question is trivial. I am looking at the example (https://github.com/spring-projects/spring-integration-samples/tree/master/basic/amqp) creating RabbitMq message from stdin:

<int-stream:stdin-channel-adapter id="consoleIn"
    channel="toRabbit">
    <int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-stream:stdin-channel-adapter>
<int:channel id="toRabbit" />
<int-amqp:outbound-channel-adapter
    channel="toRabbit" amqp-template="amqpTemplate"
    exchange-name-expression="payload.toLowerCase() == 'nack' ? 'badExchange' : 'si.test.exchange'"
    routing-key-expression="payload.toLowerCase() == 'fail' ? 'badKey' : 'si.test.binding'"
    confirm-correlation-expression="payload"
    confirm-ack-channel="good"
    confirm-nack-channel="errors"
    return-channel="returns" />

What if we need to have a custom message, produced in Java code. What will be the proper elegant code? The bean to populate the message is simplified:

package com.mycompany.domain.price;    
public class UpdateMessage implements Serializable {
Date effStartDate;
Date effEndDate;
Long orderId = -1;
String customerFullName;
...
}

Answer:

See this answer; although it's talking about Axis instead of RabbitMQ; the same techniques apply.

Since your UpdateMessage implements Serializable, the standard message converter will take care of the conversion to a byte[] for you.

Sending a message with no reply, your gateway interface method might be

public void send(UpdateMessage msg);

In which case you'd you use an outbound channel adapter. If you want to get a reply, use an amqp outbound gateway and the service interface might look like

public UpdateResult send(UpdateMessage msg);

If you're not using Serializable objects, use of a json converter might be appropriate instead.

Question:

Imagine I have two Java apps, A and B, using Spring Integration's RabbitMQ support for communication.

Can I make a synchronous/blocking call from A to B? If so, how (roughly)?


Ideally, A has a Spring Integration Gateway which it invokes via e.g. a method called

Object doSomething(Object param)

Then it blocks while the Gateway sends on the message via RabbitMQ to a ServiceActivator on B, and B returns the return value, which eventually becomes the result of the doSomething() method.

It seems this may be possible, but the docs and other Stack Overflow questions don't seem to address this directly.

Many thanks!


Answer:

Actually that's true. Exactly Gateway pattern implements that requirement.

Your client is blocked to wait the result from that gateway's method, but the underlying Integration flow can be async, paralleled etc.

Spring Integration AMQP provides the <int-amqp:outbound-gateway> for the blocking request/reply scenarios with RabbitMQ.

Of course the other, receiving side should take care of the correlation to send reply to the appropiriate replyToAddress from request message. The simples way to use <int-amqp:inbound-gateway> there.

Question:

What would be the easiest way to shut down SimpleMessageListenerContainer (created programatically, not as bean) on any possible error (missing queue, connection problem, etc.), and create new one (with re-declaring all the bindings in the run time.

I'm using helix for partition management, and have 1 listener per partition. One possibility would be also to use existing SimpleMessageListenerContainer (not to always create new one), but in this case, I would need to retry queue re-declaration and rebinding in case of any failure.

Also, there seems to be different kinds of exceptions - fatal (eg queue deleted in runtime) and non fatal (connection lost). How to handle both situations at once?

What would be easier option of these two?

UPDATED

private Map<SimpleMessageListenerContainer, AtomicBoolean> shuttingDown = new ConcurrentHashMap<>();

@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent listenerContainerConsumerFailedEvent) {

    boolean fatal = listenerContainerConsumerFailedEvent.isFatal();
    SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)listenerContainerConsumerFailedEvent.getSource();

    if(fatal){
        AtomicBoolean sd = shuttingDown
                .computeIfAbsent(listenerContainer, v -> new AtomicBoolean(false));
        if(sd.compareAndSet(false, true)) {
            System.out.println("RECREATING");
            String[] qn = listenerContainer.getQueueNames();
            String q = qn[0];
            recreateQueue(q);
            listenerContainer.stop();
            listenerContainer.start();
            //delete from shuttingDown ?
        }
        else{
            System.out.println("RECREATING_NOT");
        }
    }
    else{
        System.out.println("NON_FATAL");
    }
}

and the output

NON_FATAL
NON_FATAL
NON_FATAL
NON_FATAL
22:36:44.044 [SimpleAsyncTaskExecutor-7] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer received fatal=false...\
...

RECREATING
RECREATING_NOT
RECREATING_NOT
RECREATING_NOT
22:36:44.057 [SimpleAsyncTaskExecutor-6] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Stopping container from aborted consumer

Answer:

Add an ApplicationEventPublisher to the container; ListenerContainerConsumerFailedEvents have a fatal boolean property.

EDIT

@SpringBootApplication
public class So47357940Application {

    public static void main(String[] args) {
        SpringApplication.run(So47357940Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(AmqpAdmin admin) {
        return args -> admin.deleteQueue("so47357940");
    }

    @RabbitListener(queues = "so47357940")
    public void listen(String in) {
        System.out.println(in);
    }

    private final Map<SimpleMessageListenerContainer, AtomicBoolean> shuttingDown = new ConcurrentHashMap<>();

    @Bean
    public ApplicationListener<ListenerContainerConsumerFailedEvent> failures(AmqpAdmin admin,
            RabbitTemplate template) {
        return event -> {
            if (event.isFatal()) {
                SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
                AtomicBoolean sd = this.shuttingDown.computeIfAbsent(container, v -> new AtomicBoolean());
                if (sd.compareAndSet(false, true)) {
                    System.out.println("RECREATING");
                    String[] qn = container.getQueueNames();
                    String q = qn[0];
                    admin.declareQueue(new Queue(q));
                    // better to use a shared exec
                    ExecutorService exec = Executors.newSingleThreadExecutor();
                    exec.execute(() -> {
                        while (container.isRunning()) {
                            // should probably give up at some point
                            try {
                                Thread.sleep(100);
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                        container.start();
                        template.convertAndSend("so47357940", "foo");
                        this.shuttingDown.remove(container);
                    });
                }
                else {
                    System.out.println("RECREATING_NOT");
                }
            }
            else {
                System.out.println("NON_FATAL");
            }
        };
    }

}

Here are the debug logs I get...

RECREATING
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$144/1094003461 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'so47357940'
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@3a813488: tags=[Cancelling Consumer@3a813488: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer   : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Closing cached Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2017-11-17 17:38:53.903 ERROR 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2017-11-17 17:38:53.903 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Shutting down Rabbit listener container
2017-11-17 17:38:53.903  INFO 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2017-11-17 17:38:53.903  INFO 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2017-11-17 17:38:54.003 DEBUG 42372 --- [pool-4-thread-1] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
2017-11-17 17:38:54.004 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@3a2547b8: tags=[Starting consumer Consumer@3a2547b8: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0], channel=null, acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.005 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Started on queue 'so47357940' with tag amq.ctag-3wMG_13-68ibLL05ir3ySA: Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.005 DEBUG 42372 --- [ool-1-thread-11] o.s.a.r.listener.BlockingQueueConsumer   : ConsumeOK : Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,4)
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$146/1108520685 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'foo' MessageProperties [headers=Publishing message (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=3, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [], routingKey = [so47357940], contentType=text/plain, contentEncoding=UTF-8, contentLength=3, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [], routingKey = [so47357940]
2017-11-17 17:38:54.012 DEBUG 42372 --- [ool-1-thread-12] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.012 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'foo' MessageProperties [headers=Received message: (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=so47357940, deliveryTag=1, consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, consumerQueue=so47357940]), contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=so47357940, deliveryTag=1, consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, consumerQueue=so47357940])
2017-11-17 17:38:54.015 DEBUG 42372 --- [cTaskExecutor-3] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload=foo, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=so47357940, amqp_contentEncoding=UTF-8, amqp_deliveryTag=1, amqp_consumerQueue=so47357940, amqp_redelivered=false, id=b614d9e6-1744-b600-7d86-ca9c51ad5844, amqp_consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, contentType=text/plain, timestamp=1510958334014}]]
foo

Question:

I have several Spring-Integration elements configured in the XML file (see below) From the amqp channel adapter the messages are directed to the router integrationSecondaryRouter that has implementation integrationRouterImpl.

If there is a not caught exception in integrationRouterImpl I expect that the Rabbit MQ will send the message again and again. However, this does not happen. The Rabbit MQ monitor does not show any messages accumulation. An error in my configuration?

<int-amqp:inbound-channel-adapter 
    channel="integrationFrontDoorQueueChannel" 
    queue-names="${integration.creation.orders.queue.name}" 
    header-mapper="integrationHeaderMapper"
    connection-factory="connectionFactory" 
    error-channel="errorChannel" 
/>

<int:chain 
        id="integrationFrontDoorQueueChain" 
        input-channel="integrationFrontDoorQueueChannel"
        output-channel="integrationRouterChannel">
    <int:transformer ref="integrationJsonPayloadTransformer" method="transformMessagePayload"/>
    <int:filter ref="integrationNonDigitalCancellationFilter" method="filter"/>
    <int:filter ref="integrationPartnerFilter" method="filter"/>
    <int:filter ref="integrationOrderDtoDgcAndGoSelectFilter" method="filter"/>

</int:chain>

 <int:header-value-router 
    id="integrationPrimaryRouter"
    input-channel="integrationRouterChannel" 
    default-output-channel="integrationFrontDoorRouterChannel"
    resolution-required="false"
    header-name="#{T(com.smartdestinations.constants.SdiConstants).INTEGRATION_PAYLOAD_ACTION_HEADER_KEY}">
    <int:mapping 
        value="#{T(com.smartdestinations.service.integration.dto.IntegrationAction).EXCLUSION_SCAN.name()}" 
        channel="integrationExclusionChannel" 
    />
</int:header-value-router>


<int:router 
        id="integrationSecondaryRouter"
        ref="integrationRouterImpl" 
        input-channel="integrationFrontDoorRouterChannel"
        method="route" 
        resolution-required="false" 
        default-output-channel="nullChannel"
/>

Answer:

Look, you have error-channel="errorChannel" and the Documentation on the matter points out:

The default "errorChannel" is a PublishSubscribeChannel.

Yes, there is one subscriber. but it just _org.springframework.integration.errorLogger.

Since there is no anyone who re-throws your exception to the SimpleMessageListenerContainer, thefore no reason to nack message and redelive it again.

Question:

I have following RabbitMQ Configuration

@Configuration
@IntegrationComponentScan
public class RabbitConfig {

@Autowired // TODO constructor!
private ConnectionFactory connectionFactory;

public RabbitConfig(
        @Value("${article.inbound.queue}") String queueName,
        @Value("${article.inbound.exchange}") String exchangeName,
        @Value("${article.inbound.routingkey}") String routingKey) {
    this.queueName = queueName;
    this.exchangeName = exchangeName;
    this.routingKey = routingKey;
}

@Bean
Exchange exchange() {
    return ExchangeBuilder
            .topicExchange(this.exchangeName)
            .durable(true)
            .build();
}

@Bean
Queue queue() {
    return QueueBuilder.durable(queueName).build();
}

@Bean
Binding binding() {
    return BindingBuilder.bind(queue())
            .to(exchange())
            .with(routingKey)
            .noargs();
}

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public SimpleMessageListenerContainer articleListenerContainer(
        ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue());
    container.setMessageConverter(jsonMessageConverter());
    return container;
}

}

And simple Spring Integration Flow

@Bean
IntegrationFlow fromMessageBroker(SimpleMessageListenerContainer messageListener) {
    return IntegrationFlows.from(Amqp.inboundAdapter(messageListener))
            .log()
            .handle(message -> {
                final MessageHeaders headers = message.getHeaders();
                final Object assetId = headers.get("assetId");
                log.info(assetId);
            })
            .get();
}

When I start Spring Boot everything is fine until I publish message to exchange which my defined queue is bind. Then all processing goes further and after that SimpleMessageListenerContainer crashes.

2018-09-20 13:11:08.240  INFO 49400 --- [           main] c.p.ftppush.article.MessageConsumer      : Started MessageConsumer in 1.743 seconds (JVM running for 2.386)
2018-09-20 13:11:12.309  INFO 49400 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=byte[0], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=article.original.orgn.123, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b7db656, amqp_receivedExchange=que.article.content.pf.normal.trigger, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b7db656, amqp_deliveryTag=1, assetId=1qh22m9027k6d1jz29tsi510x5, amqp_consumerQueue=exc.article.content, amqp_redelivered=false, id=0d04f22a-991f-000b-63f5-5cb087b915ab, amqp_consumerTag=amq.ctag-bK2-KaIxWpk57HJeQ_38AQ, timestamp=1537441872308}]
2018-09-20 13:11:12.310  INFO 49400 --- [erContainer#0-1] com.perform.ftppush.article.Flow         : 1qh22m9027k6d1jz29tsi510x5
2018-09-20 13:11:12.317 ERROR 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.

java.lang.AbstractMethodError: org.springframework.integration.channel.interceptor.WireTap.postSend(Lorg/springframework/messaging/Message;Lorg/springframework/messaging/MessageChannel;Z)V
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.postSend(AbstractMessageChannel.java:607) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:460) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:227) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:497) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:465) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$1000(AmqpInboundGateway.java:66) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.process(AmqpInboundGateway.java:315) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.onMessage(AmqpInboundGateway.java:263) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]

2018-09-20 13:11:12.322 ERROR 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2018-09-20 13:11:12.322  INFO 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2018-09-20 13:11:12.322  INFO 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

As you can see in logs flow goes to the end and then this container crashes. Unfortunately there is no useful information in exception trace. I'm wondering why it is like that. I was trying to redirect this messages in this flow into MessageChannel and then handle them but it didn't help.


Answer:

I would check whether you have libraries in the same versions e.g in stacktrace you may spot that spring-messaging is in different version than spring-integration. Maybe that is the issue?

If you could post your pom.xml or other build config we may investigate more if needed.

Here you may find some explanation of this kind of error https://www.pixelstech.net/article/1469241003-Java-AbstractMethodError-explained-and-demonstrated

Question:

I'm trying to make RPC using spring integration AMQP, I have a method that takes no arguments within my Gateway but the message is not sent to my exchange when I call it.

@MessagingGateway
public interface StatusGateway {
    boolean getStatus();
}

Here my integration-context.xml

<rabbit:connection-factory id="rabbitConnectionFactory" 
    host="172.17.0.2" virtual-host="/myvhost" 
    username="myuser" password="mypasswd" />

<rabbit:template id="default" connection-factory="rabbitConnectionFactory" />
<rabbit:topic-exchange name="slr-input" auto-declare="false" />

<int:gateway id="statusGateway"
    service-interface="com.example.StatusGateway"
    default-request-channel="requestChannel"
    default-reply-channel="replyChannel" />

<int:channel id="requestChannel" />
<int:channel id="replyChannel" />

<int-amqp:outbound-gateway id="statusRequestGateway"
    amqp-template="default"
    exchange-name="slr-input"
    routing-key="operation.status"
    request-channel="requestChannel"
    reply-channel="replyChannel"
    lazy-connect="true" />

And when I call the getStatus method I get the following exception

java.lang.IllegalStateException: Failed to execute CommandLineRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:803) [spring-boot-1.3.6.RELEASE.jar:1.3.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:784) [spring-boot-1.3.6.RELEASE.jar:1.3.6.RELEASE]
    at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:771) [spring-boot-1.3.6.RELEASE.jar:1.3.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-1.3.6.RELEASE.jar:1.3.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1185) [spring-boot-1.3.6.RELEASE.jar:1.3.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1174) [spring-boot-1.3.6.RELEASE.jar:1.3.6.RELEASE]
    at com.example.AmqpTestClientApplication.main(AmqpTestClientApplication.java:14) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_91]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_91]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) [idea_rt.jar:na]
Caused by: java.lang.IllegalStateException: receive is not supported, because no pollable reply channel has been configured
    at org.springframework.util.Assert.state(Assert.java:392) ~[spring-core-4.2.7.RELEASE.jar:4.2.7.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.receive(MessagingGatewaySupport.java:380) ~[spring-integration-core-4.2.8.RELEASE.jar:na]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:416) ~[spring-integration-core-4.2.8.RELEASE.jar:na]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:382) ~[spring-integration-core-4.2.8.RELEASE.jar:na]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:373) ~[spring-integration-core-4.2.8.RELEASE.jar:na]
    at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:64) ~[spring-integration-core-4.2.8.RELEASE.jar:na]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.2.7.RELEASE.jar:4.2.7.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-4.2.7.RELEASE.jar:4.2.7.RELEASE]
    at com.sun.proxy.$Proxy44.getStatus(Unknown Source) ~[na:na]
    at com.example.AmqpTestClientApplication.lambda$commandLineRunner$0(AmqpTestClientApplication.java:20) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:800) [spring-boot-1.3.6.RELEASE.jar:1.3.6.RELEASE]

Does anybody know how to make a request without sending a non-empty message?


Answer:

A gateway method with no arguments is a receive operation by default; see Messaging Gateway: Invoking No-Argument Methods.

You can use something like adding default-payload-expression="''" to the gateway (an empty String).

Question:

I wrote a simple message flow with request and reply. I have to use two independent queues so i declare AmqpOutboundAdapter to send a message and AmqpInboundAdapter to receive a reply.

@Bean
@FindADUsers
public AmqpOutboundEndpoint newFindADUsersOutboundAdapter() {
    return Amqp.outboundAdapter(amqpTemplate())
            .routingKeyExpression("headers[" + ADUsersFindConfig.ROUTING_KEY_HEADER + "]")
            .exchangeName(getExchange())
            .headerMapper(amqpHeaderMapper())
            .get();
}

@Bean
public AmqpInboundChannelAdapter newFindADUsersResponseInboundChannelAdapter(
        ADUsersFindResponseConfig config) {
    return Amqp.inboundAdapter(rabbitConnectionFactory(), findADUsersResponseQueue)
            .headerMapper(amqpHeaderMapper())
            .outputChannel(config.newADUsersFindResponseOutputChannel())
            .get();
}

It should work with @MessagingGateway:

@MessagingGateway
public interface ADUsersFindService {

     String FIND_AD_USERS_CHANNEL = "adUsersFindChannel";

     String FIND_AD_USERS_REPLY_OUTPUT_CHANNEL = "adUsersFindReplyOutputChannel";

     String FIND_AD_USERS_REPLY_CHANNEL = "adUsersFindReplyChannel";

     String CORRELATION_ID_REQUEST_HEADER = "correlation_id";

     String ROUTING_KEY_HEADER = "replyRoutingKey";

     String OBJECT_TYPE_HEADER = "object.type";

     @Gateway(requestChannel = FIND_AD_USERS_CHANNEL, replyChannel = FIND_AD_USERS_REPLY_CHANNEL)
ADResponse find(ADRequest adRequest, @Header(ROUTING_KEY_HEADER) String routingKey, @Header(OBJECT_TYPE_HEADER) String objectType);
}

And the ADUsersFindResponseConfig class looks like:

 @Configuration
 @Import(JsonConfig.class)
 public class ADUsersFindResponseConfig {

     @Autowired
     public NullChannel nullChannel;

     @Autowired
     private JsonObjectMapper<?, ?> mapper;

     /**
      * @return The output channel for the flow
      */
     @Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_OUTPUT_CHANNEL)
     public MessageChannel newADUsersFindResponseOutputChannel() {
         return MessageChannels.direct().get();
     }

     /**
      * @return The output channel for gateway
      */
     @Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_CHANNEL)
     public MessageChannel newADUsersFindResponseChannel() {
         return MessageChannels.direct().get();
     }

     @Bean
     public IntegrationFlow findADUsersResponseFlow() {
         return IntegrationFlows
                 .from(newADUsersFindResponseOutputChannel())
                 .transform(new JsonToObjectTransformer(ADResponse.class, mapper))
                 .channel(newADUsersFindResponseChannel())
                 .get();
     }
 }

Sending message works properly, but i have a problem with receiving message. I am expecting that received message will be passed to channel called FIND_AD_USERS_REPLY_OUTPUT_CHANNEL, then the message will be deserialized to ADResponse object using findADUsersResponseFlow , and next ADResponse object will be passed to gateway replyChannel - FIND_AD_USERS_REPLY_CHANNEL. Finally, 'find' method return this object. Unfortunately when org.springframework.integration.handler.BridgeHandler receive a message, i got exception:

 org.springframework.messaging.MessagingException: ; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

Message log looks like:

 11:51:35.697 [SimpleAsyncTaskExecutor-1] INFO  New message - GenericMessage [payload={...somepayload...}, headers={correlation_id=7cbd958e-4b09-4e4c-ba8e-5ba574f3309a, replyRoutingKey=findADUsersResponse.ad, amqp_consumerQueue=findADUsersResponseQueue, history=newFindADUsersResponseInboundChannelAdapter,adUsersFindReplyOutputChannel,adUsersFindReplyChannel,infoLog,infoLoggerChain.channel#0,infoLoggerChain.channel#1, id=37a4735d-6983-d1ad-e0a1-b37dc17e48ef, amqp_consumerTag=amq.ctag-8Qs5YEun1jXYRf85Hu1URA, object.type=USER, timestamp=1469094695697}]

So i'm pretty sure that message was passed to adUsersFindReplyChannel. Also (if it's important) both request message and reply message have 'replyTo' header set to null. What am I doing wrong?


Answer:

The replyChannel header is a live object and can't be serialized over AMQP.

You can use an outbound gateway instead of the pair of adapters and the framework will take care of the headers.

If you must use adapters for some reason, you need to do 2 things:

  1. Use the header channel registry to convert the channel object to a String which is registered with the registry.

  2. Make sure that the header mapper is configured to send/receive the replyChannel header and that your receiving system returns the header in the reply.