Hot questions for Using RabbitMQ in json

Question:

I'm trying to create a simple spring boot app with spring boot that "produce" messages to a rabbitmq exchange/queue and another sample spring boot app that "consume" these messages. So I have two apps (or microservices if you wish). 1) "producer" microservice 2) "consumer" microservice

The "producer" has 2 domain objects. Foo and Bar which should be converted to json and send to rabbitmq. The "consumer" should receive and convert the json message into a domain Foo and Bar respectively. For some reason I can not make this simple task. There are not much examples about this. For the message converter I want to use org.springframework.messaging.converter.MappingJackson2MessageConverter

Here is what I have so far:

PRODUCER MICROSERVICE

package demo.producer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Bean
    Queue queue() {
        return new Queue("queue", false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("queue");
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... args) throws Exception {
        sender.sendToRabbitmq(new Foo(), new Bar());
    }
}

@Service
class Sender {

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplate;
    @Autowired
    private MappingJackson2MessageConverter mappingJackson2MessageConverter;

    public void sendToRabbitmq(final Foo foo, final Bar bar) {

        this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter);

        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo);
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar);

    }
}

class Bar {
    public int age = 33;
}

class Foo {
    public String name = "gustavo";
}

CONSUMER MICROSERVICE

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Autowired
    private Receiver receiver;

    @Override
    public void run(String... args) throws Exception {

    }

}

@Service
class Receiver {
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) {
        System.out.println("Received <" + foo.name + ">");
    }

    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) {
        System.out.println("Received <" + bar.age + ">");
    }
}

class Foo {
    public String name;
}

class Bar {
    public int age;
}

And here is the exception I'm getting:

    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)]
Bean [demo.consumer.Receiver@1672fe87]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
    ... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113)
    ... 12 common frames omitted

The exception says there is no converter, and that is true, my problem is that I have no idea how to set the MappingJackson2MessageConverter converter in the consumer side (please note that I want to use org.springframework.messaging.converter.MappingJackson2MessageConverter and not org.springframework.amqp.support.converter.JsonMessageConverter)

Any thoughts ?

Just in case, you can fork this sample project at: https://github.com/gustavoorsi/rabbitmq-consumer-receiver


Answer:

Ok, I finally got this working.

Spring uses a PayloadArgumentResolver to extract, convert and set the converted message to the method parameter annotated with @RabbitListener. Somehow we need to set the mappingJackson2MessageConverter into this object.

So, in the CONSUMER app, we need to implement RabbitListenerConfigurer. By overriding configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) we can set a custom DefaultMessageHandlerMethodFactory, to this factory we set the message converter, and the factory will create our PayloadArgumentResolver with the the correct convert.

Here is a snippet of the code, I've also updated the git project.

ConsumerApplication.java

package demo.consumer;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements RabbitListenerConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(jackson2Converter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    @Autowired
    private Receiver receiver;

}

@Service
class Receiver {
    @RabbitListener(queues = "queue")
    public void receiveMessage(Foo foo) {
        System.out.println("Received <" + foo.name + ">");
    }

    @RabbitListener(queues = "queue")
    public void receiveMessage(Bar bar) {
        System.out.println("Received <" + bar.age + ">");
    }
}

class Foo {
    public String name;
}

class Bar {
    public int age;
}

So, if you run the Producer microservice it will add 2 messages in the queue. One that represent a Foo object and another that represent a Bar object. By running the consumer microservice you will see that both are consumed by the respective method in the Receiver class.


Updated issue:

There is a conceptual problem about queuing from my side I think. What I wanted to achieved can not be possible by declaring 2 methods annotated with @RabbitListener that points to the same queue. The solution above was not working properly. If you send to rabbitmq, let say, 6 Foo messages and 3 Bar messages, they wont be received 6 times by the listener with Foo parameter. It seems that the listener are invoked in parallel so there is no way to discriminate which listener to invoke based on the method argument type. My solution (and I'm not sure if this is the best way, I'm open to suggestions here) is to create a queue for each entity. So now, I have queue.bar and queue.foo, and update @RabbitListener(queues = "queue.foo") Once again, I've updated the code and you can check it out in my git repository.

