Hot questions for Using RabbitMQ in spring cloud stream

Top Java Programmings / RabbitMQ / spring cloud stream

Question:

I'am trying to send a simple message using "spring cloud stream" to the rabbitmq. Basically code looks like this:

@EnableBinding(Source.class)
@SpringBootApplication
public class SourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SourceApplication.class, args);
    }

    @Autowired Source source;

    @PostConstruct
    public void init() {
        source.send(MessageBuilder.withPayload("payload").build());
    }
}

then I get this error message:

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=******, headers={id=c60dd5be-6576-99d5-fd1b-b1cb94c191c1, timestamp=1488651422892}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)

However, if I add some delay, before sending a message (just second or few), it works ok. My question is: how can I wait before spring completely initialize message channels and then send a message?


Answer:

@PostConstruct is triggered too early (when the configuration bean is created, but before the context is started and the binding takes place). What you want is to trigger the sending of the message once the context is completely initialized, or at least after the output channels are bound.

You have a few options, all relying on the creation of an additional bean:

  1. To use the SmartLifecycle support from Spring (make sure that isAutoStartup returns true by default and the phase is zero - the default - so that the bean is started after outputs are bound).

  2. Use an ApplicationListener for ContextRefreshedEvent.

  3. Since this is a Spring Boot application you can use an ApplicationRunner bean (which gets invoked after the context has been created).

Question:

i'm trying to use the spring cloud stream for sending messages, but i'm not finding how do i send a message and get the return in the same method, same as I had using rabbitTemplate.

RabbitTemplate template = new RabbitTemplate(cf);
//configs template here...
Object test = template.convertSendAndReceive("Hello world");

//On Cloud Stream
private MessageChannel output;

public <T extends DomainEvent> void publish(T domainEvent){
    output.send(MessageBuilder.withPayload(domainEvent).build());
    //How to wait and receive the answer?
}

Answer:

Spring Cloud Stream is not designed for request/reply messaging.

Question:

I have created the following code and matching configuration. The ingest of messages works great, but the SpEL is not evaluated at all, thus a new Exchange with the expression as a name is created...

Looking at my dependencies versions the @SendTo annotation SHOULD support this kind of expression...

What am I doing wrong?

I'm using RabbitMQ and Spring Boot 1.4.3

@EnableBinding(CommandChannel.class)
public class CommandSink {

    private static final Logger LOGGER = LoggerFactory.getLogger(CommandSink.class);

    @StreamListener(CommandChannel.INPUT)
    @SendTo("!{request.messageProperties.headers['reply_to']}")
    public String processCommand(@Payload String cmd, @Header("reply_to") String replyToQueue){
        LOGGER.info("Got a {} command and I'll reply to {}", cmd, replyToQueue);
        String result = "geil: " + cmd;
        return result;
    }
}

Answer:

What am I doing wrong?

Nothing; there is simply no code in the bean post processor to evaluate a SpEL expression there; the value is a simple string value for the destination (channel name).

What leads you to believe SpEL is supported there? I am having trouble parsing this...

Looking at my dependencies versions the @SendTo annotation SHOULD support this kind of expression...

Question:

I have a rabbitmq queue and two spring cloud spring consumers. I want that each consumers process messages in order.

I thought that when consumer1 send ack, consumer2 receive second message, so I expected message1, message2 is processed in order in each consumers.

-------------------- time pass ------------------------>

consumer1:   message1             message3
consumer2:              message2            message4

But it wasn't. consumer1, consumer2 receive message1, message2, and process simultaneously.

-------------------- time pass ------------------------>

consumer1:   message1  message3
consumer2:   message2  message4

Is there a way for spring cloud stream to consume messages exclusively?


Answer:

RabbitMQ (AMQP) doesn't support that; each consumer gets prefetch messages.

It does support exclusive consumers, but it means consumer1 would get all the messages and consumer2 would only get messages if consumer1 dies.

However, Spring Cloud Stream doesn't currently provide a property to set that option.

Question:

I have a Spring Cloud Stream application that receives events from RabbitMQ using the Rabbit Binder. My application can be summarized as this:

@Transactional
@StreamListener(MySink.SINK_NAME)
public void processEvents(Flux<Event> events) {
       // Transform events and store them in MongoDB using 
       // spring-boot-data-mongodb-reactive
       ...
}

