Hot questions for Using RabbitMQ in messagebroker

Question:

I have the following rabbitMq consumer:

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        sendNotificationIntoTopic(message);
        saveIntoDatabase(message);
    }
};

The following situation can occur:

  1. The message was sent to the topic successfully.
  2. The connection to the database was lost, so the database insert failed.

As a result we have a data inconsistency.

The expected result is that either both actions succeed or neither is executed at all.

How can I achieve this?

P.S.

Currently I have the following idea (please comment on it):

We can assume that the broker doesn't lose any messages.

We have to be subscribed to the topic we want to send to.

  1. Save the entry into the database and set the status field to 'pending'.
  2. Attempt to send the data to the topic. If the send was successful, update the status field to 'success'.
  3. We need a scheduled job that checks rows with pending status. At that moment two cases are possible:
     3.1 The notification wasn't sent at all.
     3.2 The notification was sent but saving to the database failed (the probability is very low, but it is possible).

    So we have to distinguish these two cases somehow: we may store the messages from the topic in a collection, and the job can check whether the message was accepted or not. If the job finds a message that corresponds to the database row, we update the status to "success". Otherwise we remove the entry from the database.

I think my idea has some weaknesses (for example, in a multi-node application we have to store the messages in Hazelcast or an analog, which is an additional potential point of failure).


Answer:

The Try-Cancel/Confirm pattern (https://servicecomb.apache.org/docs/distributed_saga_3/) should be capable of dealing with your problem. You have to tolerate some chance of the data being submitted twice via the queue. For example:

  1. Define an abstraction, Operation, and assign it an ID plus a timestamp.
  2. Write status PENDING to the database (you can do this in the same step as 1).
  3. Write a listener that polls the database for all operations with status PENDING that are older than a "timeout".
  4. For each pending operation, send the data via the queue together with the assigned ID.
  5. The recipient side should be aware of the ID; if the ID has already been processed, nothing should happen.

6A. If you need to be 100% sure that the operation has completed, you need a second queue to which the recipient side posts a message "ID - DONE". If such consistency is not necessary, skip this step. Alternatively, the recipient can post "ID - FAILED" with the reason for the failure.

6B. The submitting side either waits for the message from 6A or completes the operation by writing status DONE to the database.

  • Once a certain timeout or a certain retry limit has been exceeded, write status FAIL to the operation.
  • You can potentially send a rollback message with the operation ID to the recipient side.

Notice that none of these steps involves a technical transaction. You can do this with a non-transactional database.

What I have written is a variation of the Try-Cancel/Confirm pattern in which each recipient of a message must know how to manage its own data.
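
The steps above can be sketched in plain Java. This is only an in-memory illustration under stated assumptions, not a definitive implementation: the class PendingOperationsSketch, its nested types and the BiPredicate used as the "send via the queue" hook are all hypothetical names, the map stands in for the operations table, and the return value of the send hook stands in for the "ID - DONE" confirmation from step 6A.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

// Hypothetical in-memory model; a real system would use a database table and RabbitMQ queues.
final class PendingOperationsSketch {

    enum Status { PENDING, DONE, FAIL }

    /** Steps 1-2: an operation with an ID, a timestamp and an initial PENDING status. */
    static final class Operation {
        final String id;
        final String payload;
        final long createdAtMillis;
        Status status = Status.PENDING;
        int attempts = 0;

        Operation(String id, String payload, long createdAtMillis) {
            this.id = id;
            this.payload = payload;
            this.createdAtMillis = createdAtMillis;
        }
    }

    /** Step 5: the recipient remembers processed IDs, so a duplicate delivery does nothing. */
    static final class Recipient {
        private final Set<String> processedIds = new HashSet<>();
        final List<String> handled = new ArrayList<>();

        /** Returns true to confirm the ID is done (the 6A confirmation), also for duplicates. */
        boolean receive(String id, String payload) {
            if (processedIds.add(id)) {
                handled.add(payload); // first delivery: do the real work once
            }
            return true;
        }
    }

    /** Stand-in for the operations table, keyed by operation ID. */
    final Map<String, Operation> table = new LinkedHashMap<>();

    void submit(String id, String payload, long nowMillis) {
        table.put(id, new Operation(id, payload, nowMillis));
    }

    /** Steps 3-4 and 6B: resend pending operations older than the timeout, up to a retry limit. */
    void poll(BiPredicate<String, String> send, long nowMillis, long timeoutMillis, int maxAttempts) {
        for (Operation op : table.values()) {
            if (op.status != Status.PENDING || nowMillis - op.createdAtMillis < timeoutMillis) {
                continue;
            }
            if (op.attempts >= maxAttempts) {
                op.status = Status.FAIL; // retry limit exceeded
                continue;
            }
            op.attempts++;
            if (send.test(op.id, op.payload)) {
                op.status = Status.DONE; // confirmation received
            }
        }
    }

    public static void main(String[] args) {
        PendingOperationsSketch ops = new PendingOperationsSketch();
        Recipient recipient = new Recipient();
        ops.submit("op-1", "hello", 0L);
        ops.poll(recipient::receive, 1_000L, 500L, 3);
        System.out.println(ops.table.get("op-1").status);
    }
}
```

Because the recipient deduplicates by ID, the poller can safely resend an operation whose confirmation was lost; this is what makes the tolerated double submission harmless.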

Question:

I am currently learning RabbitMQ and Spring AMQP. I tried some examples this week, but I am stuck on one step of the configuration.

I have this docker-compose file:

app:
  build: .
  environment:
    INSERTION_QUEUE: insertion.queue
    VALIDATION_QUEUE: validation.queue
    NUMBER_OF_VALIDATION_CONSUMERS: 1
    RESPONSE_EXCHANGE: response.exchange
    RESPONSE_ROUTING_KEY: response.routing.key
    RABBITMQ_HOST: rabbitmq
    RABBITMQ_PORT: 5672
    RABBITMQ_VHOST: /
    RABBITMQ_USERNAME: guest
    RABBITMQ_PASSWORD: guest
    JDBC_URL: jdbc:mysql://mysql:3306/hided
  links:
    - mysql:mysql
    - rabbitmq:rabbitmq
mysql:
  image: mysql:5.7
  environment:
    MYSQL_DATABASE: hided
    MYSQL_ROOT_PASSWORD: secret
rabbitmq:
  image: rabbitmq:3.6-management
  ports:
    - 15672:15672

I copied this example and added a Maven configuration to it: https://github.com/spring-guides/gs-messaging-rabbitmq/tree/master/complete

The RabbitMQ management UI is reachable at localhost:15672 with username and password "guest".

But when I run this example, it gives me the error below (I believe the important message is "Broker not available; cannot force queue declarations during start: java.net.ConnectException: Connection refused: connect"):

2020-04-11 04:50:49.202  INFO 25576 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-04-11 04:50:51.222  INFO 25576 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Broker not available; cannot force queue declarations during start: java.net.ConnectException: Connection refused: connect
2020-04-11 04:50:51.225  INFO 25576 --- [    container-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-04-11 04:50:53.242 ERROR 25576 --- [    container-1] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:510) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:751) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2068) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2048) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1830) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1811) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1342) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1188) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_241]
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_241]
    at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source) ~[na:1.8.0_241]
    at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source) ~[na:1.8.0_241]
    at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source) ~[na:1.8.0_241]
    at java.net.AbstractPlainSocketImpl.connect(Unknown Source) ~[na:1.8.0_241]
    at java.net.PlainSocketImpl.connect(Unknown Source) ~[na:1.8.0_241]
    at java.net.SocksSocketImpl.connect(Unknown Source) ~[na:1.8.0_241]
    at java.net.Socket.connect(Unknown Source) ~[na:1.8.0_241]
    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.7.3.jar:5.7.3]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1113) ~[amqp-client-5.7.3.jar:5.7.3]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1063) ~[amqp-client-5.7.3.jar:5.7.3]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:526) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:473) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    ... 12 common frames omitted

2020-04-11 04:50:53.244  INFO 25576 --- [    container-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-04-11 04:50:55.266  INFO 25576 --- [           main] c.a.A.AppNameApplication           : Started AppNameApplication in 7.328 seconds (JVM running for 8.175)
Sending message...
2020-04-11 04:50:55.268  INFO 25576 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-04-11 04:50:57.285  INFO 25576 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-04-11 04:50:57.293 ERROR 25576 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute CommandLineRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at com.appname.AppName.AppNameApplication.main(AppNameApplication.java:19) [classes/:na]
Caused by: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:510) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:751) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2068) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1009) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1075) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1068) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at com.appname.AppName.Runner.run(Runner.java:23) ~[classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    ... 5 common frames omitted
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_241]
    at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source) ~[na:1.8.0_241]
    at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source) ~[na:1.8.0_241]
    at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source) ~[na:1.8.0_241]
    at java.net.AbstractPlainSocketImpl.connect(Unknown Source) ~[na:1.8.0_241]
    at java.net.PlainSocketImpl.connect(Unknown Source) ~[na:1.8.0_241]
    at java.net.SocksSocketImpl.connect(Unknown Source) ~[na:1.8.0_241]
    at java.net.Socket.connect(Unknown Source) ~[na:1.8.0_241]
    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.7.3.jar:5.7.3]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1113) ~[amqp-client-5.7.3.jar:5.7.3]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1063) ~[amqp-client-5.7.3.jar:5.7.3]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:526) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:473) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    ... 14 common frames omitted

2020-04-11 04:50:57.296  INFO 25576 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2020-04-11 04:50:57.296  INFO 25576 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2020-04-11 04:50:57.298  INFO 25576 --- [           main] o.s.a.r.l.SimpleMessageListenerContainer : Shutdown ignored - container is not active already

I would like to understand what I am doing wrong. I tried adding the code below to "MessagingRabbitmqApplication.java" to force the connection, changed ports and addresses, and disabled the firewall, all without success:

@Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

I did try other examples but I keep facing this same error.

My Dockerfile is:

# Start with a base image containing Java runtime
FROM openjdk:8-jdk-alpine

# Add Maintainer Info
LABEL maintainer="hidden"

# Add a volume pointing to /tmp
VOLUME /tmp

ARG JAR_FILE=target/*.jar
COPY ${JAR_FILE} app.jar
ENTRYPOINT ["java","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]

And my application.properties is:

#Database properties
spring.datasource.url = jdbc:mysql://mysql:3306/app_name?user=root&password=secret
spring.datasource.username = root
spring.datasource.password = secret
spring.datasource.platform = mysql
spring.datasource.driver-class-name = com.mysql.cj.jdbc.Driver

#Rabbitmq properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.template.exchange=response.exchange
spring.rabbitmq.template.routing-key=response.routing.key
spring.rabbitmq.virtual-host=/

Thank you for your time, I appreciate any help.


Answer:

Please correct your bean configuration. Replace it with the following, which reads the connection settings from system properties instead of hard-coding them:

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(System.getProperty("RABBITMQ_HOST"));
    connectionFactory.setVirtualHost(System.getProperty("RABBITMQ_VHOST"));
    connectionFactory.setUsername(System.getProperty("RABBITMQ_USERNAME"));
    connectionFactory.setPassword(System.getProperty("RABBITMQ_PASSWORD"));
    return connectionFactory;
}

and modify the last line of your Dockerfile so that those properties are passed to the JVM:

# Start with a base image containing Java runtime
FROM openjdk:8-jdk-alpine

# Add Maintainer Info
LABEL maintainer="hidden"

# Copy the application jar into the image
ARG JAR_FILE=target/*.jar
COPY ${JAR_FILE} app.jar

# Shell form, so the RABBITMQ_* environment variables are expanded when the container starts
ENTRYPOINT java -DRABBITMQ_HOST=${RABBITMQ_HOST} -DRABBITMQ_VHOST=${RABBITMQ_VHOST} -DRABBITMQ_USERNAME=${RABBITMQ_USERNAME} -DRABBITMQ_PASSWORD=${RABBITMQ_PASSWORD} -Djava.security.egd=file:/dev/./urandom -jar app.jar
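
As an alternative to passing each value as a -D system property, the variables that the docker-compose file already sets (RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_VHOST, RABBITMQ_USERNAME, RABBITMQ_PASSWORD) could be read directly from the environment. The following is a minimal sketch under that assumption; the class RabbitEnvSettings and its fallback defaults are hypothetical and not part of Spring or the RabbitMQ client. The resulting values would then be handed to the CachingConnectionFactory setters (setHost, setPort, setVirtualHost, setUsername, setPassword).

```java
import java.util.Map;

// Hypothetical helper: resolves RabbitMQ settings from environment variables with local defaults.
final class RabbitEnvSettings {
    final String host;
    final int port;
    final String virtualHost;
    final String username;
    final String password;

    private RabbitEnvSettings(String host, int port, String virtualHost, String username, String password) {
        this.host = host;
        this.port = port;
        this.virtualHost = virtualHost;
        this.username = username;
        this.password = password;
    }

    /** Builds the settings from the given environment map, e.g. System.getenv(). */
    static RabbitEnvSettings from(Map<String, String> env) {
        return new RabbitEnvSettings(
                get(env, "RABBITMQ_HOST", "localhost"),
                Integer.parseInt(get(env, "RABBITMQ_PORT", "5672")),
                get(env, "RABBITMQ_VHOST", "/"),
                get(env, "RABBITMQ_USERNAME", "guest"),
                get(env, "RABBITMQ_PASSWORD", "guest"));
    }

    /** Returns the variable's value, or the fallback when it is missing or blank. */
    private static String get(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }

    public static void main(String[] args) {
        RabbitEnvSettings settings = from(System.getenv());
        System.out.println("Would connect to " + settings.host + ":" + settings.port);
    }
}
```

With this approach the exec-form ENTRYPOINT from the original Dockerfile can stay unchanged, since the JVM inherits the container's environment.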

Question:

I have several Java clients publishing messages to a direct exchange.

I want each of the clients to publish using its ID as the routing key (or as a header; I can change to a headers exchange). If there is a queue bound with that routing key, the message should go to it; otherwise it should go to a global queue.

So my question is: is it possible to have a queue that receives the messages whose routing key/header cannot be routed to any of the other queues? Something like a default queue.

Thanks.


Answer:

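I found the solution: RabbitMQ's alternate exchange feature. Messages that the main exchange cannot route are forwarded to the alternate exchange, which can deliver them to a default queue.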
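
A rough sketch of how this could look is shown below. The "alternate-exchange" argument key is the one RabbitMQ uses; the exchange and queue names (clients.direct, clients.unrouted, global.queue) and the class AlternateExchangeSketch are assumptions made for illustration. The nested DirectExchange class is only a toy in-memory model of the fallback behaviour, not the broker's implementation; the calls that would go to the RabbitMQ Java client appear in comments.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AlternateExchangeSketch {

    /** Arguments for declaring the main exchange with an alternate exchange attached. */
    static Map<String, Object> mainExchangeArguments(String alternateExchangeName) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", alternateExchangeName);
        return arguments;
    }

    // With the RabbitMQ Java client, the declarations might look like this (names are hypothetical):
    //   channel.exchangeDeclare("clients.unrouted", "fanout", true);
    //   channel.queueBind("global.queue", "clients.unrouted", "");
    //   channel.exchangeDeclare("clients.direct", "direct", true, false,
    //           mainExchangeArguments("clients.unrouted"));

    /** Toy model: a direct exchange that falls back to one queue when no binding matches. */
    static final class DirectExchange {
        private final Map<String, List<String>> bindings = new HashMap<>();
        private final List<String> fallbackQueue;

        DirectExchange(List<String> fallbackQueue) {
            this.fallbackQueue = fallbackQueue;
        }

        void bind(String routingKey, List<String> queue) {
            bindings.put(routingKey, queue);
        }

        void publish(String routingKey, String message) {
            List<String> target = bindings.get(routingKey);
            if (target == null) {
                target = fallbackQueue; // unroutable: behaves like the alternate exchange
            }
            target.add(message);
        }
    }

    public static void main(String[] args) {
        List<String> globalQueue = new ArrayList<>();
        List<String> client42Queue = new ArrayList<>();
        DirectExchange exchange = new DirectExchange(globalQueue);
        exchange.bind("client-42", client42Queue);
        exchange.publish("client-42", "routed by ID");
        exchange.publish("client-7", "no binding for this ID");
        System.out.println(client42Queue + " " + globalQueue);
    }
}
```

Declaring the alternate exchange as a fanout exchange means the routing key of the unrouted message no longer matters, so a single binding is enough for the global queue.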