Hot questions for Using RabbitMQ in serialization

Top Java Programmings / RabbitMQ / serialization

Question:

I have an Employee class as below:

package com.mypackage.rabbitmq.model
import java.io.Serializable;

import javax.xml.bind.annotation.XmlRootElement;


@XmlRootElement
public class Employee implements Serializable{

    /**
    * 
    */
    private static final long serialVersionUID = -2736911235490297622L;
    private int EmpNo;
    private String FirstName;
    private String LastName;
    private int age;
    private String gender;
    private String skill;
    private long phone;
    private String email;
    private double salary;
    //getters and setters

I published the list of employee in rabbit MQ as below:

package com.mypackage.rabbitmq.client.publisher;

//imports

public class Publisher {

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();

        Connection con = factory.newConnection("localhost");
        Channel channel = con.createChannel();

        Gson gson  = new GsonBuilder().create();
        Employee employee = null;

        List<Employee> empList = new ArrayList<>();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);

        String queueName = "TestQueue";
        for(int i=1; i<=10; i++){
            employee = newEmp(i);

            String message =gson.toJson(employee);
            System.out.println("queueName: "+queueName);
            empList.add(employee);

        }
        oos.writeObject(empList);
        channel.basicPublish(1, "", queueName, null, bos.toByteArray());
        System.out.println("[X], sent '"+empList+"'");

        channel.close(0, queueName);
        con.close(0, queueName);
    }

    public static Employee newEmp(int i){

        //logic here
    }
}

From a separate spring boot application I tried to consume the list of employee. In the consumer application I have the same employee class structure. However the package is different.

package org.springboot.consumer.model;

import java.io.Serializable;

public class Employee implements Serializable{
    //fields and getters-setters here
}

In brief consumer code is as bellow:

public class SDPRabbitMQConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        Gson gson  = new GsonBuilder().create();
        try {
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            os.write(message.getBody(), 0, message.getBody().length);
            ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray());
            ObjectInputStream objInputStream = new ObjectInputStream(is);
            System.out.println("objInputStream.readObject().toString()"+objInputStream.readObject().toString());
            Employee[] employeeArray = gson.fromJson(objInputStream.readObject().toString(), Employee[].class);
            List<Employee> employeeList = new ArrayList<Employee>(Arrays.asList(employeeArray));
            for(Employee employee: employeeList){
                System.out.println(employee);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

But I am getting the below exception:

java.lang.ClassNotFoundException: com.mypackage.rabbitmq.model.Employee

It seems it's a Serialization and Deserialization issue.

My questions are:

  1. If I publish Employee instead of List I don't even need to serialize the class. Then why I need to serialize in case of List?
  2. Why this Exception? Do we need to maintain same package structure for a serialized class in both end?
  3. In the consumer side do we need to have serialVersionUID? If yes, should it match with that of publisher's side?

Thanks in advance.


Answer:

1) while java.util.List is not Serializable, all standard implementations are, so you should be able to serialize/deserialize an ArrayList for instance.

2) Yes, you need the same class in the same package, and also..

3) Classes in both ends must have the same serialVersionUID

Question:

I want to implement a RabbitMq(AMQP) messaging in java SpringBoot, but when I receive the message it says that the messaGE could not be deserialize because I the class for the object that I supposed to receive was not found even if I have that class int the classpath.

RabbitMqListener.java:

@EnableRabbit
@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);

    @RabbitListener(queues = "queue2")
    public void processQueue1(Product message) {

        logger.info("Received from queue 2: " + message);
    }

}

RabbitConfiguration.java:

@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }


    @Bean
    public Queue myQueue1() {

        return new Queue("queue1");
    }

    @Bean
    public Queue myQueue2() {

        return new Queue("queue2");
    }

}

SampleController.java:

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    AmqpTemplate template;

    @RequestMapping("/emit")
    @ResponseBody
    String queue1() {
        logger.info("Emit to queue1");
        template.convertAndSend("queue1","Message to queue 1");
        return "Emit to queue 1";
    }
}

Product.java:

public class Product implements Serializable{

    private Long id;

    private String name;

    private int stock;

    private int price;



    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getStock() {
        return stock;
    }

    public void setStock(int stock) {
        this.stock = stock;
    }

    public Product() {
    }

    @Override
    public String toString() {
        return "Product{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", stock=" + stock +
                ", price=" + price +
                '}';
    }
}

And the stacktrace:

2016-07-11 09:34:02.840  WARN [order-service,,,] 4084 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:183) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1358) [spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:661) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1102) [spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) [spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) [spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) [spring-rabbit-1.5.6.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.IllegalStateException: Could not deserialize object type
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:82) ~[spring-amqp-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:110) ~[spring-amqp-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:185) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:173) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:118) ~[spring-amqp-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:102) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:88) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:757) ~[spring-rabbit-1.5.6.RELEASE.jar:na]
    ... 10 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.productservice.model.Product
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_91]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_91]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
    at org.springframework.util.ClassUtils.forName(ClassUtils.java:250) ~[spring-core-4.2.7.RELEASE.jar:4.2.7.RELEASE]
    at org.springframework.core.ConfigurableObjectInputStream.resolveClass(ConfigurableObjectInputStream.java:75) ~[spring-core-4.2.7.RELEASE.jar:4.2.7.RELEASE]
    at org.springframework.amqp.support.converter.SimpleMessageConverter$1.resolveClass(SimpleMessageConverter.java:179) ~[spring-amqp-1.5.6.RELEASE.jar:na]
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) ~[na:1.8.0_91]
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) ~[na:1.8.0_91]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) ~[na:1.8.0_91]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_91]
    at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:76) ~[spring-amqp-1.5.6.RELEASE.jar:na]
    ... 17 common frames omitted


