Hot questions for using RabbitMQ with protocol buffers

Question:

What is the correct way to serialize a protobuf object in Ruby and parse it in Java? This is for automated testing; the Java side listens for the message on a RabbitMQ queue.

Publisher (Ruby):

protoNew = Protobuf::Request.new
protoNew.request = request
protoNew.id = id.to_i
protoNew.authentication = authentication

return protoNew.serialize_to_string

Consumer (Java):

    @Override
    public void onMessage(Message message, Channel channel) 
    {
        ProtoRequest protoRequest;
        try {
            // The body must be exactly the bytes the publisher serialized
            protoRequest = ProtoRequest.parseFrom(message.getBody());
        } catch (InvalidProtocolBufferException e1) {
            logger.error("Error parsing protobuf", e1);
            return;
        }
        // ...
    }

Here is the error I am seeing:

Error parsing protobuf: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
    at com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94) [protobuf-java-2.6.1.jar:]
    at com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:174) [protobuf-java-2.6.1.jar:]
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:139) [protobuf-java-2.6.1.jar:]
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:168) [protobuf-java-2.6.1.jar:]
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:180) [protobuf-java-2.6.1.jar:]
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:185) [protobuf-java-2.6.1.jar:]
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) [protobuf-java-2.6.1.jar:]


Answer:

It is not clear how the output of protoNew.serialize_to_string is packaged into the body of the message that the Java code receives. My guess is that the binary data is being corrupted by a character-encoding conversion somewhere along the way.
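
To illustrate the kind of corruption suspected here, the following is a minimal sketch in plain Java, with no protobuf or RabbitMQ dependency. The three bytes are the protobuf wire encoding of "field 1 = 150"; the class and method names are made up for the illustration. It only shows that binary data does not survive a detour through a UTF-8 String; it does not prove that this is what happens in the publisher above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; not part of the question's code.
final class BinaryThroughStringDemo {

    // Decode bytes into a String with the given charset, then encode them back.
    static byte[] throughString(byte[] raw, Charset charset) {
        return new String(raw, charset).getBytes(charset);
    }

    public static void main(String[] args) {
        // Protobuf wire bytes for "field 1 (varint) = 150".
        byte[] wire = {0x08, (byte) 0x96, 0x01};

        byte[] viaUtf8 = throughString(wire, StandardCharsets.UTF_8);
        byte[] viaLatin1 = throughString(wire, StandardCharsets.ISO_8859_1);

        // 0x96 is not valid UTF-8 on its own, so it is replaced and the payload changes.
        System.out.println("UTF-8 round trip intact:   " + Arrays.equals(wire, viaUtf8));
        // ISO-8859-1 maps every byte to exactly one char, so the bytes survive.
        System.out.println("Latin-1 round trip intact: " + Arrays.equals(wire, viaLatin1));
    }
}
```

If any step between serialize_to_string and message.getBody() treats the payload as text, the parser sees different bytes from the ones that were serialized.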

As a workaround, you can Base64-encode the binary output of protoNew.serialize_to_string and Base64-decode it on the Java side.

require "base64"
...
return Base64.encode64 protoNew.serialize_to_string

On the Java side:

import org.apache.commons.codec.binary.Base64;
...
protoRequest = ProtoRequest.parseFrom(Base64.decodeBase64(message.getBody()));

The drawback of this approach is a larger payload: Base64 output is about one third bigger than the raw binary.
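
As a rough illustration of both the workaround and its cost, here is a sketch that uses the JDK's java.util.Base64 rather than commons-codec (a substitution made for this example so that it runs without extra libraries). The MIME decoder is chosen because Ruby's Base64.encode64 inserts line breaks, which the JDK's basic decoder rejects. The class and method names are hypothetical, and the byte array merely stands in for a serialized message.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper; stands in for the decode step before ProtoRequest.parseFrom(...).
final class Base64BodyDemo {

    // Decode a Base64 message body, ignoring line breaks such as those added by Ruby's encode64.
    static byte[] decodeBody(byte[] base64Body) {
        return Base64.getMimeDecoder().decode(base64Body);
    }

    public static void main(String[] args) {
        byte[] payload = new byte[300];           // stand-in for serialized protobuf bytes
        for (int i = 0; i < payload.length; i++) {
            payload[i] = (byte) i;                // covers every possible byte value
        }

        byte[] encoded = Base64.getEncoder().encode(payload);
        System.out.println(payload.length + " raw bytes -> " + encoded.length + " Base64 bytes");

        // Simulate a body with a trailing newline, as produced by line-wrapping encoders.
        byte[] wrapped = (new String(encoded, StandardCharsets.US_ASCII) + "\n")
                .getBytes(StandardCharsets.US_ASCII);
        System.out.println("round trip intact: " + Arrays.equals(payload, decodeBody(wrapped)));
    }
}
```

Every 3 raw bytes become 4 Base64 characters, so the 300-byte stand-in payload grows to 400 bytes before any line breaks are added.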

PS: If I write the output of person.serialize_to_string to a file in binary mode on the Ruby side and read that file in Java, parsing works fine, which suggests the serialized bytes themselves are valid:

f = File.open("data.dat", "wb")
f << person.serialize_to_string
f.close

Question:

I am currently using Spring AMQP to communicate between Java and my RabbitMQ node, and I am sending Protobuf data.

I would like to convert/cast/parse the received Message into the respective ProtoClass.

Here is the snippet from my Converter:

@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
    Preconditions.checkNotNull(object, "Object to send is null !");

    if (!com.google.protobuf.Message.class.isAssignableFrom(object.getClass())) {
        throw new MessageConversionException("Message wasn't a protobuf");
    } else {
        com.google.protobuf.Message protobuf = (com.google.protobuf.Message) object;
        byte[] byteArray = protobuf.toByteArray();

        messageProperties.setContentLength(byteArray.length);
        messageProperties.setContentType(ProtobufMessageConverter.CONTENT_TYPE_PROTOBUF);
        messageProperties.setHeader(ProtobufMessageConverter.MESSAGE_TYPE_NAME, protobuf.getDescriptorForType().getName());

        return new Message(byteArray, messageProperties);
    }
}

@Override
public Object fromMessage(Message message) throws MessageConversionException {

    com.google.protobuf.Message parsedMessage = null;
    try {
        if(ProtobufMessageConverter.CONTENT_TYPE_PROTOBUF.equals(message.getMessageProperties().getContentType())) {
            String typeName = getMessageTypeName(message);
            Descriptors.Descriptor messageType = fileDescriptor.findMessageTypeByName(typeName);
            parsedMessage = DynamicMessage.parseFrom(messageType, message.getBody());
        }
    } catch (Exception e) {
        throw new AmqpRejectAndDontRequeueException(String.format("Cannot convert, unknown message type %s", getMessageTypeName(message)), e);
    }
    return parsedMessage;
}

What do I have to do to be able to build the object?

Here is my proto file:

message queueReply {
    required string identifier = 1;
    required uint32 keyId = 2;
    required bool success = 3; 
    required bytes result = 4; 
}

I would like to obtain a queueReply instance from template.receiveAndConvert().


Answer:

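Found the solution: cast the converted object to DynamicMessage, then re-parse its bytes with the generated class (ProtoObject below):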

DynamicMessage o = (DynamicMessage)template.receiveAndConvert("queueName");
ProtoObject request = ProtoObject.parseFrom(o.toByteArray()); 
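
The second parse could in principle be avoided if the converter itself chose the generated class based on the type-name header. The following is a sketch of that dispatch only, written in plain Java so it runs without Spring AMQP or protobuf on the classpath. TypedParserRegistry, BodyParser and the registered stand-in parser are hypothetical names; in a real converter the registered parser would be the generated class's static parseFrom(byte[]) method, and fromMessage would delegate to the registry.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: map the message-type header to a parser for the message body.
final class TypedParserRegistry {

    // Shaped like a generated class's static parseFrom(byte[]), which may throw on bad input.
    interface BodyParser<T> {
        T parse(byte[] body) throws Exception;
    }

    private final Map<String, BodyParser<?>> parsers = new HashMap<>();

    void register(String typeName, BodyParser<?> parser) {
        parsers.put(typeName, parser);
    }

    // Look up the parser by type name and apply it to the raw body.
    Object parse(String typeName, byte[] body) {
        BodyParser<?> parser = parsers.get(typeName);
        if (parser == null) {
            throw new IllegalArgumentException("Unknown message type " + typeName);
        }
        try {
            return parser.parse(body);
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse message of type " + typeName, e);
        }
    }

    public static void main(String[] args) {
        TypedParserRegistry registry = new TypedParserRegistry();
        // Stand-in parser; a real converter would register the generated class's parseFrom here.
        registry.register("queueReply", body -> new String(body, StandardCharsets.UTF_8));

        Object parsed = registry.parse("queueReply", "ok".getBytes(StandardCharsets.UTF_8));
        System.out.println(parsed);
    }
}
```

The trade-off is that every message type has to be registered up front, whereas the DynamicMessage approach only needs the file descriptor.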

Question:

I have a question/problem.

I'm sending messages from C++ to Java (Play framework) using RabbitMQ. On the C++ side I used the SerializeToString function (I also tried SerializeToArray with a char*). On the Java side, ParseFrom fails whether I pass it a String or a byte[].

Detail: my message contains base64-encoded images, sent as a String of over 500k characters. The error is:

CodedInputStream encountered an embedded string or message which claimed to have negative size

For messages that carry only the other attributes, without base64 strings, ParseFrom works fine.

Here is the complete error:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[InvalidProtocolBufferException: CodedInputStream encountered an embedded string or message which claimed to have negative size.]]
    at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:323)
    at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:243)
    at play.core.server.AkkaHttpServer$$anonfun$1.applyOrElse(AkkaHttpServer.scala:382)
    at play.core.server.AkkaHttpServer$$anonfun$1.applyOrElse(AkkaHttpServer.scala:380)
    at scala.concurrent.Future.$anonfun$recoverWith$1(Future.scala:417)
    at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
Caused by: com.google.protobuf.InvalidProtocolBufferException: CodedInputStream encountered an embedded string or message which claimed to have negative size.
    at com.google.protobuf.InvalidProtocolBufferException.negativeSize(InvalidProtocolBufferException.java:92)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.pushLimit(CodedInputStream.java:1179)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readMessage(CodedInputStream.java:881)
    at model.RequestOrResponse$Response.dynamicMethod(RequestOrResponse.java:1542)
    at com.google.protobuf.GeneratedMessageLite.parsePartialFrom(GeneratedMessageLite.java:1597)
    at com.google.protobuf.GeneratedMessageLite.parsePartialFrom(GeneratedMessageLite.java:1630)
    at com.google.protobuf.GeneratedMessageLite.parseFrom(GeneratedMessageLite.java:1746)
    at model.RequestOrResponse$Response.parseFrom(RequestOrResponse.java:1232)
    at controllers.SubjectController.get(SubjectController.java:195)
    at router.Routes$$anonfun$routes$1.$anonfun$applyOrElse$14(Routes.scala:187)

Answer:

Try Base64-encoding the serialized message on the C++ side and decoding it on the Java side before calling ParseFrom.
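
One plausible way to end up with a "negative size" error, assuming the serialized bytes pass through a text conversion somewhere between the C++ publisher and ParseFrom, is sketched below. In the protobuf wire format a string field is preceded by its length as a varint, and the Java parser reads that length into a 32-bit int. The class name and the UTF-8 detour are assumptions made for the illustration, not something the question confirms.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo; mirrors how a varint length prefix is read as a 32-bit int.
final class VarintLengthDemo {

    // Decode a base-128 varint from the start of buf into a 32-bit int.
    static int readVarint32(byte[] buf) {
        int result = 0;
        for (int i = 0, shift = 0; i < buf.length && i < 10; i++, shift += 7) {
            if (shift < 32) {
                result |= (buf[i] & 0x7F) << shift;   // low 7 bits carry the value
            }
            if ((buf[i] & 0x80) == 0) {               // high bit clear: last byte
                return result;
            }
        }
        throw new IllegalArgumentException("Malformed varint");
    }

    public static void main(String[] args) {
        // Varint encoding of 500000, the length prefix of a 500,000-byte string field.
        byte[] length = {(byte) 0xA0, (byte) 0xC2, 0x1E};
        // Same bytes after an (assumed) detour through a UTF-8 String.
        byte[] mangled = new String(length, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);

        System.out.println("original length: " + readVarint32(length));
        System.out.println("mangled length:  " + readVarint32(mangled));
    }
}
```

The intact prefix decodes to 500000, while the mangled one decodes to a negative number. A length below 128 is a single byte under 0x80, which survives such a conversion unchanged, and that would be consistent with the smaller messages parsing correctly.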