Question:

Recently I have been having a keen interest on Microservice Architecture using Spring Boot. My implementation has two Spring boot applications;

Application One receives requests from a RESTful API, converts and sends jSON payload to a RabbitMQ queueA.

Application Two, has subscribed to queueA, receives the jSON payload(Domain Object User) and is supposed to activate a service within Application Two eg. send email to a user.

Using no XML in my Application Two configuration, how do I configure a converter that will convert the jSON payload received from RabbitMQ into a Domain Object User.

Below are snippets from Spring Boot configurations on Application Two

Application.class

@SpringBootApplication
@EnableRabbit
public class ApplicationInitializer implements CommandLineRunner {

    final static String queueName = "user-registration";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    AnnotationConfigApplicationContext context;

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("user-registrations");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    public static void main(String[] args) {
        SpringApplication.run(ApplicationInitializer.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Waiting for messages...");
    }

}

TestService.java

@Component
public class TestService {

    /**
     * This test verifies whether this consumer receives message off the user-registration queue
     */
    @RabbitListener(queues = "user-registration")
    public void testReceiveNewUserNotificationMessage(User user) {
        // do something like, convert payload to domain object user and send email to this user
    }

}

Answer:

I had the same problem and after some research and testing I learned, that there is more than one way to configure your RabbitMQ-Receiver in SpringBoot, but it is important to choose one and stick with that.

If you decide to go with the Annotation Driven Listener Endpoint, what I derive from your usage of @EnableRabbit and @RabbitListener, than the configuration you posted didnĀ“t work for me. What worked is the following:

Derive your Configuration Class from org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer and override the Method configureRabbitListeners as follows:

 @Override
public void configureRabbitListeners(
        RabbitListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}

and add a MessageHandlerFactory:

@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(new MappingJackson2MessageConverter());
    return factory;
}

Additionally you need to define SimpleRabbitListenerContainerFactory (as you already did) and Autowire the corresponding ConnectionFactory:

@Autowired
public ConnectionFactory connectionFactory;

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}

Finishing your Configuration, you need to define the Bean, which handles your Messages and inherits the @RabbitListerner-Annotations. For me I named that EventResultHandler (you named it TestService):

    @Bean
public EventResultHandler eventResultHandler() {
    return new EventResultHandler();
}

Then in your EventResultHandler (or TestService) you define the @RabbitListener-Methods with their corresponding Queues and the Payload (= the POJO, where your JSON-Message is serialized to):

@Component
public class EventResultHandler {

    @RabbitListener(queues=Queues.QUEUE_NAME_PRESENTATION_SERVICE)
    public void handleMessage(@Payload Event event) {
        System.out.println("Event received");
        System.out.println("EventType: " + event.getType().getText());
    }
}

I ommited the needed definition and binding of Queues and Exchanges - you can do that either in one or in another Microservice - or in RabbitMQ-Server manually... But you for sure have to do that.

Question:

I have put together a java test. It puts a message on a queue and returns it as a string. What Im trying to achieve is for it to it convert into the java object SignUpDto. I have stripped down the code as much as possible for the question.

The question:

How do I modify the test below to convert into a object?


SignUpClass

public class SignUpDto {
    private String customerName;
    private String isoCountryCode;
    ... etc
}

Application - Config class

@Configuration
public class Application  {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {

        // updated with @GaryRussels feedback
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }
}

The Test

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {Application.class})
public class TestQueue {

    @Test
    public void convertMessageIntoObject(){

        ApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        AmqpTemplate template = context.getBean(AmqpTemplate.class);

        String jsonString = "{ \"customerName\": \"TestName\", \"isoCountryCode\": \"UK\" }";

        template.convertAndSend("myqueue", jsonString);

        String foo = (String) template.receiveAndConvert("myqueue");

        // this works ok    
        System.out.println(foo);

        // How do I make this convert
        //SignUpDto objFoo = (SignUpDto) template.receiveAndConvert("myqueue");
        // objFoo.toString()  

    }
}

