Hot questions for Using RabbitMQ in mqtt

Question:

I'm using mosquitto MQ with eclipse paho java library. I would like to check if topic exists or not before i subscribe to a topic and get the message. How do i do that ?


Answer:

The short answer is you don't.

Topics are not something that really exist until the moment a message is published to one.

A subscriber tells the broker what topics it is interested in and should a publisher publish a message to one of those topics then the message will be forwarded to that subscriber (acls allowing).

Subscriptions can be made to topics that contain wild cards which can help if you want to match more than just a single topic. There are 2 wild card characters.

  • + which matches a single element in a topic. e.g. foo/+/bar will match foo/1/bar and foo/something/bar

  • # which matches multiple segments but only at the end of a topic. e.g. /foo/# will match /foo/1 and foo/bar/1/something. You can not place this anywhere but at the end so foo/#/bar will not work

The $SYS topics the @ΦXocę 웃 Пepeúpa ツ mentioned only supply stats about the broker not what topics exist.

Question:

Following this, I have this code

@Bean
open fun messageConverter(om: ObjectMapper): MessageConverter {
    return Jackson2JsonMessageConverter(om)
}

@Bean
open fun rabbitListenerContainerFactory(cf: ConnectionFactory, mc: MessageConverter ): SimpleRabbitListenerContainerFactory {
    val factory = SimpleRabbitListenerContainerFactory()
    factory.setConnectionFactory(cf)
    factory.setMessageConverter(mc)
    return factory
}

@Bean
open fun builder(): Jackson2ObjectMapperBuilderCustomizer {
    return Jackson2ObjectMapperBuilderCustomizer {
        it.modules(JavaTimeModule(), KotlinModule())
        it.featuresToDisable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
    }
}

but I'm getting this exception

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public phg.entity.monitor.Foley phg.event.monitor.Provisioning.save(phg.entity.monitor.Foley)]
Bean [phg.event.monitor.Provisioning@2212e291]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:193) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:127) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1547) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1473) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1405) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [phg.entity.monitor.Foley] for GenericMessage [payload=byte[263], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=foley.new, amqp_receivedExchange=amq.topic, x-mqtt-publish-qos=1, x-mqtt-dup=false, amqp_deliveryTag=1, amqp_consumerQueue=foley.new, amqp_redelivered=false, id=c7fade5a-d803-166a-fe71-0a4f8abb8c4e, amqp_consumerTag=amq.ctag-9bkLtT_xhvQACuHqvx_izQ, timestamp=1551886588001}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    ... 12 common frames omitted

stepping through the code before I notice that it's using the GenericMessageConverter and that there isn't a Jackson2JsonMessageConverter


Answer:

According your stack trace there is nothing in the Listener Container to do with your incoming JSON. So, it happens as just pass through with byte[] as a pyalod. What you end up is just an attempt to convert that byte[] to the expected type on your @RabbitListener method. For that purpose, when you don't have a JSON information to convert on the listener container level, you need to configure a DefaultMessageHandlerMethodFactory with desired MappingJackson2MessageConverter. It can be achieve with the RabbitListenerConfigurer:

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }


    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

That MappingJackson2MessageConverter can be configured as a bean for your custom ObjectMapper injection.

Question:

Firstly, I will explain how I need to use MQTT. My backend is made up of several micro-services made with Spring-boot. The consumers of my apis are applications web and mobile, for the update of mobiles we used the onesignal but it is not able to meet us in certain features like and positioning in the map in "real time ", after a study we saw that the most used today is MQTT for mobiles, for that we intend to use RabbitMQ MQTT, since we already have several implementations using Rabbit. But here comes my great doubt, how do I get the producer (platform) to send the information to the correct subscribe (app), because all the examples I found, none made this ID.

I know that when the application connects to Rabbit it creates a Queue temporary, and when the MQTT service goes up it associates with a Rabbit Exchange. my problem is only to identify the final destination even.


Answer:

I would suggest you to look here How to publish a message to a specific client in Mosquitto MQTT.

The same is applied for RabbitMQ MQTT support.

So, or you need filtering on the client side, or consider to have MQTT topics for each subscriber.