The problem is that it doesn't seem that @Transactional works with Spring Cloud Stream (or at least that's my impression) since if there's an exception when writing to MongoDB the event seems to have already been ack:ed to RabbitMQ and the operation is not retried.

Given that I want to achieve basically the same functionality as when using the @Transactional around a function with spring-amqp:

  1. Do I have to manually ACK the messages to RabbitMQ when using Spring Cloud Stream with the Rabbit Binder?
  2. If so, how can I achieve this?

Answer:

There are several issues here.

  1. Transactions are not required for acknowledging messages
  2. Reactor-based @StreamListener methods are invoked exactly once, just to set up the Flux so @Transactional on that method is meaningless - messages then flow through the flux so anything pertaining to individual messages has to be done within the context of the flux.
  3. Spring Transactions are bound to the thread - Reactor is non-blocking; the message will be acked at the first handoff.

Yes, you would need to use manual acks; presumably on the result of the mongodb store operation. You would probably need to use Flux<Message<Event>> so you would have access to the channel and delivery tag headers.

Question:

I've been looking at using RabbitMQ for cross-service messaging. I've been able to configure our Exchanges / Queues / DLX etc. using Spring annotations. Example (simple) queue listener:

@RabbitListener(queues = RabbitMessageType.QueueNames.SMS_NOTIFICATIONS)
public void receive1(Message message) throws Exception {
    RabbitMessageDto messageDto = OBJECT_MAPPER.readValue(message.getBody(), RabbitMessageDto.class);
    SmsNotificationDto payload = OBJECT_MAPPER.readValue(messageDto.getPayload(), SmsNotificationDto.class);
    log.info(payload.getMessage());
}

I'm using spring-cloud-sleuth to generate correlationIds / traceIds, which are preserved when using HTTP requests to talk to other services, enabling us to trace the given ID throughout the logs of our various microservices.

While I can get the current traceId and insert that into my DTO:

@Autowired
private Tracer tracer;

private RabbitMessageDto createRabbitMessageWithPayload(String messageType, 
                                                        String messageVersion, 
                                                        Object payload) {
    return new RabbitMessageDto.Builder()
        .withTraceId(tracer.getCurrentSpan().getTraceId())
        .withDtoName(messageType)
        .withDtoVersion(messageVersion)
        .withPayload(payload)
        .build();
}

I cannot find a way to set the traceId in the receiving method.

Googling keeps bringing me to spring-cloud-stream and spring-cloud-stream-starter-rabbit; documentation seems to indicate that it's possible automatically insert / set traceIds, but I'm not familiar with spring-cloud-stream at all, and don't find the documentation particularly helpful.

So, I would love answers to the following:

  • Using SpanAdjuster or Tracer etc; can I set the traceId based on the value in my DTO?
  • Using spring-cloud-stream, can I automagically insert / retrieve the traceId, and where would I start?

Answer:

So, incase someone comes across this looking to set the sleuth traceId context, we came up with the following solution:

@Autowired Tracer tracer;

private void someMethod(long traceId) {
    Span span = Span.builder()
        .traceId(traceId)
        .spanId(new Random().nextLong())
        .build();
    tracer.continueSpan(span);
    // do work
    tracer.closeSpan(span);
}

It should be noted that all the documentation says that a span should be closed once you've finished with it. The do work section above should be wrapped with a try / catch / finally block to ensure this is closed.

Any methods called with the span still open will inherit the traceId.

EDIT

I should also say, it seems the better solution would be to replace the Spring AMQP library with spring-cloud-stream; from what I can tell, this should automatically include the traceId in rabbit messages (correlationId) and set it at the other end. I haven't had the opportunity to test this, however.

Question:

I am using Spring cloud stream with RabbitMQ.

I want to be able to configure message and query properties from source code and not from a property file (as they mention in their docs).

For example with the classic Java Client for RabbitMq i can do something like that to create a queue with the properties i want:

                    //qName,    passive, durable, exclusive  auto-delete
channel.queueDeclare("myQueue", true,    false,   false,   , false       , null);

Any ideas on how can i achieve the same thing using Spring cloud stream?


Answer:

Inside of "application.yml" you can add all this values , following is example