Answer:

Configure the RabbitTemplate with a Jackson2JsonMessageConverter.

Then use

template.convertAndSend("myqueue", myDto);

...

SignUpDto out = (SignUpDto) template.receiveAndConvert("myQueue");

Note that the outbound conversion sets up the content type (application/json) and headers with type information that tells the receiving converter what object type to create.

If you really do want to send a simple String of JSON, you need to set the content type to application/json. To help the inbound conversion, you can either set the type headers (look at the converter source for information), or you can configure the converter with a ClassMapper to determine the type.

EDIT

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
         message-converter="json" />

<bean id="json"
 class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

Or, since you are using Java Config; simply inject one into your template definition.

EDIT2

If you want to send a plain JSON string; you need to help the inbound converter via headers.

To set the headers...

template.convertAndSend("", "myQueue", jsonString, new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setContentType("application/json");
        message.getMessageProperties().getHeaders()
            .put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, "foo.SignUpDto");
        return message;
    }
});

Bear in mind, though, that this sending template must NOT have a JSON message converter (let it default to the SimpleMessageConverter). Otherwise, the JSON will be double-encoded.

Question:

I have a class that is being sent through RabbitMQ as a message, on the sender service, it is defined like:

public final class User implements Serializable {

    private String nome;
    private String cognome;

    public User(@JsonProperty("nome") String nome,
                @JsonProperty("cognome") String cognome) {
        this.nome = nome;
        this.cognome = cognome;
    }

    public String getNome() {
        return nome;
    }

    public String getCognome() {
        return cognome;
    }

    public User(){}
}

and on the receiver service:

@Document
public class Persona {

    @Id
    @JsonProperty
    public ObjectId id;

    private String nome;

    private String cognome;

    public String getId() {
        return id.toHexString();
    }

    public void setId(ObjectId id) {
        this.id = id;
    }

    public String getNome() {
        return nome;
    }

    public void setNome(String nome) {
        this.nome = nome;
    }

    public String getCognome() {
        return cognome;
    }

    public void setCognome(String cognome) {
        this.cognome = cognome;
    }

    public Persona(ObjectId id, String nome, String cognome) {
        this.id = id;
        this.nome = nome;
        this.cognome = cognome;
    }

    public Persona(){}
}

In receiver controller, I have the following method, which should take that message, cast it to an object, and save it in a database, it looks like:

@RabbitListener(queues = {"default_parser_q"})
    public void receiveMessage(final Message message){
        ObjectMapper mapper = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                .configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);

        mapper.readValue(message.getBody(), Persona.class);
        System.out.println(message.toString() + "the message has been received");
    }

The problem is that I'm getting an exception on the readValue() method, specifically:

Unhandled exceptions: java.io.IOException, com.fasterxml.jackson.core.JsonParseException, com.fasterxml.jackson.databind.JsonMappingException

The message(JSON) being sent in this case, is:

{
  'nome': "John",
  'cognome': "Doe"
}

what am I doing wrong here?

Edit: Adding the stack trace as requested.