Process finished with exit code -1

Answer:

In the code snippet you have registered jsonMessageConverter and at the same time you are making your Person.java serializable. You should either use serialization or the json approach. Moreover the problem lies in the package structure of Person.java. I encountered the same problem, resolved it by taking same package structure for the object being transferred in producer and consumer. The reason behind this is that while serialization, java takes class name , package structure and other information into account, so at the consumer side we need to have the same package structure.I encountered the same problem while using jsonMessageConverter. So from my viewpoint the solution to this problem is to package your message objects into a jar and add that jar as a dependency in your producer and consumer project.

Question:

I have a prototype Apache Beam pipeline where I try to read data from RabbitMQ using the following configuration

        p.apply("read_from_rabbit", RabbitMqIO.read()
                .withUri(options.getRabbitMQUri())
                .withQueue(options.getRabbitMQQueue())
                )
            .apply("extract_json_data", MapElements.via(new RabbitMessageToKafkaMessage()))

when I try to run it, I always get

Exception in thread "main" java.lang.NoClassDefFoundError: com/rabbitmq/client/QueueingConsumer$Delivery
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getDeclaredConstructors(Class.java:2020)
    at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1793)
    at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:72)
    at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:253)
    at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:251)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:250)
    at java.io.ObjectStreamClass.writeNonProxy(ObjectStreamClass.java:735)
    at java.io.ObjectOutputStream.writeClassDescriptor(ObjectOutputStream.java:668)
    at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1282)
    at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
    at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1213)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1120)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:119)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:250)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:35)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:205)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.translateAppliedPTransform(PTransformTranslation.java:369)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:120)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:651)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:666)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:269)
    at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:280)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:258)
    at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:208)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:154)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at myCompany.myProject.RabbitToKafka.runTransformer(RabbitToKafka.java:54)
    at myCompany.myProject.RabbitToKafka.main(RabbitToKafka.java:61)
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.QueueingConsumer$Delivery
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 48 more

My understanding is that since my RabbitMessageToKafkaMessage class read RabbitMQ messages, these messages in turn contain RabbitMQ data, in particular delivery information injected into constructor : public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) { Is this ok ?

If so, how can I transform my RabbitMQMessage into a KV during the read operation ?

EDIT 1 error happens when running my pipeline from Eclipse.

EDIT 2 That project is a maven project run using Eclipse. My Apache Bean dependencies are all at the 2.12.0 version (which is the latest).

My dependency tree is as follows (at least the part regarding RabbitMQ)

myCompany:myProject:jar:0.1.5-SNAPSHOT
+- org.apache.beam:beam-sdks-java-io-rabbitmq:jar:2.12.0:compile
|  \- com.rabbitmq:amqp-client:jar:5.4.3:compile

Answer:

As it appear, merging my previous pom with an Apache Beam compatible one was not a good idea, as it created a conflict between two different versions of amqp-client. Removing the Spring boot parent pom solved the bug, by removing the bad amqp-client version.

Question:

I want to listen for messages coming into Rabbit queue with Spring's rabbit listener. My class looks very basic like this:

    @Component
public class MailMessageRabbitListener {
    private MailRepository mailRepository;
    private MailService mailService;

    @Autowired
    public MailMessageRabbitListener(MailRepository mailRepository, MailService mailService) {
        this.mailRepository = mailRepository;
        this.mailService = mailService;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @RabbitListener(queues = "msmail.queue")
    public void receiveMailMessage(JsonMailMessage jsonMailMessage) throws Exception {
        System.out.println(jsonMailMessage);
    }
}

My mapping class JsonMailMessage looks like this:

@Data
@NoArgsConstructor
@RequiredArgsConstructor
public class JsonMailMessage {
    @NonNull
    private String userToken;
    @NonNull
    private String sendTo;
    @NonNull
    private String subject;
    @NonNull
    private String content;
    @NonNull
    private String[] files;

}

Now I'm able to successfully listen to a queue, the problem is when I send message such as this within Rabbit queue:

Json array is filled with base64 encoded files, which should be basic String expressions, so the Jackson2JsonMessageConverter should be able to convert the files.

I'm however getting this kind of exception:

Bean [---------messaging.MailMessageRabbitListener@35ff072c]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:185) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:785) [spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:769) [spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1010) [spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [-----dto.JsonMailMessage] for GenericMessage [payload=byte[118], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=msmail.queue, amqp_deliveryTag=1, amqp_consumerQueue=msmail.queue, amqp_redelivered=false, id=b997e8cb-3424-df90-5370-aa4cf79175be, amqp_consumerTag=amq.ctag-lL9Y6R67fJc6DyCHAiWQRA, Content-Type=application/json, timestamp=1538554985476}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    ... 10 common frames omitted

Update

It's working, the message is getting converted to Java object. The problem starts when I want to return the object directly from Listener method.

 @RabbitListener(queues = "msmail.queue")
public JsonMailMessage receiveMailMessage(JsonMailMessage jsonMailMessage){
  return jsonMailMessage;
}

It throws different exception:

Caused by: org.springframework.amqp.AmqpException: Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.getReplyToAddress(AbstractAdaptableMessageListener.java:398) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:307) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    ... 10 common frames omitte

Answer:

The content_type is a well-known property not and arbitrary header; add it to Properties: instead.

Click the ? in the UI to see the well-known properties.

Since there is no content type, the converter simply returns the byte[] unchanged.