spring:
  cloud:
    stream:
      instance-count: 1
      bindings:
        input:
          consumer:
            concurrency: 2
            maxAttempts: 1
          group: geode-sink
          destination: jdbc-event-result
          binder: rabbit
      rabbit:
        bindings:
          input:
            consumer:
              autoBindDlq: true
              republishToDlq: true
              requeueRejected: false

rabbitmq:
    username: ur-user-name
    password: ur-password
    host: rabbitmq-url-replace-here
    port: 5672
datasource:
  platform: mysql
  url: jdbc:mysql-url-replace-here
  username: ur-user-name
  password: ur-password
  driverClassName: com.mysql.jdbc.Driver

  datasource:
    tomcat:
      max-wait:  300
      min-idle: 10
      max-idle: 100

aggregator:
  groupCount: 2
  batchSize: 1000
  batchTimeout: 1000

Updated :

Question:

There are articles on how to test Spring cloud stream applications without connecting to a messaging system with spring-cloud-stream-test-support. But I want to really connect to RabbitMQ from my integration test, and cannot do that. Here is test class:

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EnableBinding(Source.class)
public class StreamIT {

  @Autowired
  private Source source;

  @Test
  public void testMessageSending() throws InterruptedException {
    source.output().send(MessageBuilder.withPayload("test").build());
    System.out.println("Message sent.");
  }

}

Everything is the same as in @SpringBootApplication, they use the same properties from application.yml.

But there is no log line that message is sent (o.s.a.r.c.CachingConnectionFactory : Created new connection: SpringAMQP#22e79d25:0/SimpleConnection@5cce3ab6 [delegate=amqp://guest@127.0.1.1:5672/, localPort= 60934] ), and even if broker is not started, there is no java.net.ConnectException: Connection refused (Connection refused).

Am I doing something wrong? What is needed to create real connection to broker and send message from test?


Answer:

EDIT

You need to remove the test-support jar from the pom. It's presence (in test scope) is what triggers replacing the real binder with a test binder.

After removing the test binder support, this works fine for me...

@RunWith(SpringRunner.class)
@SpringBootTest
public class So49816044ApplicationTests {

    @Autowired
    private Source source;

    @Autowired
    private AmqpAdmin admin;

    @Autowired
    private RabbitTemplate template;

    @Test
    public void test() {
        // bind an autodelete queue to the destination exchange
        Queue queue = this.admin.declareQueue();
        this.admin.declareBinding(new Binding(queue.getName(), DestinationType.QUEUE, "mydest", "#", null));

        this.source.output().send(new GenericMessage<>("foo"));

        this.template.setReceiveTimeout(10_000);
        Message received = template.receive(queue.getName());
        assertThat(received.getBody()).isEqualTo("foo".getBytes());
    }

}

Although there is not a rabbit sample; there is a kafka sample that uses a real (embedded) kafka binder for testing, although the test jar is excluded, it doesn't explicitly say that's needed.

Question:

so I was reading this tutorial to configure RabbitMQ and SpringBoot.

At a certain point it is said:

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

So I started looking for more information on Spring docs it is written that:

When doing so, different instances of an application are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message.

Spring Cloud Stream models this behavior through the concept of a consumer group. (Spring Cloud Stream consumer groups are similar to and inspired by Kafka consumer groups.)

So I setup here two nodes with Spring Boot Cloud Stream and RabbitMQ and using spring.cloud.stream.bindings.<channel>.group.

This to me still looks like at-least-once behavior. Am I wrong in assuming that? Should I still manage the possibility to process a message twice even using spring.cloud.stream.bindings.<channel>.group?

Thank you


Answer:

It's at least once. The connection might close before the ack is sent. Rare, but possible.

Question:

i want to create a common project (using spring cloud stream) to route messages to different (consumer) projects dynamically according to message content. (rabbitmq as the message broker)

does spring cloud stream support it? if not, any proposed way to accomplish that? thx


Answer:

You can achieve that by setting spring.cloud.stream.dynamicDestinations property to a list of destination names (if you know the name beforehand) or keeping it as empty. The BinderAwareChannelResolver takes care of dynamically creating/binding the outbound channel for these dynamic destinations.

There is an out of the box router application available which does the similar thing.