com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of demo.com.fetcherservice.models.Persona (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{ "nome": "John", "cognome": "Doe" }') at [Source: (byte[])""{\n\t\t\t\"nome\": \"John\",\n\t\t\t\"cognome\": \"Doe\"\n\t\t}""; line: 1, column: 1] at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1343) at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1032) at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1373) at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:171) at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) at kdmforce.com.fetcherservice.services.FetcherService.receiveMessage(FetcherService.java:23) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) at java.lang.Thread.run(Thread.java:748) com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of demo.com.fetcherservice.models.Persona (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{ "nome": "John", "cognome": "Doe" }') at [Source: (byte[])""{\n\t\t\t\"nome\": \"John\",\n\t\t\t\"cognome\": \"Doe\"\n\t\t}""; line: 1, column: 1] at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1343) at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1032) at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1373) at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:171) at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) at kdmforce.com.fetcherservice.services.FetcherService.receiveMessage(FetcherService.java:23) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:196) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:129) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) at java.lang.Thread.run(Thread.java:748)


Answer:

It looks like the behaviour I've seen for Kafka, since you're also using Spring I assume it's the same.

Your sending end converts the object into Json and then sends it as a String - hence the escaped quotes in the error message (\"nome\": \"John\",\n\t\t\t\"cognome\": \"Doe\").

You need to declare JsonSerializer on the sender side and then pass it your User, or - if you manually create the Json String - declare that it's a byte array so Spring doesn't try to escape the quotes and whitespaces.

Question:

I want to pass JSON object to the RabbitMQ queue.

In the below code, I am using obj.toJSONString().getBytes() for converting Json object in to string, Is it possible to pass JSON object in the queue instead passing as string.

JSONObject obj = new JSONObject();
obj.put("Transaction","Test value");								
channel.basicPublish("", queueName, null, obj.toJSONString().getBytes()); 
System.out.println(" [x] Sent '" + obj.toJSONString() + "'");	

Answer:

We can only send the data as bytes to a rabbitmq queue. So we have to convert the json object into string. In your code snippet, you have done by using the code - obj.toJSONString().getBytes(). This is the correct approach.

Question:

My program sends a json string message to a queue, transforms the message to a json object, then calls the service activator. The service activator is taking the json object as an argument, and checking whether the "employer" property is in a hashtable, and then adds the property "count" to the json object: count is set to 1 if property is in the table and to -1 if it isn't.

Then, the updated json object is transformed back into a string and sent on to another queue.

However, the service-activator isn't working. I get this error (full stack-trace is under the attached code):

java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException:
 method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found

But the method clearly exists. Why is it asking me to put Throwable in my argument? I tried doing that and then got this error (full stack-trace is under attached code):

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#0' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#0': FactoryBean threw exception on object creation;
nested exception is java.lang.IllegalArgumentException: Target object of type [class SpringMessaging.MessageImprover] has no eligible methods for handling Messages.

I'm using a class called MessageCreator to create the original json string message, and then a class called MessageImprover to update it. Code:

package SpringMessaging;

public class MessageCreator {
public String messageJson;
public String employer;
public int sourceID;

 public void add(String name, int age) {

   messageJson = "{" +
            "\"employer\": \"" + name + "\"" +
            ", \"sourceID\": " + age + 
            '}';

}
 public String getJson(){
return messageJson;
}

}

Message Improver:

package SpringMessaging;

import java.util.HashMap;
import org.json.JSONObject;


public class MessageImprover {

   public JSONObject improve(JSONObject object) {
       int count;
       HashMap<String, Integer> table = new HashMap<String, Integer>();
            table.put("Icelantic", 8);
            table.put("Nordica", 9);
            table.put("Atomic", 10);
            table.put("Volkl", 11);
            table.put("Marker", 12);
        if(table.containsKey(object.getString("employer"))) {
           count = 1;
      } else {
            count = -1;
    }
        object.put("count", count);

     return object;
}

}

String to json object transformer:

import org.json.JSONObject;

public class stringtoJsonObject {

    public  JSONObject jsonFromString(String messageJson) {
        JSONObject object = new JSONObject(messageJson);
        return object;
    }
}

Config:

Send:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
   xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
   xsi:schemaLocation="http://www.springframework.org/schema/beans    
                        http://www.springframework.org/schema/beans/spring-beans.xsd    
                        http://www.springframework.org/schema/integration    
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/integration/amqp
                        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
                        http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

   <rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
   <rabbit:admin connection-factory="connectionFactory" />
   <rabbit:queue name="first" auto-delete="false" durable="true" />
   <rabbit:queue name="second" auto-delete="false" durable="true" />

   <rabbit:fanout-exchange name="first-exchange" auto-delete="true" durable="true">
    <rabbit:bindings>
     <rabbit:binding queue="first" />
    </rabbit:bindings>
   </rabbit:fanout-exchange>

  <int:channel id="senderChannel">    
   </int:channel>

  <int-amqp:outbound-channel-adapter channel="senderChannel" exchange-name="first-exchange"  amqp-template="amqpTemplate" />

   <int-amqp:outbound-gateway id="outbound" request-channel="senderChannel" exchange-name="first-exchange" amqp-template="amqpTemplate" />

   <int:gateway id="Sender" service-interface="SpringMessaging.Sender" default-request-channel="senderChannel">
  <int:method name="sendMessage" />
   </int:gateway>

  <int:poller default="true" fixed-rate="100"/>

</beans>

Receiving from first queue, transforming, updating and sending to second queue:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
   xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
   xsi:schemaLocation="http://www.springframework.org/schema/beans    
                        http://www.springframework.org/schema/beans/spring-beans.xsd    
                        http://www.springframework.org/schema/integration    
                        http://www.springframework.org/schema/integration/spring-integration.xsd
                        http://www.springframework.org/schema/integration/amqp
                        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
                        http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
   <rabbit:admin connection-factory="connectionFactory" />
   <rabbit:queue name="first" auto-delete="false" durable="true" />
   <rabbit:queue name="second" auto-delete="false" durable="true" />

  <int:poller default="true" fixed-rate="100"/>

   <rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
     <rabbit:bindings>
       <rabbit:binding queue="second" />
     </rabbit:bindings>
   </rabbit:fanout-exchange>

    <int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange"  amqp-template="amqpTemplate" />

   <int-amqp:inbound-channel-adapter channel="messageInputChannel" queue-names="first" connection-factory="connectionFactory" />

   <int:channel id="messageInputChannel" />
   <int:channel id="jsonInputChannel"/>
   <int:channel id="jsonOutputChannel"/>
   <int:channel id="messageOutputChannel"/>

   <int:transformer input-channel="messageInputChannel" output-channel="jsonInputChannel" ref="messageToJsonObjectConverter" method="jsonFromString" />
   <bean id="messageToJsonObjectConverter" class="SpringMessaging.stringtoJsonObject" />

   <int:service-activator input-channel="jsonInputChannel" output-channel="jsonOutputChannel" ref="jsonObjectTransformer" method="improve" />
       <bean id="jsonObjectTransformer" class="SpringMessaging.MessageImprover" />

   <int:object-to-string-transformer input-channel="jsonOutputChannel" output-channel="messageOutputChannel" />

</beans>

Full stack-trace from first error:

java.lang.NoSuchMethodError: 
org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
 at java.lang.Thread.run(Thread.java:748)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer run
SEVERE: Stopping container from aborted consumer
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer doShutdown
INFO: Waiting for workers to finish.
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer doShutdown
INFO: Successfully waited for workers to finish.

Full stack-trace if I put Throwable V in the argument of "improve":

Exception in thread "main" 
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#0' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#0': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Target object of type [class SpringMessaging.MessageImprover] has no eligible methods for handling Messages.
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:359)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:108)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyPropertyValues(AbstractAutowireCapableBeanFactory.java:1531)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1276)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:553)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:742)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
at SpringMessaging.Main.main(Main.java:15)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 
'org.springframework.integration.config.ServiceActivatorFactoryBean#0': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Target object of type [class SpringMessaging.MessageImprover] has no eligible methods for handling Messages.
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:175)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1634)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:254)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:351)
... 15 more

