Hot questions for using RabbitMQ with dead letter exchanges

Question:

I have an existing queue created in RabbitMQ. It can be created with or without the x-dead-letter-exchange parameter. I am creating a consumer of this queue in Spring using the RabbitTemplate. When I declare the queue, I don't want to specify the x-dead-letter-exchange parameter. I would like the template to somehow figure it out itself or not care. I am throwing AmqpRejectAndDontRequeueException from my consumer to indicate bad messages, but I want the creator of the queue to be responsible for deciding whether or not to create an exchange and queue for the rejected messages.

Here is my bean that declares the queue in Spring:

@Bean
Queue queue() {
    Map<String, Object> args = new HashMap<>();
    // set the queue with a dead letter feature
    args.put("x-dead-letter-exchange", REJECTED_EXCHANGE);
    args.put("x-dead-letter-routing-key", REJECTED_ROUTING_KEY);
    Queue queue = new Queue(Constants.QUEUE_NAME, false, false, false, args);
    return queue;
}

This works fine, but when the creator of the queue decides not to use the dead letter feature, I see the following error:

Channel shutdown: channel error; protocol method: #method<channel.close>
(reply-code=406, reply-text=PRECONDITION_FAILED - 
inequivalent arg 'x-dead-letter-exchange' for queue 'queueName'

The message is a bit longer; it continues by telling me which side has which x-dead-letter-exchange (none, or the name of the exchange). I've tried different combinations (e.g. creating the queue with the exchange and not specifying it in Spring, or creating the queue without the exchange and specifying it in Spring), only to see different variants of this message.

How do I declare the queue so it simply accepts whatever parameters are already set in the queue?


Answer:

This error occurs when a queue already exists on the broker (for example, because it was created manually) and your client code later declares a queue with the same name but different arguments. RabbitMQ requires a redeclaration to match the existing queue's arguments; if it does not, the broker closes the channel with 406 PRECONDITION_FAILED.

To solve this problem:

  • Delete all the queues that you created manually, and let the client program create them itself.
  • If you have trouble deleting a queue because it still holds messages, or you want to keep those messages for some other reason, create a new queue manually and move the messages into it through the "Move" tab of the queue that is to be deleted.
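
To make the cause of the 406 error concrete, here is a small JDK-only sketch of the equivalence check. It is a toy model, not RabbitMQ's code: the class and method names are made up for illustration, and it simply compares every argument, whereas the broker's own rules may be more selective.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

// Toy model of the broker-side equivalence check; not RabbitMQ's actual code.
class QueueArgsCheck {

    /** Returns the names of arguments whose values differ between the existing queue and the redeclaration. */
    static List<String> inequivalentArgs(Map<String, Object> existing, Map<String, Object> declared) {
        // Look at every argument that appears on either side, in a stable order.
        TreeSet<String> keys = new TreeSet<>(existing.keySet());
        keys.addAll(declared.keySet());
        List<String> mismatches = new ArrayList<>();
        for (String key : keys) {
            // A missing argument (null) is not equivalent to a present one.
            if (!Objects.equals(existing.get(key), declared.get(key))) {
                mismatches.add(key);
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        Map<String, Object> existing = new HashMap<>();   // queue created without dead lettering
        Map<String, Object> declared = new HashMap<>();   // what the client tries to declare
        declared.put("x-dead-letter-exchange", "rejected.exchange");
        System.out.println(inequivalentArgs(existing, declared)); // prints [x-dead-letter-exchange]
    }
}
```

Any non-empty result corresponds to the "inequivalent arg" reply: the declaration and the existing queue disagree, whichever side has the extra argument.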

Question:

I have a PHP application that publishes a user sign-up to a message queue. A Java application reads from that queue and imports it. Hopefully the diagram below will describe it. I am only working with the Java side of things. The JSON messages already exist on the queue.

Route (Java Consuming Side).

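@Component
public class SignUpRouting extends RouteBuilder {

  @Override
  public void configure() {
    // failed exchanges are sent to this endpoint, keeping the original message
    errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());

    from("rabbitmq://phpSignUp.exchange?username=etc....")
            .routeId("signUpRoute")
            .processRef("signUpProcessor")
            .end();
    //....
  }
}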

The processor..

@Component
public class SignupProcessor implements Processor {

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void process(Exchange exchange) throws Exception {

        String json = exchange.getIn().getBody(String.class);
        SignUpDto dto = mapper.readValue(json, SignUpDto.class);

        SignUp signUp = new SignUp();
        signUp.setWhatever(dto.getWhatever());
        //etc....

        // save record
        signUpDao.save(signUp);
    }
}

My question is this: what should I do when the Processor fails to import the message?

Let's say, for example, there was a DAO exception. A data field may have been too long, or the import was in the incorrect format. I don't want to lose the message. I would like to see the error and retry the import, but I would not want to keep retrying the message every 30 seconds.

I am thinking that I would need to create another queue, a dead letter queue, and have that indefinitely retry the message every 6 hours. I would then view the logs, see the error, and upload a fix, and the message would be reprocessed?

How would I implement that? Or am I on the wrong track?

EDIT: I have tried setting deadLetterExchange to see if it would get things going in the right direction. However, it errors and says queue cannot be non null

 rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange

Answer:

Here is an example that uses the dead letter options:

        <from uri="rabbitmq://localhost/youexchange?queue=yourq1&amp;
            exchangeType=topic&amp;
            routingKey=user.reg.*&amp;
            deadLetterExchange=dead.msgs&amp;
            deadLetterExchangeType=topic&amp;
            deadLetterRoutingKey=dead.letters&amp;
            deadLetterQueue=dead.letters&amp;
            autoAck=false&amp;
            autoDelete=false"/>

          <!-- We can use onException to make Camel retry; once the retries are exhausted, the dead letter queue is the fallback -->
        <onException useOriginalMessage="true">
            <exception>java.lang.Exception</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/>
        </onException>

We need to turn off autoAck and set deadLetterQueue; then, if an exception is thrown, the message ends up in the dead letter queue. With onException, we can control the retries before Camel drops the message to the dead letter queue. In the example above, Camel retries up to 3 times, 5 seconds apart.

Question:

I have a RabbitMQ queue with messages to consume. I also have a listener that can fail. The queue is configured with a dead letter exchange (along with a dead letter queue). What I want is to see the exception info in the messages sitting in the dead letter queue.

Here is how it works currently:

  1. I send a corrupted message to my normal queue.
  2. My listener (I'm using Java's org.springframework.amqp.core.MessageListener) fails with something like: "java.lang.RuntimeException: corrupted message"
  3. The message gets rejected and goes to the dead letter queue through the dead letter exchange.
  4. When I look at the dead-lettered message in the Rabbit Admin UI, I see: headers: x-death: reason: rejected

But what I want is to see the "java.lang.RuntimeException: corrupted message" somewhere in the UI. I assume it should be a custom header?

Is it possible, for example, to put a general try-catch in my listener and enrich the headers with the exception info?


Answer:

No; RabbitMQ (actually the AMQP specification) provides no mechanism for the consumer to enhance rejected messages with additional information. The protocol only supports acknowledging or rejecting messages.

Spring AMQP, together with a retry interceptor, provides a mechanism to republish the message to a different queue (which can be the same as the DLQ) with additional information in the headers (exception stack trace etc).

See RepublishMessageRecoverer in the section about error handling with asynchronous consumers.
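
The republishing idea can be pictured with a JDK-only sketch: since a reject cannot carry data, the consumer builds extra headers from the exception and publishes a new copy of the message elsewhere. This is not Spring AMQP's implementation. The class and method names are hypothetical, and the header names are assumed to follow the ones RepublishMessageRecoverer uses.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only illustration of the header enrichment idea; not Spring AMQP's implementation.
class FailureHeaders {

    /** Builds the extra headers to attach to a failed message before republishing it. */
    static Map<String, Object> forFailure(Throwable cause, String originalExchange, String originalRoutingKey) {
        // Render the stack trace to a string so it can travel as a header value.
        StringWriter stackTrace = new StringWriter();
        cause.printStackTrace(new PrintWriter(stackTrace, true));

        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("x-exception-stacktrace", stackTrace.toString()); // assumed header name
        headers.put("x-exception-message", cause.getMessage());       // assumed header name
        headers.put("x-original-exchange", originalExchange);         // assumed header name
        headers.put("x-original-routingKey", originalRoutingKey);     // assumed header name
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> headers =
                forFailure(new RuntimeException("corrupted message"), "someExchange", "someKey");
        System.out.println(headers.get("x-exception-message")); // prints corrupted message
    }
}
```

In a real listener these headers would be copied onto the republished message, which is what makes the exception text visible in the management UI.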

Question:

Below is the configuration I'm using. Messages with no errors work fine: they go from the exchange to the queue and are converted and picked up by the listener, which is great. What I want to happen with erroneous messages is that when I throw an AmqpRejectAndDontRequeueException, "rabbitQueue" forwards the message to its dead letter exchange and it ends up in the "rabbitErrorQueue". There's no activity on the dead letter exchange or queue though. Can anyone see what I'm doing wrong here?

    <beans
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="clientConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    </bean>

    <rabbit:connection-factory
            id="rabbitConnectionFactory"
            connection-factory="clientConnectionFactory"
            host="${rabbit.broker.url}"
            port="${rabbit.broker.port}"
            username="${rabbit.username}"
            password="${rabbit.password}"
            publisher-confirms="true"/>

    <rabbit:admin connection-factory="rabbitConnectionFactory" />

    <rabbit:template id="rabbitTemplate"
                     connection-factory="rabbitConnectionFactory"
                     exchange="${rabbit.exchange.name}"
                     message-converter="messageConverter"
                     queue="${rabbit.queue.name}" >
    </rabbit:template>


    <rabbit:queue id="rabbitQueue" name="${rabbit.queue.name}" >
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="${rabbit.dead.letter.exchange.name}"/>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:queue id="rabbitErrorQueue" name="${rabbit.dead.letter.queue.name}" />

    <rabbit:fanout-exchange id="fanoutExchange" name="${rabbit.exchange.name}">
        <rabbit:bindings>
            <rabbit:binding queue="rabbitQueue" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>


    <rabbit:direct-exchange id="directErrorExchange" name="${rabbit.dead.letter.exchange.name}">
        <rabbit:bindings>
            <rabbit:binding key="${rabbit.queue.name}" queue="rabbitErrorQueue" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="messageConverter" class="com.example.RabbitMQExampleEventMessageConverter"/>

    <bean id="rabbitMQExampleConsumer" class="com.example.consumer.RabbitMQExampleConsumer">
        <constructor-arg name="eventProcessor" ref="userEventProcessor" />
    </bean>

    <rabbit:listener-container connection-factory="rabbitConnectionFactory" message-converter="messageConverter">
        <rabbit:listener queues="${rabbit.queue.name}" ref="rabbitMQExampleConsumer" method="onMessage" />
    </rabbit:listener-container>
</beans>

Answer:

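Try adding an explicit x-dead-letter-routing-key to the queue arguments. Without it, the dead-lettered message keeps the routing key it was originally published with, and since a fanout exchange needs no routing key, that key probably does not match the ${rabbit.queue.name} binding on the direct dead letter exchange.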
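
A toy model of that routing decision may help. It is a JDK-only sketch under simplifying assumptions, not broker code: the class, the method names and the "my.queue" key are illustrative, and it assumes the publisher sent an empty routing key to the fanout exchange.

```java
import java.util.Objects;

// Toy model of dead-letter routing through a direct exchange; names are illustrative.
class DeadLetterRouting {

    /** The key used when dead-lettering: the queue's override if set, else the original key. */
    static String deadLetterKey(String originalRoutingKey, String xDeadLetterRoutingKey) {
        return xDeadLetterRoutingKey != null ? xDeadLetterRoutingKey : originalRoutingKey;
    }

    /** A direct exchange delivers only when the routing key equals the binding key. */
    static boolean directExchangeDelivers(String bindingKey, String routingKey) {
        return Objects.equals(bindingKey, routingKey);
    }

    public static void main(String[] args) {
        String bindingKey = "my.queue"; // key of the binding on the dead letter exchange
        String publishedKey = "";       // assumption: the publisher sent no routing key

        // No override: the original (empty) key is reused and the binding does not match.
        System.out.println(directExchangeDelivers(bindingKey, deadLetterKey(publishedKey, null)));       // prints false
        // With x-dead-letter-routing-key equal to the binding key, the message is delivered.
        System.out.println(directExchangeDelivers(bindingKey, deadLetterKey(publishedKey, "my.queue"))); // prints true
    }
}
```

The first case matches the symptom in the question: the message is dead-lettered, but the direct exchange has no binding for its key, so nothing reaches the error queue.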

Question:

I am working on a Spring project, and am trying to implement exponential backoff with dead lettering for a RabbitMQ queue. In the process, I've created a Dead Letter Queue and a Dead Letter Exchange (Fanout), and set the x-dead-letter-exchange argument for the original queue to the dead letter exchange's name, and created a RetryTemplate with an ExponentialBackOffPolicy. For testing purposes, my consumer simply rejects all messages it gets by throwing an exception.

This is what my RabbitMQConfiguration class looks like:

@Configuration
@EnableAutoConfiguration
@PropertySource("file:${HOME}/common/config/wave-planning.properties")
public class RabbitMQConfiguration {

    private final static String QUEUE_NAME = "orderPlanQueue";

    private static final String EXCHANGE_NAME = "orderPlanExchange";

    private static final String DL_EXCHANGE_NAME = "deadLetterExchange";

    private static final String DL_QUEUE_NAME = "deadLetterQueue";

    @Value("${rabbitmq.host:localhost}")
    private String host;

    @Value("${rabbitmq.port:5672}")
    private int port;

    @Value("${rabbitmq.user:guest}")
    private String userName;

    @Value("${rabbitmq.password:guest}")
    private String password;

    @Value("${rabbitmq.initial_backoff_interval:1000}")
    private int INITIAL_INTERVAL_IN_MILLISECONDS;

    @Value("${rabbitmq.max_backoff_interval:10000}")
    private int MAX_INTERVAL_IN_MILLISECONDS;

    @Autowired
    OrderPlanService orderPlanService;

    @Bean
    Queue queue() {
        Map<String, Object> qargs = new HashMap<String, Object>();
        qargs.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
        return new Queue(QUEUE_NAME, false, false, false, qargs);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    FanoutExchange deadLetterExchange() { return new FanoutExchange(DL_EXCHANGE_NAME); }

    @Bean
    Queue deadLetterQueue() { return new Queue(DL_QUEUE_NAME); }

    @Bean
    Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public MessageConverter Jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());

        RetryTemplate retry = new RetryTemplate();
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();

        policy.setInitialInterval(INITIAL_INTERVAL_IN_MILLISECONDS);
        policy.setMultiplier(2);
        policy.setMaxInterval(MAX_INTERVAL_IN_MILLISECONDS);

        retry.setBackOffPolicy(policy);
        template.setRetryTemplate(retry);

        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageConverter(Jackson2JsonMessageConverter());
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        container.setDefaultRequeueRejected(false);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(orderPlanService, "consume");
    }
}

The relevant portion of the consumer is basically this:

@Service
@Transactional
public class BaseOrderPlanService implements OrderPlanService {

    ....

    @Override
    public void consume(Object object) {
        throw new IllegalArgumentException("Test");
    }
}

For the autowired integer values, the default value is used. In running this, I see that the exchanges and queues are created on RabbitMQ as expected, with the expected bindings and arguments where relevant. However, when I pass a message to the orderPlanExchange with routing key "orderPlanQueue", it causes an infinite loop as the message is rejected and placed back on the queue repeatedly. If, on the other hand, the IllegalArgumentException is replaced with an AmqpRejectAndDontRequeueException, the message is simply thrown into the dead letter queue on the first rejection attempt.

If anyone could point out what I might be doing wrong here that the retry policy is not being applied, I'd much appreciate it.

Edit: Code with StatefulRetryOperationsInterceptor as per Artem's suggestion.

@Configuration
@EnableAutoConfiguration
@PropertySource("file:${HOME}/common/config/wave-planning.properties")
public class RabbitMQConfiguration {

    private final static String QUEUE_NAME = "orderPlanQueue";

    private static final String EXCHANGE_NAME = "orderPlanExchange";

    private static final String DL_EXCHANGE_NAME = "deadLetterExchange";

    private static final String DL_QUEUE_NAME = "deadLetterQueue";

    @Value("${rabbitmq.host:localhost}")
    private String host;

    @Value("${rabbitmq.port:5672}")
    private int port;

    @Value("${rabbitmq.user:guest}")
    private String userName;

    @Value("${rabbitmq.password:guest}")
    private String password;

    @Value("${rabbitmq.initial_backoff_interval:1000}")
    private int INITIAL_INTERVAL_IN_MILLISECONDS;

    @Value("${rabbitmq.max_backoff_interval:10000}")
    private int MAX_INTERVAL_IN_MILLISECONDS;

    @Autowired
    OrderPlanService orderPlanService;

    @Bean
    Queue queue() {
        Map<String, Object> qargs = new HashMap<String, Object>();
        qargs.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
        return new Queue(QUEUE_NAME, false, false, false, qargs);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    }

    @Bean
    FanoutExchange deadLetterExchange() { return new FanoutExchange(DL_EXCHANGE_NAME); }

    @Bean
    Queue deadLetterQueue() { return new Queue(DL_QUEUE_NAME); }

    @Bean
    Binding deadLetterBinding(Queue deadLetterQueue, FanoutExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public MessageConverter Jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());

        /*
        RetryTemplate retry = new RetryTemplate();
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();

        policy.setInitialInterval(INITIAL_INTERVAL_IN_MILLISECONDS);
        policy.setMultiplier(2);
        policy.setMaxInterval(MAX_INTERVAL_IN_MILLISECONDS);

        retry.setBackOffPolicy(policy);
        template.setRetryTemplate(retry);
        */

        template.setRoutingKey(QUEUE_NAME);
        template.setMessageConverter(Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    StatefulRetryOperationsInterceptor interceptor() {
        return RetryInterceptorBuilder.stateful()
                .maxAttempts(4)
                .backOffOptions(INITIAL_INTERVAL_IN_MILLISECONDS, 2, MAX_INTERVAL_IN_MILLISECONDS)
                .build();
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setMessageConverter(Jackson2JsonMessageConverter());
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        container.setAdviceChain(new Advice[] {interceptor()});
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(orderPlanService, "consume");
    }

}

Answer:

The retry policy on the RabbitTemplate is unrelated to DLX/DLQ handling, which happens on the consumer side.

See the difference in the Reference Manual here:

you can now configure the RabbitTemplate to use a RetryTemplate to help with handling problems with broker connectivity.

and here:

To put a limit in the client on the number of re-deliveries, one choice is a StatefulRetryOperationsInterceptor in the advice chain of the listener.

So, you have to reconsider your logic and add the retry capabilities to the SimpleMessageListenerContainer definition.
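
To see what the listener-side retry amounts to with the values from the question, the waits can be computed by hand. The following JDK-only sketch is not Spring Retry itself; the class and method names are made up, and it assumes one wait before each retry, starting at the initial interval and capped at the maximum.

```java
import java.util.ArrayList;
import java.util.List;

// Computes the waits an exponential back-off produces; a sketch, not Spring Retry itself.
class BackoffSchedule {

    /** Delays (ms) between attempts: one wait before each retry, so maxAttempts - 1 entries. */
    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        double next = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            delays.add((long) Math.min(next, maxInterval)); // never wait longer than the cap
            next *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values from the question: 1000 ms initial, multiplier 2, 10000 ms cap, 4 attempts.
        System.out.println(delays(1000, 2.0, 10000, 4)); // prints [1000, 2000, 4000]
    }
}
```

Under these assumptions the message is tried four times over roughly seven seconds. What happens once the attempts are exhausted depends on the configured recoverer and on whether rejected messages are requeued.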

Question:

Using Spring AMQP - spring-amqp with Spring Boot

Here I am trying to implement a dead letter exchange. I am sending a message to a queue; if some business exception occurs, the message should be sent to the "dlq" queue, wait there for 5 seconds, and then come back into the queue for processing again. After 5 tries it should be dropped.

Please find the configuration below.

application.yml

spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
server:
  port: 8081

MQ Config

import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MQConfig {

  // public static final String OUTGOING_QUEUE = "outgoing.example";

  // public static final String INCOMING_QUEUE = "incoming.example";

  @Autowired
  private ConnectionFactory cachingConnectionFactory;

  // Setting the annotation listeners to use the jackson2JsonMessageConverter
  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cachingConnectionFactory);
    factory.setMessageConverter(jackson2JsonMessageConverter());
    return factory;
  }

  // Standardize on a single objectMapper for all message queue items
  @Bean
  public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  @Bean
  public Queue outgoingQueue() {
    Map<String, Object> args = new HashMap<String, Object>();
    // The default exchange
    args.put("x-dead-letter-exchange", "dlx");
    // Route to the incoming queue when the TTL occurs
    // args.put("x-dead-letter-routing-key", "q.with.dlx");
    // TTL 5 seconds
    args.put("x-message-ttl", 5000);
    return new Queue("q.with.dlx", false, false, false, args);
  }

  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
    return rabbitTemplate;
  }

  @Bean
  public Queue incomingQueue() {
    return new Queue("dlq");
  }

  @Bean
  public DirectExchange directExchange() {
      return new DirectExchange("dlx") ; 
  }

  @Bean
  public Binding binding(Queue incomingQueue, DirectExchange directExchange) {
      return BindingBuilder.bind(incomingQueue()).to(directExchange()).with("q.with.dlx");
  }
}

Publisher

import java.util.Date;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.spring.amqp.domain.ExampleObject;

@Component
@RestController
@RequestMapping("/publish")
public class Publisher {

@Autowired
private RabbitTemplate rabbitTemplate;

// Scheduled task to send an object every 5 seconds
// @Scheduled(fixedDelay = 5000)
@GetMapping()
public void sender() {
    ExampleObject ex = new ExampleObject();
    ex.setDate(new Date());
    rabbitTemplate.convertAndSend("q.with.dlx",ex);
}
}

Consumer

package com.spring.amqp.service;
import java.util.List;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import com.spring.amqp.domain.ExampleObject;

@Component
public class Consumer {

// Annotation to listen for an ExampleObject
@RabbitListener(queues = "q.with.dlx")
public void handleMessage(ExampleObject exampleObject,
        @Header(required = false, name = "x-death") List<String> xDeath) {
    System.out.println("Message" + ":" + (xDeath == null ? "" : xDeath));
    System.out.println("Received incoming object at " + exampleObject.getDate());


    // // String x_header_count = xDeath.get("count");
    // System.out.println(x_header_count);
    try{
    int a = 5 / 0;
    System.out.println(a);
    }
    catch(Exception ex) {
        throw new AmqpRejectAndDontRequeueException("amqp exception") ;
    }


}
}

In the x-death header I am getting nothing. Now when I hit localhost:8081/publish, it sends a message to q.with.dlx, I throw an AmqpRejectAndDontRequeueException, and then that message gets stuck in the queue named "dlq". Nothing happens after that. I have one domain object named ExampleObject which I am sending from publisher to consumer.

Please cross check all my configuration and if possible can someone run this and let me know what the mistake is? Thanks in advance.

And Gary Russell, thanks for this awesome messaging framework.


Answer:

You have the TTL on the wrong queue.

You need to configure time-to-live dead-lettering on your dlq:

  @Bean
  public Queue incomingQueue() {
      return new Queue("dlq");
  }

Add arguments x-message-ttl = 5000 and dead-letter configuration to route the expired message back to the original queue.

Your queue bean names are a bit confusing; you would normally have something like...

someExchange -> mainQueue (with dead-lettering to DLX)

DLX -> dlq (with TTL and dead-lettering to someExchange)
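
With that loop in place, the consumer still needs a way to stop after five tries. One option is to read the count the broker records in the x-death header. The following JDK-only sketch assumes the header arrives as a list of maps carrying "queue", "reason" and "count" entries with plain String and Number values; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Reads the retry count from an x-death header value; the header layout is an assumption.
class XDeath {

    /** Sums the "count" of x-death entries matching the given queue and reason. */
    static long count(List<Map<String, Object>> xDeath, String queue, String reason) {
        if (xDeath == null) {
            return 0; // first delivery: the header is absent
        }
        long total = 0;
        for (Map<String, Object> entry : xDeath) {
            Object count = entry.get("count");
            if (queue.equals(entry.get("queue")) && reason.equals(entry.get("reason"))
                    && count instanceof Number) {
                total += ((Number) count).longValue();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Example header after three trips around the loop (values are made up).
        Map<String, Object> rejected = Map.of("queue", "q.with.dlx", "reason", "rejected", "count", 3L);
        Map<String, Object> expired = Map.of("queue", "dlq", "reason", "expired", "count", 3L);
        long rejections = count(List.of(rejected, expired), "q.with.dlx", "rejected");
        System.out.println(rejections >= 5 ? "give up" : "retry"); // prints retry
    }
}
```

A listener could call such a helper before processing and, once the limit is reached, acknowledge the message or publish it to a separate queue instead of rejecting it again.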

Question:

I have read a lot of documentation and Stack Overflow posts, and I still have a problem moving a message to the dead letter queue when an exception occurs. I'm using Spring Boot. Here is my configuration:

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    RetryOperationsInterceptor interceptor() {
        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, "error_exchange ", "error_key");
        return RetryInterceptorBuilder
            .stateless()
            .recoverer(recoverer)
            .build();
    }

Dead letter queue:

Features    
x-dead-letter-routing-key:  error_key
x-dead-letter-exchange: error_exchange
durable:    true
Policy  DLX

Name of the queue: error

My exchange: name:error_exchange binding: to: error, routing_key: error_key

Here is my consumer:

@RabbitListener(queues = "${rss_reader_chat_queue}")
    public void consumeMessage(Message message) {
        try {
            List<ChatMessage> chatMessages = messageTransformer.transformMessage(message);
            List<ChatMessage> save = chatMessageRepository.save(chatMessages);
            sendMessagesToChat(save);
        }
        catch(Exception ex) {
            throw new AmqpRejectAndDontRequeueException(ex);
        }
    }

So when I send an invalid message and some exception occurs, it happens once (and it's good because previously message was sent over and over again) but the message doesn't go to my dead letter queue. Can you help me with this?


Answer:

You need to show the rest of your configuration - boot properties, queue @Beans etc. You also seem to have some confusion between using a republishing recoverer and using dead letter queues; they are different ways to achieve similar results. You typically wouldn't use both.

Here's a simple boot app that demonstrates using a DLX/DLQ...

@SpringBootApplication
public class So43694619Application implements CommandLineRunner {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So43694619Application.class, args);
        context.close();
    }

    @Autowired
    RabbitTemplate template;

    @Autowired
    AmqpAdmin admin;

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void run(String... arg0) throws Exception {
        this.template.convertAndSend("so43694619main", "foo");
        this.latch.await(10, TimeUnit.SECONDS);
        this.admin.deleteExchange("so43694619dlx");
        this.admin.deleteQueue("so43694619main");
        this.admin.deleteQueue("so43694619dlq");
    }


    @Bean
    public Queue main() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "so43694619dlx");
        args.put("x-dead-letter-routing-key", "so43694619dlxRK");
        return new Queue("so43694619main", true, false, false, args);
    }

    @Bean
    public Queue dlq() {
        return new Queue("so43694619dlq");
    }

    @Bean
    public DirectExchange dlx() {
        return new DirectExchange("so43694619dlx");
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("so43694619dlxRK");
    }

    @RabbitListener(queues = "so43694619main")
    public void listenMain(String in) {
        throw new AmqpRejectAndDontRequeueException("failed");
    }

    @RabbitListener(queues = "so43694619dlq")
    public void listenDlq(String in) {
        System.out.println("ReceivedFromDLQ: " + in);
        this.latch.countDown();
    }

}

Result:

ReceivedFromDLQ: foo

Question:

I have dead lettering set up for a few queues that I am using. In the configuration I use:

<bean id="retryAdvice"
    class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
</bean>

<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>

If I want the message on the dead letter queue to have the x-exception-stacktrace header, I read that I need to use RepublishMessageRecoverer. Is using that with dead lettering as simple as replacing RejectAndDontRequeueRecoverer with RepublishMessageRecoverer in the above setting, or do I need to write a custom messageRecoverer?


Answer:

Yes, just replace the recoverer. Normal RabbitMQ dead lettering is not used at all (the message will be ack'd) and you don't need to configure the DLX/DLQ; you have complete control in the recoverer over where the message goes.

Question:

I configured the default DLQ as following:

spring:
  cloud:
    stream:
      rabbit:
        default:
          consumer:
            auto-bind-dlq: true
            republish-to-dlq: true
            dead-letter-queue-name: my-dlq

I want to consume the messages in that default DLQ and process it in the business domain.

How can I listen for those messages using Spring Cloud Stream?

Thanks


Answer:

There is nothing built in, but the documentation shows some techniques...

Because you cannot anticipate how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue. However, if the problem is a permanent issue, that could cause an infinite loop. The following Spring Boot application shows an example of how to route those messages back to the original queue but moves them to a third "parking lot" queue after three attempts. The second example uses the RabbitMQ Delayed Message Exchange to introduce a delay to the re-queued message. In this example, the delay increases for each attempt. These examples use a @RabbitListener to receive messages from the DLQ. You could also use RabbitTemplate.receive() in a batch process.

...
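The routing decision described in the quoted documentation can be sketched in plain Java, without a broker. This is only an illustration of the logic: the header name x-retries, the queue names and the limit of three are assumptions made for this sketch, and a real listener would publish with RabbitTemplate instead of returning a destination name.

```java
import java.util.HashMap;
import java.util.Map;

public class DlqRouter {

    // Hypothetical names, chosen for this sketch only.
    static final String RETRIES_HEADER = "x-retries";
    static final String ORIGINAL_QUEUE = "original.queue";
    static final String PARKING_LOT = "original.queue.parkingLot";
    static final int MAX_RETRIES = 3;

    /**
     * Decides where a dead-lettered message should go next and, when it is
     * sent back for another try, increments the counter kept in its headers.
     */
    public static String route(Map<String, Object> headers) {
        Integer retries = (Integer) headers.get(RETRIES_HEADER);
        if (retries == null) {
            retries = 0; // first time this message reaches the DLQ
        }
        if (retries < MAX_RETRIES) {
            headers.put(RETRIES_HEADER, retries + 1);
            return ORIGINAL_QUEUE;
        }
        // Retries are used up: park the message instead of looping forever.
        return PARKING_LOT;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        for (int i = 1; i <= 4; i++) {
            System.out.println("delivery " + i + " from DLQ -> " + route(headers));
        }
    }
}
```

Keeping the counter in a message header means the state travels with the message, so the listener itself stays stateless.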

Question:

With Spring and RabbitMQ I set up two topic exchanges, x and dlx, and two queues, q and dlq. q is bound to x and dlq to dlx. dlx is configured as the dead-letter exchange for q.

When a message in q is rejected (without requeue), it is successfully sent to dlx and then to dlq.

Now I use the shovel plugin to move dead-lettered messages in dlq back to q. This works as long as the messages are processed successfully this time (ack).

But if one of these shovelled messages in q is rejected again, it is dropped silently. I expect it to be sent to the DLX dlx again. Did I configure something wrong, or did I misunderstand the concept of DLX or shovels?


Answer:

I suspect you are hitting a flavor of this...

It is possible to form a cycle of message dead-lettering. For instance, this can happen when a queue dead-letters messages to the default exchange without specifying a dead-letter routing key. Messages in such cycles (i.e. messages that reach the same queue twice) will be dropped if there were no rejections in the entire cycle.

...because you're shoveling. See Dead Letter Exchanges.

Instead, configure the DLQ with TTL, and dead-letter configuration that causes expired messages to be routed back to the original queue. That way, the x-death header gets two entries - 1 for the rejections from the original queue and 1 for the expiry from the DLQ.

I am guessing that, with shoveling, the broker thinks there is a cycle.
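As a rough sketch of the TTL-based DLQ suggested above, the queue arguments could be assembled as follows. The three argument keys are standard RabbitMQ queue arguments. The helper class, the 5 second TTL and the choice to route back through the default exchange using the queue name as routing key are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryDlqArguments {

    /**
     * Builds arguments for a DLQ whose messages expire after ttlMillis and are
     * then dead-lettered, via the default exchange, back to the original queue.
     */
    public static Map<String, Object> forQueue(String originalQueue, long ttlMillis) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);
        // The default exchange ("") routes by queue name, so an explicit
        // routing key naming the original queue is required here.
        args.put("x-dead-letter-exchange", "");
        args.put("x-dead-letter-routing-key", originalQueue);
        return args;
    }

    public static void main(String[] args) {
        // With Spring AMQP the map would be passed to the Queue constructor, e.g.
        // new Queue("dlq", true, false, false, RetryDlqArguments.forQueue("q", 5000L))
        System.out.println(forQueue("q", 5000L));
    }
}
```

With this arrangement no shovel is needed: the broker moves expired messages back to q on its own.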

Question:

Hi, I am trying to set up a dead letter exchange in Spring Integration XML. The scenario: the AAA exchange is bound to the BBB queue. If processing a message from the BBB queue fails, for example because the listener threw an exception, I want the message routed to the dead letter exchange and stored in its queue. Below is the code.

I created a sample project.

Main.java

package com.spring.rabbit.first.deadletter;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("/applicationContext.xml");
    }
}

xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Spring configuration -->

    <context:component-scan base-package="com.spring.rabbit.first" />
    <context:mbean-export default-domain="com.spring.rabbit.first.deadletter" />

    <!-- RabbitMQ common configuration -->

    <rabbit:connection-factory id="connectionFactory"
        username="guest" password="guest" port="5672" virtual-host="/" host="localhost" />


    <!-- <rabbit:connection-factory id="connectionFactory"/> -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queues -->

    <rabbit:queue id="springQueue" name="spring.queue"
        auto-delete="true" durable="false" />

    <rabbit:listener-container
        connection-factory="connectionFactory" advice-chain="retryAdvice">
        <rabbit:listener queues="BBBqueue" ref="messageListener" />
    </rabbit:listener-container>

    <bean id="messageListener" class="com.spring.rabbit.first.deadletter.MessageHandler" />



    <bean id="retryAdvice"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer" />
        <property name="retryOperations" ref="retryTemplate" />
    </bean>

    <bean id="rejectAndDontRequeueRecoverer"
        class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />


    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="2000" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="30000" />
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="3" />
            </bean>
        </property>
    </bean>



    <rabbit:topic-exchange name="AAAqueue">
        <rabbit:bindings>
            <rabbit:binding queue="BBBqueue" pattern="" />
        </rabbit:bindings>
    </rabbit:topic-exchange>


    <rabbit:queue name="BBBqueue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="XXX.dead.letter"></entry>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
        </rabbit:queue-arguments>
    </rabbit:queue>


    <!-- dead letter -->

    <rabbit:topic-exchange name="XXX.dead.letter">
        <rabbit:bindings>
            <rabbit:binding queue="XXX.dead.letter.queue" pattern=""></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:queue name="XXX.dead.letter.queue"></rabbit:queue>



</beans>

Message handler

package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageHandler implements MessageListener {

    @Override
    public void onMessage(Message message) {

        System.out.println("Received message: " + message);
        System.out.println("Text: " + new String(message.getBody()));

        // Force a failure on every delivery so the retry and dead-letter path is exercised.
        message = null;
        if (message == null) {
            throw new NullPointerException();
        }
    }
}

Messagesender

package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;

@Service
@ManagedResource
public class MessageSender {

    @Autowired
    private AmqpTemplate template;

    @ManagedOperation
    public void send(String text) {
        send("amq.fanout", "NDPAR.SPRING.JAVA", text);
    }

    @ManagedOperation
    public void send(String exchange, String key, String text) {
        template.convertAndSend(exchange, key, text);
    }
}

output:

23:57:33.753 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
23:57:33.777 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:06.952 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 2000
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:12.888 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 20000
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
23:58:42.393 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] WARN org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer - 
Retries exhausted for message (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue,
 receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

But I still don't see any message in the dead letter queue. Am I missing anything? Kindly help with this.


Answer:

I am not sure what your XXX-channel and adapter are supposed to do, but you need to add a RejectAndDontRequeueRecoverer to the retry advice factory bean (in the messageRecoverer property).

The default recoverer just logs that retries are exhausted and discards the message.
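For reference, the "Sleeping for 2000" and "Sleeping for 20000" lines in the question's log follow from its retry settings (initialInterval 2000, multiplier 10.0, maxInterval 30000, maxAttempts 3). The plain-Java sketch below reproduces that schedule. It is an approximation written to match the log, not Spring Retry's own code, and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffSchedule {

    /**
     * Returns the pauses (in ms) between attempts: each pause is the current
     * interval capped at maxInterval, and the interval is then multiplied.
     * There is no pause after the final attempt.
     */
    public static List<Long> pauses(long initialInterval, double multiplier,
                                    long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double interval = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add((long) Math.min(interval, maxInterval));
            interval *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Settings from the question's retryTemplate bean.
        System.out.println(pauses(2000, 10.0, 30000, 3)); // prints [2000, 20000]
    }
}
```

Only after the last attempt fails is the recoverer invoked, which is why the choice of recoverer decides whether the message is discarded, rejected or republished.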

EDIT

Here is a custom MessageRecoverer that publishes a failed message from queue A to a queue named A.dlq - the queue and binding are declared automatically, as needed.

/*
 * Copyright 2014-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;

public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {

    public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

    public static final String X_EXCEPTION_MESSAGE = "x-exception-message";

    public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";

    public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

    private final Log logger = LogFactory.getLog(getClass());

    private final RabbitTemplate errorTemplate;

    private final RabbitAdmin admin;

    private final String deadLetterExchangeName = "DLX";

    private final DirectExchange deadletterExchange = new DirectExchange(this.deadLetterExchangeName);

    private boolean initialized;

    public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
        this.errorTemplate = errorTemplate;
        this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
    }

    @Override
    public void recover(Message message, Throwable cause) {
        if (!this.initialized) {
            initialize();
        }
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
        headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
        headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
        headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());

        String dlqName = message.getMessageProperties().getConsumerQueue() + ".dlq";
        if (this.admin.getQueueProperties(dlqName) == null) {
            bindDlq(dlqName);
        }
        this.errorTemplate.send(this.deadLetterExchangeName, dlqName, message);
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Republishing failed message to " + dlqName);
        }
    }

    private void initialize() {
        this.admin.declareExchange(this.deadletterExchange);
        this.initialized = true;
    }

    private void bindDlq(String dlqName) {
        Queue dlq = new Queue(dlqName);
        this.admin.declareQueue(dlq);
        this.admin.declareBinding(BindingBuilder.bind(dlq).to(this.deadletterExchange).with(dlqName));
    }

    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }

}

Question:

I want to do the following: when a message fails and falls into my dead letter queue, I want to wait 5 minutes and then republish the same message to my queue.

Today, using Spring Cloud Stream and RabbitMQ, I wrote the following code based on this documentation:

@Component
public class HandlerDlq {

    private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_DELAY_HEADER = "x-delay";
    private static final int NUMBER_OF_RETRIES = 3;
    private static final int DELAY_MS = 300000;
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HandlerDlq(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @RabbitListener(queues = MessageInputProcessor.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = 0;
        }
        if (retriesHeader > NUMBER_OF_RETRIES) {
            LOGGER.warn("Message {} added to failed messages queue", failedMessage);
            this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
            throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
        }
        retriesHeader++;
        headers.put(X_RETRIES_HEADER, retriesHeader);
        headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
        LOGGER.warn("Retrying message, {} attempts", retriesHeader);
        this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(MessageInputProcessor.FAILED);
    }
}

My MessageInputProcessor interface:

public interface MessageInputProcessor {

    String INPUT = "myInput";

    String INPUT_DESTINATION = "myInput.group";

    String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file

    String FAILED = INPUT + "-failed";

    String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";

    @Input
    SubscribableChannel storageManagerInput();

    @Input(MessageInputProcessor.FAILED)
    SubscribableChannel storageManagerFailed();
}

And my properties file:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000
spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true


#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group

With this code, I can read from the dead letter queue and capture the header, but I can't put the message back on my queue (the line LOGGER.warn("Retrying message, {} attempts", retriesHeader); only runs once, even if I set a very short delay).

My guess is that the method bindOriginalToDelay is binding the exchange to a new queue, and not mine. However, I didn't find a way to get my queue to bind there instead of creating a new one. But I'm not even sure this is the error.

I've also tried to send to MessageInputProcessor.INPUT instead of MessageInputProcessor.INPUT_DESTINATION, but it didn't work as expected.

Also, unfortunately, I can't update Spring framework due to dependencies on the project...

Could you help me put the failed message back on my queue after some time? I would really rather not use Thread.sleep there...


Answer:

With that configuration, myInput.group is bound to the delayed (topic) exchange myInput with routing key #.

You should probably remove spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true because you don't need the main exchange to be delayed.

It will also be bound to your explicit delayed exchange, with key myInput.group.

Everything looks correct to me; you should see the same (single) queue bound to two exchanges:

The myInput.group.dlq is bound to DLX with key myInput.group

You should set a longer TTL and examine the message in the DLQ to see if something stands out.

EDIT

I just copied your code with a 5 second delay and it worked fine for me (after turning off the delay on the main exchange).

Retrying message, 4 attempts

and

added to failed messages queue

Perhaps you thought it was not working because you have a delay on the main exchange too?
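The log above reports four retry attempts even though NUMBER_OF_RETRIES is 3. That follows from the check retriesHeader > NUMBER_OF_RETRIES in rePublish(): a message whose header is already 3 still passes the check and is retried once more. The plain-Java replay below walks through the counter, assuming every redelivery fails again. The class name and the extra x-delay text in each line are additions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryCountTrace {

    static final int NUMBER_OF_RETRIES = 3;
    static final int DELAY_MS = 5000; // 5 second base delay, as in the test above

    /** Replays the counter logic of rePublish() until the message is parked. */
    public static List<String> trace() {
        List<String> log = new ArrayList<>();
        int retries = 0; // the header is absent on first arrival in the DLQ
        while (true) {
            if (retries > NUMBER_OF_RETRIES) {
                log.add("added to failed messages queue");
                return log;
            }
            retries++;
            // The delay grows linearly with the attempt number.
            log.add("Retrying message, " + retries + " attempts, x-delay=" + (DELAY_MS * retries));
        }
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

If exactly three retries are wanted, comparing with >= instead of > would park the message one delivery earlier.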