Hot questions for Using RabbitMQ in rabbitmq exchange


Question:

I'm trying to use a headers exchange on RabbitMQ with mixed Java and Python components, and I need confirmed delivery.

I seem to get different behaviour from the Python (pika) and Java clients.

In python:

channel.exchange_declare(exchange='headers_test',
                         type='headers',
                         durable=True)
channel.confirm_delivery()
result = channel.basic_publish(exchange='headers_test',
                               routing_key='',
                               mandatory=True,
                               body=message,
                               properties=pika.BasicProperties(
                                   delivery_mode=2,
                                   headers=message_headers))

If the headers don't match any bound consumer and the message cannot be routed, result is false

But in java/scala:

channel.exchangeDeclare("headers_test", "headers", true, false, null)
channel.confirmSelect

val props = MessageProperties.PERSISTENT_BASIC.builder
  .headers(messageHeaders).build
channel.basicPublish("headers_test",
                     "", // routingKey
                     true, // mandatory
                     props,
                     "data".getBytes)
channel.waitForConfirmsOrDie()

Here, when messageHeaders don't find a match, the message seems to just be dropped without an error.

Am I missing something, or is the behaviour of the two clients really different? And how can I get confirmed delivery using a headers exchange in Java?

Note: I already have a "complex" exchange to queues routing setup, I would rather avoid adding dead-letters routing to the game, and just fail-on-send.


Answer:

The problem is that a message is considered confirmed even if there's no queue matching your headers. From the docs (https://www.rabbitmq.com/confirms.html):

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

Instead you should be checking for a basic.return message to detect if a message has been routed or not.

I've checked with wireshark, and indeed I can see that if a message is not routed there's an AMQP basic.return message.

I suppose you should start with

channel.addReturnListener(new ReturnListener() {
  @Override
  public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("App.handleReturn");
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
  }
});

And indeed if a message hasn't been routed I get this:

replyCode = [312], replyText = [NO_ROUTE], exchange = [headers_logs], routingKey = [], pro....

Furthermore, if you want to emulate Pika's synchronous behavior in Java, it seems you can do it by recording the channel's next publish sequence number before publishing a message and registering a confirm listener instead of relying on waitForConfirmsOrDie().

So a full code sample would be:

channel.addReturnListener(new ReturnListener() {
  @Override
  public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("App.handleReturn");
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
  }
});

channel.addConfirmListener(new ConfirmListener() {
  @Override
  public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    System.out.println("App.handleAck");
    System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
  }

  @Override
  public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    System.out.println("App.handleNack");
    System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
  }
});

long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);

channel.basicPublish("headers_logs",
    "",
    true,
    props,
    "data".getBytes());

Inside the return and confirm callbacks, compare against the publish sequence number you recorded before publishing the message.

On the wire, when a message has not been routed to any queue, RabbitMQ sends back a basic.return followed by a basic.ack that carries the confirmation (delivery tag), as the documentation quoted above describes. When a message has been routed, RabbitMQ sends back only a basic.ack.

It seems that the RabbitMQ Java client always invokes handleReturn() before handleAck(), so the logic to determine whether a message has been routed can be:

  • Register return and confirm listeners on the channel.
  • Record the channel's next publish sequence number.
  • Wait for either a return or a confirm callback.
  • If the return callback fires first, the message has not been routed, and you should ignore the subsequent confirmation for the same delivery tag.
  • If handleAck() fires before any handleReturn(), the message has been routed to a queue.

I am not sure, though, in which cases handleNack() can be called.
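
The decision logic above can be sketched without any broker at all. The class below is a hypothetical helper (PublishOutcomeTracker and its method names are not part of the RabbitMQ client) and assumes only one message is in flight per channel at a time, which is what emulating Pika's synchronous publish implies. A return completes the result as UNROUTED, and the ack that follows for the same delivery tag is then ignored.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper (not part of the RabbitMQ client): tracks the outcome of
// one in-flight publish on a channel that is in confirm mode.
final class PublishOutcomeTracker {
    enum Outcome { ROUTED, UNROUTED, NACKED }

    private long pendingSeqNo = -1;
    private CompletableFuture<Outcome> result = new CompletableFuture<>();

    // Call with channel.getNextPublishSeqNo() immediately before basicPublish.
    synchronized CompletableFuture<Outcome> beforePublish(long seqNo) {
        pendingSeqNo = seqNo;
        result = new CompletableFuture<>();
        return result;
    }

    // Call from handleReturn: the pending message could not be routed.
    synchronized void onReturn() {
        result.complete(Outcome.UNROUTED);
    }

    // Call from handleAck. If a return already completed the future, this is a no-op.
    synchronized void onAck(long deliveryTag, boolean multiple) {
        if (covers(deliveryTag, multiple)) {
            result.complete(Outcome.ROUTED);
        }
    }

    // Call from handleNack: the broker reported that it could not handle the message.
    synchronized void onNack(long deliveryTag, boolean multiple) {
        if (covers(deliveryTag, multiple)) {
            result.complete(Outcome.NACKED);
        }
    }

    // An ack or nack with multiple=true covers every tag up to and including deliveryTag.
    private boolean covers(long deliveryTag, boolean multiple) {
        if (pendingSeqNo < 0) {
            return false;
        }
        return multiple ? deliveryTag >= pendingSeqNo : deliveryTag == pendingSeqNo;
    }
}
```

To use it, call onReturn() from handleReturn(), onAck() from handleAck() and onNack() from handleNack(), then block on the future returned by beforePublish() after calling basicPublish with mandatory set to true.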

Question:

I want to route multiple messages between several RabbitMQ exchanges. This is the routing table that I want to use:

// | exchange | type | routing key | queue |
// |-----------------------------------------------------------------|
// | processing | topic | processing.event.transaction | processing.transaction.queue |
// | database | topic | database.event.transaction | database.transaction.queue |
// | database | topic | database.event.api_attempts | database.api_attempts.queue |
// | database | topic | database.event.event_logs | database.event_logs.queue |

I have 3 modules which I want to configure to send messages this way:

REST API Module -> Gateway module
REST API Module -> Database Module

REST API Module configuration

String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";   
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";  
String EXCHANGE_PROCESSING = "processing";
String EXCHANGE_DATABASE = "database";  
String ROUTING_KEY_PROCESSING = "processing.event.transaction";
String ROUTING_KEY_DATABASE = "database.event.transaction"; 
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";

channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_PROCESSING, BuiltinExchangeType.TOPIC);
channel.exchangeDeclare(EXCHANGE_DATABASE, BuiltinExchangeType.TOPIC);

channel.queueDeclare(QUEUE_PROCESSING_TRANSACTION, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_TRANSACTION, false, false, false, null);            
channel.queueDeclare(QUEUE_DATABASE_API_ATTEMPT, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_EVENT_LOGS, false, false, false, null);

channel.queueBind(QUEUE_PROCESSING_TRANSACTION, EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING);
channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);

Sending Java objects to other modules:

TransactionsBean obj = new TransactionsBean();
obj.setId(Long.valueOf(111222333));
channel.basicPublish(EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING, null, SerializationUtils.serialize(obj));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_DATABASE, null, SerializationUtils.serialize(obj));

ApiAttemptsBean obj = new ApiAttemptsBean();
obj.setId(Long.valueOf(2332));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS, null, SerializationUtils.serialize(obj));

EventLogsBean obj = new EventLogsBean();
obj.setId(Long.valueOf(111222));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS, null, SerializationUtils.serialize(obj));

Module Gateway configuration:

String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";   
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";  
String EXCHANGE_DATABASE = "database";  
String ROUTING_KEY_DATABASE = "database.event.transaction"; 
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";

channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_PROCESSING, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_PROCESSING_TRANSACTION, false, false, false, null);
channel.queueBind(QUEUE_PROCESSING_TRANSACTION, EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING);

Map<String, Consumer<byte[]>> queueToConsumer = new HashMap<>();
        queueToConsumer.put(QUEUE_DATABASE_TRANSACTION, this::process_transaction);
        queueToConsumer.put(QUEUE_DATABASE_API_ATTEMPT, this::process_api_attempt);
        queueToConsumer.put(QUEUE_DATABASE_EVENT_LOGS, this::process_event_logs);

        queueToConsumer.forEach((queueName, consumer) -> {
            try {
                channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                            byte[] body) throws IOException {
                        consumer.accept(body);
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }

private void process_transaction(byte[] object) {
        TransactionsBean obj = (TransactionsBean) SerializationUtils.deserialize(object);
        System.out.println("!!!! Received id " + obj.getId() + " in gateway");
    }

    private void process_api_attempt(byte[] object) {
        ApiAttemptsBean obj = (ApiAttemptsBean) SerializationUtils.deserialize(object);
        System.out.println("!!!! Received id " + obj.getId() + " in gateway");
    }

    private void process_event_logs(byte[] object) {
        EventLogsBean obj = (EventLogsBean) SerializationUtils.deserialize(object);
        System.out.println("!!!! Received id " + obj.getId() + " in gateway");
    }

Module Database:

String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";
String EXCHANGE_DATABASE = "database";  
String ROUTING_KEY_PROCESSING = "processing.event.transaction";
String ROUTING_KEY_DATABASE = "database.event.transaction"; 
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";

channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_DATABASE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_DATABASE_TRANSACTION, false, false, false, null);            
channel.queueDeclare(QUEUE_DATABASE_API_ATTEMPT, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_EVENT_LOGS, false, false, false, null);
channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);

Map<String, Consumer<byte[]>> queueToConsumer = new HashMap<>();
queueToConsumer.put(QUEUE_DATABASE_TRANSACTION, this::process_transaction);
queueToConsumer.put(QUEUE_DATABASE_API_ATTEMPT, this::process_api_attempt);
queueToConsumer.put(QUEUE_DATABASE_EVENT_LOGS, this::process_event_logs);

queueToConsumer.forEach((queueName, consumer) -> {
            try {
                channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                            byte[] body) throws IOException {
                        consumer.accept(body);
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

private void process_transaction(byte[] object) {       
    TransactionsBean obj = (TransactionsBean) SerializationUtils.deserialize(object);
    System.out.println("!!!! Received id " + obj.getId() + " in database");
}

private void process_api_attempt(byte[] object) {
    ApiAttemptsBean obj = (ApiAttemptsBean) SerializationUtils.deserialize(object);
    System.out.println("!!!! Received id " + obj.getId() + " in database");
}

private void process_event_logs(byte[] object) {
    EventLogsBean obj = (EventLogsBean) SerializationUtils.deserialize(object);
    System.out.println("!!!! Received id " + obj.getId() + " in database");
}

But messages are not delivered properly:

11:33:00,783 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-17-thread-6) Consumer org.database.context.ContextServer$1@6fee6ab4 (amq.ctag-arvcrYNc61cslclCTAnpDQ) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1): java.lang.ClassCastException: deployment.db.war//plugin.factories.TransactionsBean cannot be cast to deployment.database.war//org.plugin.factories.EventLogsBean

It looks like messages are not routed properly, probably because my routing table is not correct.

Can you give me some guidance on how to fix this issue?

EDIT: Error stack:

22:19:26,584 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-19-thread-6) Consumer org.database.context.ContextServer$1@49ee659f (amq.ctag-vjArBDtmtruIgeCMLipHGQ) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1): java.lang.ClassCastException: deployment.db.war//org.plugin.database.bean.TransactionsBean cannot be cast to deployment.db.war//org.plugin.database.bean.ApiAttemptsBean
    at deployment.db.war//org.database.context.ContextServer.process_api_attempt(ContextServer.java:79)
    at deployment.db.war//org.database.context.ContextServer$1.handleDelivery(ContextServer.java:64)
    at deployment.db.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at deployment.db.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

22:19:26,586 INFO  [javax.enterprise.resource.webcontainer.jsf.config] (ServerService Thread Pool -- 111) Initializing Mojarra 2.2.13.SP5  for context '/rest_api'
22:19:26,619 INFO  [stdout] (pool-21-thread-6) !!!! Received id 2332 in gateway
22:19:26,667 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-19-thread-6) Consumer org.database.context.ContextServer$1@29ba98e4 (amq.ctag-RMVncG2xQn3KBJ561F9HNQ) method handleDelivery for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1): java.lang.ClassCastException: deployment.db.war//org.plugin.database.bean.TransactionsBean cannot be cast to deployment.db.war//org.plugin.database.bean.EventLogsBean
    at deployment.db.war//org.database.context.ContextServer.process_event_logs(ContextServer.java:84)
    at deployment.db.war//org.database.context.ContextServer$1.handleDelivery(ContextServer.java:64)
    at deployment.db.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at deployment.db.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

22:19:26,669 INFO  [stdout] (pool-21-thread-6) !!!! Received id 111222 in gateway

Answer:

In the Module Database you set all queue bindings with the same routing_key: ROUTING_KEY_DATABASE

channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);

Probably should be:

channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);

It's just a copy-paste error (-:

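Edit: The code now seems good, so here are a few things to check:

-1- Did you clear the queues before running the new code (so that no old messages remain)? Bindings created by the earlier code also stay on the broker until the queue is deleted or they are removed with queueUnbind, so the old ROUTING_KEY_DATABASE bindings may still be in effect.

-2- Try adding breakpoints, or at least a println, in: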

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    consumer.accept(body);
}

Try to get some values from the envelope regarding the message routing key and original exchange.

EDIT 2

I checked it again, and by eye everything looks good. The only advice I can give is to debug your code more thoroughly to understand what the issue is.

-1- Did you try logging in to the RabbitMQ management UI to monitor the messages?

-2- Try to add better logs; for instance, add the function name to the println:

System.out.println("!!!! Received id " + obj.getId() + " in database");

changed to

System.out.println("process_transaction: Received id " + obj.getId() + " in database");

Do that in every function so that the message tells you exactly where you are.

-3- From the envelope, you can call envelope.getExchange() and also envelope.getRoutingKey() to see exactly how the message was routed.
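
As a further debugging aid, the cast itself can be made to report what actually arrived instead of failing with a bare ClassCastException. The sketch below uses plain Java serialization as a stand-in for SerializationUtils; TypedPayloads, deserializeAs and the source parameter are hypothetical names introduced here for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper: checks the payload type before casting and reports
// the actual class together with where the message came from.
final class TypedPayloads {

    // Plain Java serialization, standing in for SerializationUtils.serialize.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserializes the body and fails with a descriptive message on a type mismatch.
    static <T> T deserializeAs(byte[] body, Class<T> expected, String source) {
        Object value;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            value = in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown payload class from " + source, e);
        }
        if (!expected.isInstance(value)) {
            String actual = (value == null) ? "null" : value.getClass().getName();
            throw new IllegalStateException(
                    "Expected " + expected.getName() + " but received " + actual + " via " + source);
        }
        return expected.cast(value);
    }
}
```

Inside handleDelivery, the source argument could be built from envelope.getExchange() and envelope.getRoutingKey(), so the error names both the unexpected class and the route by which it reached the queue.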

Question:

I have created a new Spring application which will push messages to a RabbitMQ server. My RabbitMQConfig Java file looks like this:

@Configuration
public class RabbitMQConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Value("${spring.rabbitmq.host}")
    private String SPRING_RABBITMQ_HOST;

    @Value("${spring.rabbitmq.port}")
    private int SPRING_RABBITMQ_PORT;

    @Value("${spring.rabbitmq.username}")
    private String SPRING_RABBITMQ_USERNAME;

    @Value("${spring.rabbitmq.password}")
    private String SPRING_RABBITMQ_PASSWORD;

    @Bean
    public RabbitTemplate rabbitTemplate(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(SPRING_RABBITMQ_HOST,SPRING_RABBITMQ_PORT);
        connectionFactory.setUsername(SPRING_RABBITMQ_USERNAME);
        connectionFactory.setPassword(SPRING_RABBITMQ_PASSWORD);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("my.controller.exchange");
        rabbitTemplate.setRoutingKey("my.controller.key");
        return rabbitTemplate;
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange("my.controller.exchange", true, false);
    }

    @Bean
    public Queue queue() {
        return new Queue("my.controller", true);
    }

    @Bean
    Binding exchangeBinding(DirectExchange exchange, Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("my.controller.key");
    }
}

Here is how I push a message to the queue:

@Service
public class RabbitPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static Logger LOGGER = Logger.getLogger(RabbitPublisher.class);

    public  Boolean  pushToMyQueue(HashMap<String, Object> message) {
        try {
            rabbitTemplate.convertAndSend("my.controller.exchange","my.controller.key",message);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            LOGGER.error("Error in pushing to my queue", e);
        }
        return false;
    }
}

Since the exchange and queue do not exist on the RabbitMQ server, I expect them to be created automatically and the message to be pushed. But it results in the following error:

ERROR 18198 --- [168.201.18:5672] o.s.a.r.c.CachingConnectionFactory       : 
Channel shutdown: channel error; protocol method: #method<channel.close>
(reply-code=404, reply-text=NOT_FOUND - no exchange 
'my.controller.exchange' in vhost '/', class-id=60, method-id=40)

When I create the exchange and queue and bind them manually on the server, a message gets pushed successfully. Please let me know if I am missing something. Thanks.


Answer:

You need to add a RabbitAdmin @Bean. The admin will declare the elements when a connection is first opened.
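
A minimal sketch of what that could look like, assuming Spring AMQP is on the classpath. The class name RabbitAdminConfig is hypothetical, and the existing Queue, DirectExchange and Binding beans from the question are assumed to stay as they are. The connection factory is pulled out into its own bean so that the admin and the template share it, since the admin declares the elements when its own connection factory opens a connection.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; property names are the ones used in the question.
@Configuration
public class RabbitAdminConfig {

    // One shared connection factory for both the admin and the template.
    @Bean
    public ConnectionFactory connectionFactory(
            @Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") int port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password) {
        CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
        factory.setUsername(username);
        factory.setPassword(password);
        return factory;
    }

    // Declares the Queue, Exchange and Binding beans in the context
    // when the shared connection factory first opens a connection.
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("my.controller.exchange");
        rabbitTemplate.setRoutingKey("my.controller.key");
        return rabbitTemplate;
    }
}
```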

Question:

I am trying to build a sort of asynchronous server using RabbitMQ with Java. I have two exchanges, Original_Exch and Dead_Exch, with one queue bound to each. Each exchange is declared as the DLX (dead-letter exchange) of the other exchange's queue.

Now to the problem: I publish a message to Original_Exch as a JSON string containing email info (To, Subject, message body, attachment, etc.). After consuming this message from the queue bound to Original_Exch, I send an email to the specified address. If the email is not sent successfully, I transfer the message to Dead_Exch, and after 2 seconds (using a TTL) the message is transferred back to Original_Exch.

Assume a scenario in which a particular message keeps moving from one exchange to the other because of continuous failure. In that case I want to make sure that once it has been transferred to Original_Exch 10 times, it is dropped (deleted) from the queue permanently and is not transferred to Dead_Exch again.

There are many answers to similar questions, but none of them is satisfactory (from a learner's point of view).

Thanks..........


Answer:

Messages that have been dead-lettered carry an x-death header with details about which queue(s) they went through and how many times. See the article about dead-letter exchanges on the RabbitMQ website.

So you can use this header to do what you want. I see two solutions:

  • In your consumer, when a mail could not be delivered, look at the x-death header and decide if you want to dead-letter it (Basic.Nack with requeue set to false) or drop it (Basic.Ack).
  • Use a headers exchange for Dead_Exch and configure the binding to match on x-death.

Because headers exchanges only do exact matches on header values, the first solution is more flexible and less error-prone.
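
For the first solution, the retry count can be read out of the header with a small helper. The sketch below assumes the x-death layout described in the RabbitMQ dead-lettering documentation, a list of tables that each carry "queue", "reason" and "count" entries, and falls back to counting one per entry if "count" is absent. DeathCounter and the queue names in the usage note are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: reads how often a message was dead-lettered from a queue.
final class DeathCounter {

    // Returns the number of times the message was dead-lettered from queueName, or 0.
    static long deathCount(Map<String, Object> headers, String queueName) {
        if (headers == null) {
            return 0;
        }
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List<?>)) {
            return 0;
        }
        long total = 0;
        for (Object entry : (List<?>) xDeath) {
            if (!(entry instanceof Map<?, ?>)) {
                continue;
            }
            Map<?, ?> death = (Map<?, ?>) entry;
            // Compare via String.valueOf because header strings may not arrive as java.lang.String.
            if (queueName.equals(String.valueOf(death.get("queue")))) {
                Object count = death.get("count");
                // Assumption: entries without a "count" field represent a single death.
                total += (count instanceof Number) ? ((Number) count).longValue() : 1;
            }
        }
        return total;
    }
}
```

In the consumer, the headers would come from properties.getHeaders(). If deathCount(headers, "original.queue") has reached 10, acknowledge the message to drop it; otherwise nack it with requeue set to false so that it is dead-lettered again.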

Question:

In my application I have 3 classes:

  • Company, which hires Workers for any of 3 jobs
  • Workers, each of which can do 2 jobs
  • Administrator, which receives copies of all messages in the program and can send messages to all companies, all workers, or everyone

I'm using work.companies.companyName for company keys and work.workers.workerName for worker keys; they both use the default exchange and queue for communication. The Administrator receives messages through the admin topic exchange.

The problem is with communication from the Administrator to everyone else. It works exactly like a direct exchange: I can give Companies and Workers any keys, even ones like "#" or "company1.#", and they won't receive anything unless the Administrator explicitly sends the message with a key like "work.companies.company1". I would like to be able to use just, e.g., "work.companies.#" to send a message to all companies. What am I doing wrong?

Administrator.java:

public class Administrator
{
    public static void main(String[] args) throws IOException, TimeoutException
    {
        new Thread(new TopicListener("admin", ign -> {})).start();
        TopicWriter writer = new TopicWriter();
    // lots of code

TopicListener.java:

public class TopicListener implements Runnable
{
    private final String EXCHANGE_NAME = "space";
    private String key;
    private Consumer<String> msgHandler;

    public TopicListener(String key, Consumer<String> msgHandler)
    {
        this.key = key;
        this.msgHandler = msgHandler;
    }

    @Override
    public void run()
    {
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, key);

            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("Received: \"" + msg + "\"");
                    msgHandler.accept(msg);
                }
            };

            channel.basicConsume(queueName, true, consumer);
        }
        catch (IOException | TimeoutException e)
        { e.printStackTrace(); }
    }
}

TopicWriter.java:

public class TopicWriter
{
    private final String EXCHANGE_NAME = "space";
    private final Channel channel;

    public TopicWriter() throws IOException, TimeoutException
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        this.channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    }

    public void send(String msg, String key) throws IOException
    {
        channel.basicPublish(
                EXCHANGE_NAME,
                key,
                null,
                msg.getBytes(StandardCharsets.UTF_8));
    }
}

Company.java contains:

new Thread(new TopicListener("space.agencies." + agencyID, ign -> {})).start();

Worker.java contains:

new Thread(new TopicListener("space.carriers." + carrierID, consumer)).start();

Answer:

I found out where the problem was: I was trying to address everyone by putting the wildcard in the routing key when sending, whereas in RabbitMQ a topic pattern specifies which messages a queue should receive. The "#" and "*" wildcards belong in the binding key given when the queue is bound (queueBind), not in the routing key given when publishing a message.
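
To make the asymmetry concrete, here is a small stand-alone illustration of topic matching ("*" stands for exactly one word, "#" for zero or more words). It is a simplified model written for this explanation, not the broker's implementation, and TopicMatch is a hypothetical name.

```java
// Simplified model of topic-exchange matching, for illustration only.
final class TopicMatch {

    // True if routingKey would be delivered to a queue bound with bindingPattern.
    static boolean matches(String bindingPattern, String routingKey) {
        return matches(bindingPattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key.
            for (int next = j; next <= key.length; next++) {
                if (matches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false;
        }
        // '*' matches exactly one word; anything else must match literally.
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return matches(pattern, i + 1, key, j + 1);
        }
        return false;
    }
}
```

With this model, matches("work.companies.#", "work.companies.company1") is true, while matches("work.companies.company1", "work.companies.#") is false: a "#" in the routing key is just a literal word. So each company would bind its queue with a pattern such as "work.companies.#" in addition to its own key, and the Administrator would publish with a concrete key under that prefix.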

Question:

I currently have a RabbitMQ exchange defined in my Spring XML as a fanout. I want to change it to a direct exchange, but when I deploy my new project the exchange is not updated. I see Spring try to create it, but it fails.

Is there an override I am missing?


Answer:

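It cannot be done; you can't change an exchange's type after it has been created. Redeclaring an existing exchange with a different type fails with a PRECONDITION_FAILED channel error.

You have to delete the exchange and recreate it.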