I don't understand why it's even asking me to put a Throwable as an argument in the "improve" method. Any ideas?


Answer:

java.lang.NoSuchMethodError: 
org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found

There should be some cause below that StackTrace. In other words you don't show the whole picture.

The second case is clear. You have signature like:

JSONObject improve(JSONObject object, Throwable e) 

Where Framework can't understand which method parameter treat for the payload in the future messages.

I'm pretty sure that NoSuchMethodError isn't related to the service-activator definition. There is something else downstream.

Question:

I have a spring boot project where I'm trying to integrate with a rabbitmq server so I can publish and read messages to/from a queue.

Here's my rabbitmq config (edited to only show relevant details):

@Configuration
@ConfigurationProperties(prefix="rabbit")
public class RabbitConfig {
    private String queue;

    @Bean
    Queue queue() {
        return new Queue(queue, durable);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue queue,
                                             MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(listenerAdapter);
        container.setMessageConverter(messageConverter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver);
    }

    public void setQueue(String queue) {
        this.queue = queue;
    }
}

Here is my Receiver classes:

public interface Receiver {
    void handleMessage(FooA message);
}

@Component
public class RabbitReceiver implements Receiver {
    @Override
    public void handleMessage(FooA message) {
        System.out.println(message);
    }
}

And my pojo:

