Hot questions for using RabbitMQ in Spring Cloud Data Flow


Question:

I'm just starting to learn Spring Cloud Stream and Data Flow, and I want to ask about one use case that is important to me. I created an example processor, Multiplier, which takes a message and resends it 5 times to the output.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.transaction.annotation.Transactional;

@EnableBinding(Processor.class)
public class MultiplierProcessor {

    @Autowired
    private Source source;

    private int repeats = 5;

    @Transactional
    @StreamListener(Processor.INPUT)
    public void handle(String payload) {
        for (int i = 0; i < repeats; i++) {
            if (i == 4) {
                // deliberately fail before the 5th send
                throw new RuntimeException("EXCEPTION");
            }
            source.output().send(new GenericMessage<>(payload));
        }
    }
}

As you can see, the processor crashes before the 5th send. Why? Because it can (programs throw exceptions). I wanted to use this case to practice fault handling in Spring Cloud Stream.

What I would like to achieve is to have the input message placed in a DLQ and the 4 messages that were sent before the failure rolled back, so they are never consumed by the next app (just like in a normal JMS transaction). I have already tried defining the following properties in my processor project, but without success.

spring.cloud.stream.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.bindings.output.producer.republishToDlq=true
spring.cloud.stream.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.consumer.autoBindDlq=true

Could you tell me whether this is possible, and what I am doing wrong? I would be overwhelmingly thankful for some examples.


Answer:

You have several issues with your configuration:

  • missing .rabbit in the rabbit-specific properties
  • you need a group name and durable subscription to use autoBindDlq
  • autoBindDlq doesn't apply on the output side

The consumer has to be transacted so that the producer sends are performed in the same transaction.

I just tested this with 1.0.2.RELEASE:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true

and it worked as expected.

EDIT

Actually, no, the published messages were not rolled back. Investigating...

EDIT2

OK; it does work, but you can't use republishToDlq, because when that is enabled the binder publishes the failed message to the DLQ and the transaction is committed.

When that is false, the exception is thrown to the container, the transaction is rolled back, and RabbitMQ moves the failed message to the DLQ.

Note, however, that retry is enabled by default (3 attempts), so if your processor succeeds during a retry, you will get duplicates in your output.

For this to work as you want, you need to disable retry by setting the max attempts to 1 (and don't use republishToDlq).
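For example, disabling retry is a single consumer property on the input binding (the same setting appears in the full configuration in EDIT3 below):

spring.cloud.stream.bindings.input.consumer.max-attempts=1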

EDIT3

OK; if you want more control over publishing the errors, this will work once the fix for this JIRA is applied to Spring AMQP...

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {

    public static void main(String[] args) {
        SpringApplication.run(So39018400Application.class, args);
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    // extra output binding used only for failed messages
    public interface Errors {

        @Output("errors")
        MessageChannel errorChannel();

    }

    private static class Foo {

        @Autowired
        Source source;

        @Autowired
        Errors errors;

        @StreamListener(Processor.INPUT)
        public void handle(Message<byte[]> in) {
            try {
                source.output().send(new GenericMessage<>("foo"));
                source.output().send(new GenericMessage<>("foo"));
                throw new RuntimeException("foo");
            }
            catch (RuntimeException e) {
                // forward the failed input to the error channel, then rethrow
                // so the transacted sends above are rolled back
                errors.errorChannel().send(MessageBuilder.fromMessage(in)
                        .setHeader("foo", "bar") // add whatever you want, stack trace etc.
                        .build());
                throw e;
            }
        }

    }

}

with properties:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.errors.destination=so8400errors
spring.cloud.stream.rabbit.bindings.errors.producer.transacted=false

spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=false
spring.cloud.stream.bindings.input.consumer.max-attempts=1
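Note that the errors binding is deliberately not transacted (transacted=false above): the error message goes out on a separate, non-transacted channel, so it is still published even though the consumer's transaction, including the sends on the transacted output binding, is rolled back.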

Question:

I have a simple RabbitMQ source and sink. I'm publishing a message to the source queue with the following properties:

content_type -> application/json

and a JSON payload:

{
  "userId": 2,
  "customerId": 1
}

The RabbitMQ sink receives the message with content type application/octet-stream instead of application/json.

I tried to start the application with the following properties:

spring.cloud.stream.default.contentType=application/json

but it didn't help.

Stream definition:

stream_1=rabbitSource: rabbit --queues=queue1 --password=p --host=h --username=u | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=h --username=u

How can I set the content type to be application/json? The reference guide doesn't seem to have the answer.

Release versions:

  • spring-cloud-dataflow-server:2.0.1.RELEASE
  • spring-cloud-skipper-server:2.0.0.RELEASE

Update:

As suggested in the answers by @SabbyAnandan, I'm now running:

dataflow:>stream create --name test123 --definition "rabbitSource: rabbit --queues=queue --password=p --host=rmq --username=u --spring.cloud.stream.bindings.output.contentType='application/json' | sink: rabbit --exchange=ex --routing-key=rk --converter-bean-name=jsonConverter --password=p --host=rmq --username=p"
Created new stream 'test123'

dataflow:>stream deploy --name test123 --properties "app.rabbit.spring.cloud.stream.bindings.output.contentType='application/json'"
Deployment request has been sent for stream 'test123'

But the content_type is still the same.


Answer:

@Maroun, as discussed, here are some of the options.

  1. The Rabbit sink always receives data as byte[]. You can force it to use a JSON converter by providing the configuration --rabbit.converterBeanName=jsonConverter. However, by default that converter uses a Base64 encoder, so the resulting text will be Base64 encoded. In the custom app that receives data from the exchange the sink publishes to, you would then need to provide a custom converter that properly decodes it with a Base64 decoder.
  2. The other option is to patch the Rabbit sink to fit your needs. Clone the Rabbit app starter repo and provide a pass-through converter that simply passes the incoming byte[] downstream, and also change the content type on the message header to application/json (see the sketch after this list). The normal Jackson converter will then work on the custom application side, because the byte[] is not Base64 encoded in this case.
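A minimal sketch of what such a pass-through converter could look like, using Spring AMQP's MessageConverter contract. The class name PassThroughConverter and the bean registration here are illustrative, not part of the actual Rabbit sink:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PassThroughConverterConfig {

    // Hypothetical converter: forwards the raw byte[] unchanged and
    // marks the outgoing message as JSON instead of Base64-encoding it.
    static class PassThroughConverter implements MessageConverter {

        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) {
            messageProperties.setContentType("application/json");
            return new Message((byte[]) object, messageProperties);
        }

        @Override
        public Object fromMessage(Message message) {
            return message.getBody();
        }
    }

    // The bean name is what you would reference via --converter-bean-name.
    @Bean
    public MessageConverter passThroughConverter() {
        return new PassThroughConverter();
    }
}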

Question:

I am new to Spring Cloud Data Flow and I've been reading through the tutorials, trying to set up a project locally (https://dataflow.spring.io/docs/installation/local/manual/).

Am I right to assume that a queuing system is a prerequisite for the servers to run?

How is this messaging middleware used by the Data Flow server and by the Skipper server? Is there a way to use a database to store state instead of passing it from one app to the next using a queue?


Answer:

You can run it without messaging middleware. In that case the stream features are disabled, but you can still work with Spring Cloud Task and Spring Batch jobs.

Essentially, in such a setup you only need the Data Flow server and a database (e.g., MySQL).

To do so, just set the feature toggle spring.cloud.dataflow.features.streams-enabled to false. See also: https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#configuration-local
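For example, with a manual installation you could pass the toggle on the command line when starting the server (the jar name below assumes the 2.0.1.RELEASE version mentioned earlier; adjust it to match your download):

java -jar spring-cloud-dataflow-server-2.0.1.RELEASE.jar \
    --spring.cloud.dataflow.features.streams-enabled=false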

Hope that helps!