public class FooA {}
    private double num;
    private Map<String, String> map = new HashMap();

    public FooA() {
    }

    public FooA(double num, Map<String, String> map) {
        this.num = num;
        this.map = map;
    }

    public int getnum() {
        return num;
    }

    public Map<String, String> getMap() {
        return map;
    }
}

I am successfully able to publish a FooA message object to the queue. Here's what it looks like in the queue:

[
    {
        "payload_bytes": 41,
        "redelivered": false,
        "exchange": "amq.fanout",
        "routing_key": "",
        "message_count": 0,
        "properties": {
            "priority": 0,
            "delivery_mode": 2,
            "headers": {
                "__TypeId__": "com.test.FooA"
            },
            "content_encoding": "UTF-8",
            "content_type": "application/json"
        },
        "payload": "{\"num\":1.2,\"map\":{}}",
        "payload_encoding": "string"
    }
]

But when I try to read from the queue I get this error:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'handleMessage' with argument type = [class [B], value = [{[B@75a7bfc9}]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97) [spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276) [spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219) [spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189) [spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97) [spring-rabbit-1.7.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421) [spring-rabbit-1.7.3.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.lang.NoSuchMethodException: com.test.RabbitReceiver.handleMessage([B)
    at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_121]
    at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.7.3.RELEASE.jar:na]
    ... 12 common frames omitted

What am I doing wrong?

Edit 1: I changed the method to:

@Override
public void handleMessage(byte[] message) {
    System.out.println(message);
}

Which worked but it's totally unusable. It just shows up like this:

How do I get this to map to my pojo FooA


Answer:

I just had the classes implement the Serializable interface removed the JsonMessageConverter. Json and Serializable was conflicting so it didn't work.

Question:

I have my springboot app which connects to a RabbitMQ queue where I push messages in specified Json format, for example: {"messageId":"de5fe5a3-1831-4b87-891d-e7a4c29295b4","message":{"listingId":"2"},"errors":[]} I did created a corresponding Java DTO obejct, but I don't know how to parse a rabbbitMq message straight into java object. I have found a way to do it with Jackson ObjectMapper however I would like spring to do it by himself instead of me

My DTO

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    final class ListingMessage {
        private final String messageId;
        private final Message message;
        private final List<String> errors;

        @Data
        static final class Message {
            private final String listingId;
        }
    }

And I would like to achieve somehting like that:

    @Component
    @Slf4j
    final class ListingMessageListener {

        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        @RabbitListener(queues = "${rabbitmq.queues}")
        public void receiveMessage(final ListingMessage message) {
                doSomeStuff(message); 
         }
       }

Instead of

@Component
@Slf4j
final class ListingMessageListener {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @RabbitListener(queues = "${rabbitmq.queues}")
    public void receiveMessage(final Message message) {

        final String json = new String(message.getBody());
        try {
            final ListingMessage listingMessage = OBJECT_MAPPER.readValue(json, ListingMessage.class);
            doSomeStuff(listingMessage);
        }catch(final Exception e){e.printStackTrace();}
     }
   }

EDIT. When I added Jackson2JsonMessageConverter

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

I got an exception

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.listingsdatapipeline.rabbitmq.ListingMessageListener.receiveMessage(com.listingsdatapipeline.rabbitmq.ListingMessage)]
Bean [com.listingsdatapipeline.rabbitmq.ListingMessageListener@626df4e5]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:191) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.listingsdatapipeline.rabbitmq.ListingMessage] for GenericMessage [payload=byte[167], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=changes-consumer-de, amqp_deliveryTag=1, amqp_consumerQueue=changes-consumer-de, content-type=application/json, amqp_redelivered=false, id=8fb1f2ad-96dd-3d25-c77d-82f741cfa788, amqp_consumerTag=amq.ctag-DfMMARR_KXMXyNJMoEXyAQ, timestamp=1539177836655}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-rabbit-2.0.6.RELEASE.jar:2.0.6.RELEASE]
    ... 10 common frames omitted

Answer:

Simply add a Jackson2JsonMessageConverter @Bean to your application configuration and Boot will auto-configure the listener container to use it; the @RabbitListener adapter will then attempt to convert to a ListingMessage since that's the method parameter type.

Question:

This is an extension on RabbitMQ for Java: how to send multiple float values?

Instead of 3 float parameters, I want to send 3 different classes as parameters and I want to use the JSon protocol.

The server is written in C#. Hence I decode the JSon string method of the server side.

Basically the solution offered on the other post in terms of floats is as follows:

final ByteBuffer buf = ByteBuffer.allocate(12)  // 3 floats
    .putFloat(f1).putFloat(f2).putFloat(f3);    // put them; .put*() return this
channel.basicPublish(buf.array());              // send

This will write the floats in big endian (default network order and what Java uses as well).

On the receiving side, you would do:

// delivery is a QueuingConsumer.Delivery

final ByteBuffer buf = ByteBuffer.wrap(delivery.getBody());
final float f1 = buf.getFloat();
final float f2 = buf.getFloat();
final float f3 = buf.getFloat();

but I want to send class Car, Airplane, Boat as JSon formatted from Java to C#


Answer:

I assume Car, Airplane and Boat are simple JavaBeans classes and their field members map the json contract.

You could use a json codec to serialize your objects into JSON. For example you can use Jackson or gson.

Classes:

class Car {
    private String model;
    public String getModel(){};
    public void setModel(String model){...};            
}
class Airplane {
    private String model;
    public String getModel(){};
    public void setModel(String model){...};            
}
class Boat {
    private String model;
    public String getModel(){};
    public void setModel(String model){...};            
}

The example will use Jackson and will output a structure like that: {"car":{"model":"xxx"},"boat":{"model":"xxx"}, ,"airplane":{"model":"xxx"}}

// Create the jsonFactory with an object mapper to serialize object to json
JsonFactory jsonFactory = new JsonFactory(new ObjectMapper());

// Create the byte array output stream
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

// Create the json generator 
JsonGenerator generator = jsonFactory.createGenerator(outputStream);

// Write the start object, ie. {}
generator.writeStartObject();
// Write the car "car":{}
generator.writeObjectField("car" , car);
// Write the car "boat":{}
generator.writeObjectField("boat" , boat);
// Write the car "airplane":{}
generator.writeObjectField("airplane" , airplane);
// Close the object
generator.writeEndObject();
// And the generator
generator.close();

// Convert the byte array stream to a byte array and publish the message
channel.basicPublish(outputStream.toByteArray());   

If you have a JavaBeans or a map which wraps theses 3 classes the code may be simplier:

ObjectMapper mapper = new ObjectMapper();
byte[] bur = mapper.writeValueAsBytes(wrapper);
channel.basicPublish(outputStream.toByteArray());   

Finally on the c# side you should create the same class and deserialize them.

You could use some json to java/c# generator, eg. jsonschema2pojo.