Hot questions for Using RabbitMQ in connection

Question:

TL;DR How do I create a Spring Boot AMQP connection factory programmatically?

Hey,

In order to connect to my RabbitMQ I added these to my application.properties file of my Spring Boot app:

spring.rabbitmq.host=host
spring.rabbitmq.port=5672
spring.rabbitmq.username=myapp
spring.rabbitmq.password=mypass

And according to my understanding, these values are then used to create Spring Boot's auto configured ConnectionFactory, which I then use in:

@Bean
@Conditional(RabbitCondition.class)
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter completedOrderListenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(completedOrderQueueName);
    container.setMessageListener(completedOrderListenerAdapter);
    return container;
}

I would like to be able to use RabbitMQ credentials from different environment files, not application.properties, so I would like to create the ConnectionFactory bean programmatically. How do I achieve this?

Thanks.


Answer:

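// Defining your own ConnectionFactory bean replaces the one Spring Boot auto-configures.
// address, username and password are assumed to be fields populated elsewhere,
// for example from your environment-specific file.
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(address);   // "host:port", comma-separated for several brokers
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    return connectionFactory;
}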
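
The bean above leaves open where address, username and password come from. One possible approach, sketched here with only the JDK, is to read them from a per-environment properties file before building the factory. The class name RabbitSettings and the keys rabbit.address, rabbit.username and rabbit.password are made up for this example and are not Spring properties.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical holder for the three values the connection factory bean needs. */
final class RabbitSettings {
    final String address;
    final String username;
    final String password;

    private RabbitSettings(String address, String username, String password) {
        this.address = address;
        this.username = username;
        this.password = password;
    }

    /** Reads the settings from an environment-specific properties file. */
    static RabbitSettings load(Path file) {
        try (Reader reader = Files.newBufferedReader(file)) {
            return from(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the settings from any Reader; the key names are illustrative. */
    static RabbitSettings from(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new RabbitSettings(
                require(props, "rabbit.address"),
                require(props, "rabbit.username"),
                require(props, "rabbit.password"));
    }

    /** Fails fast when a key is absent so a bad file is noticed at startup. */
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value;
    }
}
```

Inside the @Bean method you could then call RabbitSettings.load(...) with the path of the current environment's file and pass settings.address, settings.username and settings.password to the setters. Spring's own @PropertySource annotation together with @Value is another way to get the same effect.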

Question:

I have hosted a multi-module project on Heroku and am receiving the following error when trying to run my Spring Boot application.

2018-06-13T05:34:47.611296+00:00 app[web.1]: 2018-06-13 05:34:47.611  INFO 4 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'rabbitConnectionFactory' has been autodetected for JMX exposure
2018-06-13T05:34:47.617422+00:00 app[web.1]: 2018-06-13 05:34:47.617  INFO 4 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'rabbitConnectionFactory': registering with JMX server as MBean [org.springframework.amqp.rabbit.connection:name=rabbitConnectionFactory,type=CachingConnectionFactory]
2018-06-13T05:34:47.646470+00:00 app[web.1]: 2018-06-13 05:34:47.646  INFO 4 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-06-13T05:34:47.660578+00:00 app[web.1]: 2018-06-13 05:34:47.660  INFO 4 --- [    container-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-06-13T05:34:47.680639+00:00 app[web.1]: 2018-06-13 05:34:47.679 ERROR 4 --- [    container-1] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
2018-06-13T05:34:47.680646+00:00 app[web.1]: 
2018-06-13T05:34:47.680648+00:00 app[web.1]: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:34:47.680650+00:00 app[web.1]:    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:34:47.680651+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:476) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:34:47.680656+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:614) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:34:47.680658+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:34:47.680659+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]

The problem I see is that it is trying to connect to localhost, even though I have added the CloudAMQP Little Lemur add-on in Heroku and put these settings in the Application.yml file of both modules:

spring:
    profiles: heroku
    mvc:
      async:
        request-timeout: 3600000
    rabbitmq:
      addresses: amqp://****:*****@puma.rmq.cloudamqp.com/uyjgxslh
      host: puma.rmq.cloudamqp.com
      port: 1883
      username: ****:****
      password: *****

In my webModule I have this code:

@SpringBootApplication
public class SpringBootHerokuExampleApplication {

    public final static String PDF_MERGE_QUEUE= "pdf-merge-queue";

    @Bean
    Queue queue(){
        return new Queue(PDF_MERGE_QUEUE, false);
    }

    @Bean
    TopicExchange exchange(){
        return new TopicExchange("pdf-nerge-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with(PDF_MERGE_QUEUE);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(PDF_MERGE_QUEUE);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(RabbitMQListener rabbitMQListener){
        return new MessageListenerAdapter(rabbitMQListener, "receiveMessage");
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringBootHerokuExampleApplication.class, args);
    }
}

and in my worker module main method:

public class SpringBootHerokuExampleApplication {
    public final static String PDF_MERGE_QUEUE= "pdf-merge-queue";
    public static void main(String[] args) {
        SpringApplication.run(SpringBootHerokuExampleApplication.class, args);
    }
}

Here is more of the stack trace from the Heroku logs:

handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-13T05:54:13.871350+00:00 app[worker.1]: 2018-06-13 05:54:13.870  INFO 4 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2018-06-13T05:54:13.897860+00:00 app[worker.1]: 2018-06-13 05:54:13.897  INFO 4 --- [           main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/health],methods=[GET],produces=[application/vnd.spring-boot.actuator.v2+json || application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$OperationHandler.handle(javax.servlet.http.HttpServletRequest,java.util.Map<java.lang.String, java.lang.String>)
2018-06-13T05:54:13.899632+00:00 app[worker.1]: 2018-06-13 05:54:13.899  INFO 4 --- [           main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/info],methods=[GET],produces=[application/vnd.spring-boot.actuator.v2+json || application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$OperationHandler.handle(javax.servlet.http.HttpServletRequest,java.util.Map<java.lang.String, java.lang.String>)
2018-06-13T05:54:13.901591+00:00 app[worker.1]: 2018-06-13 05:54:13.901  INFO 4 --- [           main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator],methods=[GET],produces=[application/vnd.spring-boot.actuator.v2+json || application/json]}" onto protected java.util.Map<java.lang.String, java.util.Map<java.lang.String, org.springframework.boot.actuate.endpoint.web.Link>> org.springframework.boot.actuate.endpoint.web.servlet.WebMvcEndpointHandlerMapping.links(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-13T05:54:14.128560+00:00 app[worker.1]: 2018-06-13 05:54:14.128  INFO 4 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-06-13T05:54:14.145677+00:00 app[worker.1]: 2018-06-13 05:54:14.145  INFO 4 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'rabbitConnectionFactory' has been autodetected for JMX exposure
2018-06-13T05:54:14.154055+00:00 app[worker.1]: 2018-06-13 05:54:14.153  INFO 4 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'rabbitConnectionFactory': registering with JMX server as MBean [org.springframework.amqp.rabbit.connection:name=rabbitConnectionFactory,type=CachingConnectionFactory]
2018-06-13T05:54:14.199050+00:00 app[worker.1]: 2018-06-13 05:54:14.197  INFO 4 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-06-13T05:54:14.230846+00:00 app[worker.1]: 2018-06-13 05:54:14.230  INFO 4 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-06-13T05:54:14.425996+00:00 app[worker.1]: 2018-06-13 05:54:14.425  INFO 4 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2018-06-13T05:54:14.437981+00:00 app[worker.1]: 2018-06-13 05:54:14.435  INFO 4 --- [           main] o.e.SpringBootHerokuExampleApplication   : Started SpringBootHerokuExampleApplication in 13.198 seconds (JVM running for 14.523)
2018-06-13T05:54:14.969935+00:00 app[web.1]: 2018-06-13 05:54:14.969  WARN 4 --- [    container-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:54:14.970343+00:00 app[web.1]: 2018-06-13 05:54:14.970  INFO 4 --- [    container-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@362d89d0: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2018-06-13T05:54:14.971125+00:00 app[web.1]: 2018-06-13 05:54:14.971  INFO 4 --- [    container-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-06-13T05:54:14.971946+00:00 app[web.1]: 2018-06-13 05:54:14.971 ERROR 4 --- [    container-3] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
2018-06-13T05:54:14.971948+00:00 app[web.1]: 
2018-06-13T05:54:14.971951+00:00 app[web.1]: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:54:14.971952+00:00 app[web.1]:    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971953+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:476) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971956+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:614) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971957+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971958+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971959+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971960+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971961+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971962+00:00 app[web.1]:    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971964+00:00 app[web.1]:    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971965+00:00 app[web.1]:    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171-heroku]
2018-06-13T05:54:14.971967+00:00 app[web.1]: Caused by: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:54:14.971969+00:00 app[web.1]:    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:14.971971+00:00 app[web.1]:    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:14.971972+00:00 app[web.1]:    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:14.971973+00:00 app[web.1]:    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:14.971974+00:00 app[web.1]:    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:14.971975+00:00 app[web.1]:    at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:14.971976+00:00 app[web.1]:    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar!/:5.1.2]
2018-06-13T05:54:14.971977+00:00 app[web.1]:    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar!/:5.1.2]
2018-06-13T05:54:14.971978+00:00 app[web.1]:    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar!/:5.1.2]
2018-06-13T05:54:14.971979+00:00 app[web.1]:    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:847) ~[amqp-client-5.1.2.jar!/:5.1.2]
2018-06-13T05:54:14.971980+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:449) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:14.971981+00:00 app[web.1]:    ... 9 common frames omitted
2018-06-13T05:54:14.971982+00:00 app[web.1]: 
2018-06-13T05:54:14.972042+00:00 app[web.1]: 2018-06-13 05:54:14.971  INFO 4 --- [    container-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-06-13T05:54:19.262272+00:00 app[worker.1]: 2018-06-13 05:54:19.261  WARN 4 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:54:19.264406+00:00 app[worker.1]: 2018-06-13 05:54:19.264  INFO 4 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@793be5ca: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2018-06-13T05:54:19.272605+00:00 app[worker.1]: 2018-06-13 05:54:19.272  INFO 4 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-06-13T05:54:19.976366+00:00 app[web.1]: 2018-06-13 05:54:19.976  WARN 4 --- [    container-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:54:19.976491+00:00 app[web.1]: 2018-06-13 05:54:19.976  INFO 4 --- [    container-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@42d37ed7: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2018-06-13T05:54:19.977274+00:00 app[web.1]: 2018-06-13 05:54:19.977  INFO 4 --- [    container-4] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-06-13T05:54:19.978248+00:00 app[web.1]: 2018-06-13 05:54:19.978 ERROR 4 --- [    container-4] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
2018-06-13T05:54:19.978251+00:00 app[web.1]: 
2018-06-13T05:54:19.978252+00:00 app[web.1]: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:54:19.978253+00:00 app[web.1]:    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:62) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978255+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:476) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978256+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:614) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978257+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978258+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1797) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978259+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978260+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1752) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978261+00:00 app[web.1]:    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:338) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978263+00:00 app[web.1]:    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978264+00:00 app[web.1]:    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978265+00:00 app[web.1]:    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171-heroku]
2018-06-13T05:54:19.978267+00:00 app[web.1]: Caused by: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:54:19.978268+00:00 app[web.1]:    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:19.978269+00:00 app[web.1]:    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:19.978270+00:00 app[web.1]:    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:19.978271+00:00 app[web.1]:    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:19.978272+00:00 app[web.1]:    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:19.978273+00:00 app[web.1]:    at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_171-heroku]
2018-06-13T05:54:19.978274+00:00 app[web.1]:    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.1.2.jar!/:5.1.2]
2018-06-13T05:54:19.978275+00:00 app[web.1]:    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:955) ~[amqp-client-5.1.2.jar!/:5.1.2]
2018-06-13T05:54:19.978276+00:00 app[web.1]:    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:907) ~[amqp-client-5.1.2.jar!/:5.1.2]
2018-06-13T05:54:19.978277+00:00 app[web.1]:    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:847) ~[amqp-client-5.1.2.jar!/:5.1.2]
2018-06-13T05:54:19.978278+00:00 app[web.1]:    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:449) ~[spring-rabbit-2.0.3.RELEASE.jar!/:2.0.3.RELEASE]
2018-06-13T05:54:19.978279+00:00 app[web.1]:    ... 9 common frames omitted
2018-06-13T05:54:19.978280+00:00 app[web.1]: 
2018-06-13T05:54:19.978370+00:00 app[web.1]: 2018-06-13 05:54:19.978  INFO 4 --- [    container-4] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2018-06-13T05:54:24.281046+00:00 app[worker.1]: 2018-06-13 05:54:24.280  WARN 4 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused (Connection refused)
2018-06-13T05:54:24.281735+00:00 app[worker.1]: 2018-06-13 05:54:24.281  INFO 4 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@7170b3b5: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2018-06-13T05:54:24.291703+00:00 app[worker.1]: 2018-06-13 05:54:24.291  INFO 4 --- [cTaskExecutor-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]

Why is it trying to connect to localhost when I have specified the host name explicitly in the yml file? Any ideas?


Answer:

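I just found out that the yml file declares a profile named heroku. Settings in such a section only apply while that profile is active, so without it the app fell back to the default localhost:5672. When I removed the profile entry, the app connected to RabbitMQ on the given host.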
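
As a side note on the configuration in the question: a CloudAMQP-style URL of the form amqp://user:password@host/vhost already carries the host, credentials and virtual host, so the separate host, port, username and password entries have to agree with it. The sketch below splits such a URL with java.net.URI. The class name AmqpUrl is hypothetical, the default ports assumed are 5672 for amqp and 5671 for amqps, and splitting the user info on the first colon is a simplification.

```java
import java.net.URI;

/** Hypothetical helper that splits an AMQP URL into the parts a client needs. */
final class AmqpUrl {
    final String host;
    final int port;
    final String username;    // null when the URL carries no user info
    final String password;    // null when the URL carries no password
    final String virtualHost;

    private AmqpUrl(String host, int port, String username, String password, String virtualHost) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
    }

    static AmqpUrl parse(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme();
        boolean tls = "amqps".equalsIgnoreCase(scheme);
        if (!tls && !"amqp".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("Not an AMQP URL: " + url);
        }

        // getPort() returns -1 when the URL has no explicit port.
        int port = uri.getPort() != -1 ? uri.getPort() : (tls ? 5671 : 5672);

        // getUserInfo() returns the decoded "user:password" part, or null.
        String username = null;
        String password = null;
        String userInfo = uri.getUserInfo();
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            username = colon < 0 ? userInfo : userInfo.substring(0, colon);
            password = colon < 0 ? null : userInfo.substring(colon + 1);
        }

        // No path at all means the broker's default virtual host "/".
        String path = uri.getPath();
        String virtualHost = (path == null || path.isEmpty()) ? "/" : path.substring(1);

        return new AmqpUrl(uri.getHost(), port, username, password, virtualHost);
    }
}
```

For the URL shape shown in the question, the user name is the part before the colon and the virtual host is the path segment after the host.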

Question:

I'm an experienced Java programmer and am trying out Spring Rabbit MQ for the first time.

I followed the messaging-rabbitMQ tutorial exactly using Maven. http://spring.io/guides/gs/messaging-rabbitmq/

I am running on CentOS as a user account.

When I ran the application at the very end of the tutorial with java -jar target/gs-messaging-rabbitmq-0.1.0.jar, I got the following Connection Refused error. Can someone help?

prompt> java -jar target/gs-messaging-rabbitmq-0.1.0.jar

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.2.3.RELEASE)

2015-05-13 11:49:44.833  INFO 26218 --- [           main] hello.Application                        : Starting Application v0.1.0 on minerva-02-33.gbcl.net with PID 26218 (/home/ccpm/test/spring_rabbitmq/target/gs-messaging-rabbitmq-0.1.0.jar started by ccpm in /home/ccpm/test/spring_rabbitmq)
2015-05-13 11:49:44.912  INFO 26218 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@8146158: startup date [Wed May 13 11:49:44 PDT 2015]; root of context hierarchy
2015-05-13 11:49:45.863  INFO 26218 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$9a886eed] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2015-05-13 11:49:46.397  INFO 26218 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2015-05-13 11:49:46.404  INFO 26218 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2015-05-13 11:49:46.433 ERROR 26218 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:54)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:207)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:441)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:947)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1065)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:615)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:199)
    ... 12 common frames omitted

Waiting five seconds...
2015-05-13 11:49:51.437  WARN 26218 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused
2015-05-13 11:49:51.437  INFO 26218 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2015-05-13 11:49:51.441 ERROR 26218 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:54)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:207)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:441)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:947)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1065)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:615)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:199)
    ... 12 common frames omitted

Answer:

The error suggests that no RabbitMQ server is installed (or at least none is running) on your machine. Luckily, this is pretty easy to fix. You mentioned you are using CentOS; you can download RabbitMQ server here: https://www.rabbitmq.com/install-rpm.html

There are a couple of steps you will need to follow, including installing Erlang.

The Spring example assumes you already know a bit about using a message broker. The broker is itself a technology you will need to learn, just as you once had to learn to use a database or an operating system.
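
A "Connection refused" error means nothing accepted a TCP connection on the host and port the client tried, which is localhost:5672 when no host is configured. Before rerunning the tutorial, a quick probe like the following can confirm whether a broker is listening. This is a minimal sketch using only the JDK; the class name BrokerProbe is made up for the example, and a successful connect only shows that something is listening, not that it is RabbitMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper that checks whether anything accepts TCP connections on host:port. */
final class BrokerProbe {

    /** Returns true if a TCP connection could be opened within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts and unresolvable hosts.
            return false;
        }
    }
}
```

Calling BrokerProbe.isListening("localhost", 5672, 1000) should return false in the situation described in the question and true once the RabbitMQ server has been installed and started.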

Question:

I am using the RabbitMQ Java client. My app has multiple exchanges and queues, following something similar to the Pub/Sub model.

What is the best practice regarding connections? Should I have one connection per app?

I understand the channel model, and the thread (un)safety model. Just not sure if I should have multiple connections or not.


Answer:

One connection per app is correct.

Within that connection, you will have many channels - where the actual work is done.

You can have hundreds or thousands of message producers and consumers (each on their own channel) inside a single connection.

If you start to see slowdown in your RMQ setup because you're doing too much work, look at clustering RMQ and/or standing up multiple instances of your app.

But you would still maintain 1 connection per app instance.
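
The pattern described above can be sketched as a small holder that opens the connection once and hands every thread its own channel. To keep the example free of library dependencies, the type parameters C and CH stand in for the client's Connection and Channel types; the class SharedConnection is hypothetical and not part of the RabbitMQ client.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Sketch: one lazily opened connection per application instance and one
 * channel per thread. C and CH are stand-ins for the client library's
 * connection and channel types.
 */
final class SharedConnection<C, CH> {
    private final Supplier<C> connector;
    private final ThreadLocal<CH> channels;
    private C connection; // guarded by "this"

    SharedConnection(Supplier<C> connector, Function<C, CH> channelOpener) {
        this.connector = connector;
        // Each thread opens its own channel on first use, always on the shared connection.
        this.channels = ThreadLocal.withInitial(() -> channelOpener.apply(connection()));
    }

    /** Opens the connection on the first call and returns the same instance afterwards. */
    synchronized C connection() {
        if (connection == null) {
            connection = connector.get();
        }
        return connection;
    }

    /** Returns the calling thread's own channel. */
    CH channel() {
        return channels.get();
    }
}
```

With the RabbitMQ Java client, the connector would wrap factory.newConnection() and the channel opener would wrap connection.createChannel(), each converting the checked exceptions those calls throw into unchecked ones. Producer and consumer threads then call channel() and never share a channel between threads.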

Question:

I've made a simple test application for reading RabbitMQ queues using java amqp lib (implementation 'com.rabbitmq:amqp-client:5.7.1').

But I'm having trouble connecting to my RabbitMQ server due to Android permissions (socket).

Here is the error message:

W/System.err: java.net.SocketException: socket failed: EPERM (Operation not permitted)

I've tried, unsuccessfully, to add android.permission.INTERNET to the manifest. Here is what it looks like:

<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="com.example.androidwebsocket">

    <uses-permission android:name="android.permission.INTERNET" />

    <application
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:roundIcon="@mipmap/ic_launcher_round"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">
        <activity android:name=".MainActivity">
            <intent-filter>
                <action android:name="android.intent.action.MAIN" />

                <category android:name="android.intent.category.LAUNCHER" />
            </intent-filter>
        </activity>
    </application>

</manifest>

What am I missing?

Edit

As requested, here is the full error stacktrace: https://pastebin.com/WAh2B4rP

And the code that triggers this error:

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("myuser");
factory.setPassword("mypass");
factory.setVirtualHost("/");
factory.setHost("myhost.io");
factory.setPort(5672);

connection = factory.newConnection(); //Error triggers here
channel = connection.createChannel();

Answer:

I was missing

<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

It should be mentioned that connections MUST run on background threads; otherwise Android will block them.

Also:

Uninstall the app from the emulator, as suggested in "java.net.SocketException: socket failed: EPERM (Operation not permitted)".
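
To illustrate the background-thread point, here is a minimal sketch using only java.util.concurrent. The class name BackgroundConnect and the helper runOffMainThread are made up for this example, and the submitted task is just a placeholder for the blocking factory.newConnection() call from the question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BackgroundConnect {

    /** Runs a blocking task on a worker thread and returns a Future for its result. */
    static <T> Future<T> runOffMainThread(ExecutorService executor, Callable<T> blockingTask) {
        return executor.submit(blockingTask);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Placeholder task: in the app this would call factory.newConnection().
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> result = runOffMainThread(executor, task);
            System.out.println("caller thread: " + Thread.currentThread().getName());
            System.out.println("worker thread: " + result.get());
        } finally {
            executor.shutdown();
        }
    }
}
```

On Android you would hand the result back to the UI thread instead of blocking on get(); the sketch only shows that the blocking work runs on a different thread than the caller.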

Question:

I'm running 2 multi-threaded programs.

Each thread from the first program acts as a producer and writes messages to a queue, whereas each thread from the second program acts as a consumer and reads messages from the same queue.

In both projects I created a connection factory like the following:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setAutomaticRecoveryEnabled(true);
factory.setRequestedChannelMax(0);
factory.setUsername("user");
factory.setPassword("password");

However I'm not sure about the recommended approach for the next step.

  1. Should I create a new connection at the start of each thread like:

    Connection connection = factory.newConnection();
    

    And then for each request create a new channel like:

    Channel channel = connection.createChannel();
    
  2. Or should I create only a single connection, make the threads share the same connection, and then create a new channel for every request.

I know that the connection is a socket thread-safe connection and it should be created carefully. I'm just asking about whether there is a recommended approach to use in my program, because usually a documentation would contain a recommended way to handle connections and sockets, but I couldn't find such an answer in RabbitMQ's documentation.


Answer:

Use your Option 2. RabbitMQ is fine with sharing a single Connection per process, so unless you are connecting to multiple brokers or have an extreme load profile, a single, long-lived Connection shared across all threads in your process will suffice. Connections are relatively expensive to obtain, so short-lived Connections will hamper producer performance.

Connections are thread safe - internally, the RabbitMQ client multiplexes multiple channels across a single Connection, allowing for concurrent use of the connection.

In other words, you can use the Connection as a "Channel Factory". However, Channels aren't thread safe, so you will typically create a short-lived channel, produce a message, and close the channel on the producer (Channels are cheap to obtain).

As a suggestion, on the producer, publish messages to an Exchange instead of sending them directly to a queue (i.e. skip to the 'publish-subscribe' tutorial). This way you can make use of additional routing topologies, beyond the classical point-to-point mechanisms used by older middleware such as MSMQ / MQSeries.

Question:

We have an abrupt connection close issue with RabbitMQ in a localhost dev/testing scenario. In our development environment we have RabbitMQ installed on each developer's Windows 7 machine, and we connect to it using the Java client via the Spring AMQP library. Everything works fine for a while, but at some point the connection is dropped with the following message in the RabbitMQ log:

=WARNING REPORT==== 4-Jan-2016::14:39:37 ===
closing AMQP connection <0.3731.0> (127.0.0.1:50792 -> 127.0.0.1:5672):
connection_closed_abruptly

In the client log we have this exception:

04/01/2016 14:39:37.181 (AMQP Connection 127.0.0.1:5672) ERROR [CachingConnectionFactory] Channel shutdown: connection error
04/01/2016 14:39:38.188 (SimpleAsyncTaskExecutor-2) WARN  [SimpleMessageListenerContainer] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:723)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:713)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:571)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Unrecognized Windows Sockets error: 0: recv failed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
    ... 1 more

At client startup we create a dynamic queue and attach several consumers to it. This connection drop causes the queue to be removed, and any subsequent attempt to recreate it fails with the following exception:

04/01/2016 14:39:38.239 (SimpleAsyncTaskExecutor-8) WARN  [BlockingQueueConsumer] Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[MY_TEST_QU]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1165)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:704)
    at com.sun.proxy.$Proxy77.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550)
    ... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    ... 1 more

We have had this problem with rabbitmq 3.4.x and 3.5.x and Spring AMQP 1.3.x and 1.5.x. The interesting thing is this never happens in our QA and production environment where rabbitmq is installed on a separate server.

Any help is much appreciated.


Answer:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations() fails when the queue it should listen to no longer exists.

This is not queue recreation; it is only a passive declaration, which checks that the queue exists. So, if you drop that "dynamic queue" manually, you should recreate it manually as well.

In other words, if the queue isn't managed through the RabbitAdmin bean contract, you have no choice but to re-create it manually. You don't need to worry about the listener in that case: it will reconnect correctly when the queue comes back.

UPDATE

Starting with version 1.5, Spring AMQP provides ListenerContainerConsumerFailedEvent, which is emitted right after all attemptPassiveDeclarations() attempts are exhausted. So, you can catch that ApplicationEvent, stop() your listener container, declare the queue again, and start() the container again.

If your queue isn't a bean, it won't be redeclared automatically.

Question:

My spring boot application throws connection timeout error, and it is never able to connect. The other interesting problem I see is, it is never picking up the connection timeout property defined in spring app properties.

  org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:74) ~[spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:309) ~[spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:577) ~[spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1431) ~[spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1412) ~[spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1388) ~[spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:336) ~[spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:1123) [spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$800(SimpleMessageListenerContainer.java:98) [spring-rabbit-1.6.7.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1329) [spring-rabbit-1.6.7.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.util.concurrent.TimeoutException: null
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:76) ~[amqp-client-3.6.5.jar:na]
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:110) ~[amqp-client-3.6.5.jar:na]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-3.6.5.jar:na]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-3.6.5.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:292) ~[amqp-client-3.6.5.jar:na]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:824) ~[amqp-client-3.6.5.jar:na]
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:736) ~[amqp-client-3.6.5.jar:na]
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:293) ~[spring-rabbit-1.6.7.RELEASE.jar:na]
    ... 9 common frames omitted

Here is my java config,

@Configuration
@EnableRabbit
public class RabbitConfig {

    private final String exchange;
    private final String queueName;

    public RabbitConfig(
            @Value("${exchange.name}") String exchange,
            @Value("${queue.name}") String queue) {
        this.exchange = exchange;
        this.queueName = queue;
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(queueName);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(RabbitAdmin admin, CachingConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        connectionFactory.setCloseTimeout(10000);
        container.setQueueNames(queueName);
        container.setConnectionFactory(connectionFactory);
        //container.setMessageListener(listenerAdapter);
        return container;
    }

//    @Bean
//    MessageListenerAdapter listenerAdapter(Receiver receiver) {
//        return new MessageListenerAdapter(receiver, "receiveMessage");
//    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

}

And my spring application properties look like,

spring.rabbitmq.host = 127.0.0.1
spring.rabbitmq.port = 15672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

exchange.name=myExchange
queue.name=myQueue

spring.rabbitmq.cache.connection.mode=CONNECTION
spring.rabbitmq.cache.channel.size=50
spring.rabbitmq.cache.channel.checkout-timeout= 10000

Rabbit is up and running on 127.0.0.1 on port 15672, but the app is never able to connect.


Answer:

By default, the AMQP port is 5672. Port 15672 shows the web UI (admin console). If you're using the default setup, adjust

spring.rabbitmq.port = 5672

RabbitMQ networking configuration reference

Question:

New to Camel RabbitMQ. Wrote a simple RabbitMQ consumer with Apache Camel.

onException(StateException.class).log(LoggingLevel.WARN,"WarnMessage = Error on ID and Status ${body}.")
            .asyncDelayedRedelivery().redeliveryDelay(5000).maximumRedeliveries(1)
            .setHeader(RabbitMQConstants.REQUEUE, constant(true))
            .handled(true)
            .setFaultBody(constant(true));

from(STATE__Q_URL).delay(200).log(LoggingLevel.DEBUG,"Incoming Body is --> ${body}")
          /*  .wireTap("direct:logtofile")*/.to("invokeManager")
            .log(LoggingLevel.INFO, "Message = Completed for ${body}");

invokeManager currently makes a simple REST call after popping a value from the queue. It works fine for around 100 messages and then starts throwing this error:

java.net.SocketException: Connection reset

java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method) ~[na:1.8.0_144]
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) ~[na:1.8.0_144]
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[na:1.8.0_144]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[na:1.8.0_144]
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[na:1.8.0_144]
    at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[na:1.8.0_144]
    at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177) ~[amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:559) ~[amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:127) ~[amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:396) ~[amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:378) ~[amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.AMQChannel.quiescingRpc(AMQChannel.java:313) ~[amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:601) ~[amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534) ~[amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.StrictExceptionHandler.handleChannelKiller(StrictExceptionHandler.java:68) [amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.StrictExceptionHandler.handleConsumerException(StrictExceptionHandler.java:58) [amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:154) [amqp-client-4.2.1.jar!/:4.2.1]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100) [amqp-client-4.2.1.jar!/:4.2.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]

2018-02-25 18:06:17.551  WARN 4155 --- [0.42.31.42:5672] c.r.c.impl.ForgivingExceptionHandler     : An unexpected connection driver error occured (Exception message: Connection reset)

Not sure what I am doing wrong. Any help is appreciated.


Answer:

@Robbo_UK Yes, I figured out that the problem was with the buffer size of the consumer. You need to set a few options on the endpoint URI in order to fix this issue:

            +"&prefetchEnabled=true"
            +"&prefetchCount=100"
            +"&threadPoolSize=100"
            +"&channelPoolMaxSize=100"

I copied these values as-is from my code; this should solve your issue.

This restricts the number of messages pushed to the consumer and prevents the buffer overflow.

Question:

I'm trying to set up a RabbitMQ server (version 3.7.4, installed via homebrew on macOS 10.13.4) such that my colleagues can work with it via the internal network. We are using the official Java client (com.rabbitmq:amqp-client:5.2.0). Unfortunately, they only get a ConnectException with the message Connection refused:

java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:62)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:134)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:997)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:956)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:914)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1068)
    at my.own.private.RabbitMqConnector.connect(RabbitMqConnector.java:29)

We already set up another user account (not guest) which has admin privileges. With almost the same Java code I can connect locally (setting the host to localhost) but not if I use my IP address. According to the RabbitMQ docs, the RabbitMQ server binds to all network interfaces by default (otherwise I would have said that it's a problem with address binding). I disabled my firewall so that shouldn't be the problem.

Here's the respective code:

final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("username");
connectionFactory.setPassword("password");
connectionFactory.setHost("10.10.33.12");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

try (final Connection connection = connectionFactory.newConnection()) {
    // do stuff with the connection
}

Answer:

I finally found the actual problem, and it's not related to Java or how I configured the ConnectionFactory. It was indeed a problem with how the RabbitMQ package for homebrew is configured. In the rabbitmq-env.conf file, there was the following setting:

NODE_IP_ADDRESS=127.0.0.1

This setting caused RabbitMQ to only listen on 127.0.0.1 for incoming connections. Once I removed the setting and restarted the service, I could connect from other machines as well.

Thank you all for your help!
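
For readers unfamiliar with listen addresses, the difference between binding to 127.0.0.1 and binding to all interfaces can be sketched with plain Java sockets. This is only an analogy to the NODE_IP_ADDRESS setting, not RabbitMQ code; the class name BindAddressDemo and the helper describe are made up for this example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

final class BindAddressDemo {

    /** Describes which interfaces a listening socket accepts connections on. */
    static String describe(ServerSocket server) {
        InetAddress bound = server.getInetAddress();
        if (bound.isAnyLocalAddress()) {
            return "all interfaces";
        }
        return bound.isLoopbackAddress()
                ? "loopback only"
                : "single address " + bound.getHostAddress();
    }

    public static void main(String[] args) throws IOException {
        // Bound to the loopback address: similar in spirit to NODE_IP_ADDRESS=127.0.0.1,
        // so only clients on the same machine can connect.
        try (ServerSocket loopbackOnly = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
             // No explicit address: bound to the wildcard address, reachable from other machines.
             ServerSocket everywhere = new ServerSocket(0)) {
            System.out.println(describe(loopbackOnly));
            System.out.println(describe(everywhere));
        }
    }
}
```

A listener bound only to the loopback address refuses connections that arrive on the machine's LAN address, which matches the "Connection refused" seen by the colleagues.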

Question:

I'm using Camel to consume and produce messages in RabbitMQ. Also, I'm working with Spring boot so I have created a ConnectionFactory bean with all the configuration I want.

That works great but I have to declare the name of the bean in every Endpoint string I create.

Is there a way to setup camel to use this specific bean by default?


Answer:

According to these source lines I don't think it is achievable.

Question:

I'm using the Java RabbitMQ client. I publish a message (basicPublish) and then I close the channel. In the consumer, channel.basicAck throws an exception:

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.util.concurrent.RejectedExecutionException: Task com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable@665de7c7 rejected from java.util.concurrent.ThreadPoolExecutor@35f53993[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 5]

If I delete channel.close(), the error is not reproduced. Why is the connection closed when I close the channel?

Sending messages to the exchange:

Channel channel = connection.createChannel();
Set<String> expectedMessages = new HashSet<>(MESSAGES_COUNT);
for (int i = 0; i < MESSAGES_COUNT; i++) {
    String message = Integer.toString(i);
    channel.basicPublish(
        TEST_EXCHANGE,
        ROUTE_KEY,
        TEXT_PLAIN,
        message.getBytes("UTF-8")
    );
    expectedMessages.add(message);
}
channel.close();

Consumer:

try {
    channel.basicAck(deliveryTag, false);
} catch (Exception e) {
    log.error("Error during message handling: " + consumerTag, e);
    channel.basicNack(deliveryTag, false, true);
}

Answer:

The problem was incorrect connectionFactory settings. Incorrect:

    executorService = new ThreadPoolExecutor(
            properties.minThreads, properties.maxThreads,
            properties.maxThreadIdle, TimeUnit.SECONDS,
            new SynchronousQueue<>()
    );
    connectionFactory.setSharedExecutor(executorService);

Correct:

    executorService = new ThreadPoolExecutor(
            properties.minThreads, properties.maxThreads,
            properties.maxThreadIdle, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()
    );
    connectionFactory.setSharedExecutor(executorService);
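
Why the queue type matters can be reproduced without RabbitMQ. A SynchronousQueue has no capacity, so once every worker thread is busy the pool rejects further tasks with RejectedExecutionException (the exception in the question), whereas an unbounded LinkedBlockingQueue buffers them. The sketch below is a standalone illustration; the class name ExecutorQueueDemo, the helper countRejections, and the pool sizes are assumptions made for the example, not values from the original code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecutorQueueDemo {

    /** Submits blocking tasks to a pool backed by workQueue and returns how many were rejected. */
    static int countRejections(BlockingQueue<Runnable> workQueue, int maxThreads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, maxThreads, 60, TimeUnit.SECONDS, workQueue);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep this worker busy until all tasks are submitted
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // no free thread and no room in the queue
                }
            }
        } finally {
            release.countDown(); // let the blocked workers finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("SynchronousQueue rejections: "
                + countRejections(new SynchronousQueue<>(), 5, 8));
        System.out.println("LinkedBlockingQueue rejections: "
                + countRejections(new LinkedBlockingQueue<>(), 5, 8));
    }
}
```

With 5 threads and 8 tasks, the SynchronousQueue pool rejects the 3 tasks that arrive after all threads are busy, while the LinkedBlockingQueue pool rejects none. Note that with an unbounded queue the pool never grows beyond its core size, so minThreads effectively becomes the thread count.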

Question:

I have a RabbitMQ server running on a VM. I am following the RabbitMQ Java tutorial. It works fine locally on the VM, but when trying to send from the host I get an exception:

Exception in thread "main" java.net.ConnectException: Connection timed out: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
at java.net.Socket.connect(Unknown Source)
at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:714)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:760)
at Send.main(Send.java:16)

here is the send code i am using:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;

public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws java.io.IOException, TimeoutException {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.198.100");
            factory.setPort(5672);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World from Windows!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            channel.close();
            connection.close();

        }
    }

I can ping the server at 192.168.198.100 but I can't access the management UI at 192.168.198.100:15672/

So could anyone help me figure out what's wrong with this issue? Thanks in advance.


Answer:

1.

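You are using the default guest/guest credentials, which by default are only allowed to connect from localhost, not from a remote IP.

Please read "Can't access RabbitMQ web management interface after fresh install", create a new user as described there, and then set its credentials on the factory: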

factory.setPassword("test");
factory.setUsername("test");

2.

Did you enable the management UI? If not, use:

rabbitmq-plugins enable rabbitmq_management

3.

Check your firewall configuration; the ports 5672 and 15672 may be closed. You can use telnet to test the ports:

telnet 192.168.198.100 5672
Trying 192.168.198.100...
Connected to 192.168.198.100.
Escape character is '^]'.

and:

telnet 192.168.198.100 15672
Trying 192.168.198.100...
Connected to 192.168.198.100.
Escape character is '^]'.
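
If telnet is not available on the client machine, roughly the same check can be sketched in Java with a plain socket. The class name PortCheck, the helper isReachable, and the 2-second timeout are choices made for this example; the default address is the one from the question.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host could not be resolved
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "192.168.198.100"; // address from the question
        for (int port : new int[] {5672, 15672}) {
            System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
        }
    }
}
```

A successful connect only proves that the port is open; it does not confirm that the credentials or the virtual host are accepted by the broker.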

Question:

I need to receive messages from a queue, but this queue is on another machine (an AWS instance) served over https (https://www.mymachine.com/rabbitmq). When I try to establish a connection to the queue I get a NullPointerException.

This is part of the code:

factory.setHost(https://www.mymachine.com/rabbitmq);
Connection connection = factory.newConnection();
        channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

Answer:

RabbitMQ, by default, does not use the HTTP protocol; it uses the AMQP protocol.

You have to pass factory.setHost the IP address or hostname only, without the https:// scheme or the path:

factory.setHost("www.mymachine.com");

If you need an SSL connection, please read https://www.rabbitmq.com/ssl.html; it is a very clear tutorial.
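
If the address reaches your code as a full URL, one way to get the bare host name is java.net.URI. This is a small sketch; the class name BrokerHost and the helper hostOnly are hypothetical and not part of the RabbitMQ client.

```java
import java.net.URI;

final class BrokerHost {

    /** Returns the bare host name if the input is a URL, otherwise the input unchanged. */
    static String hostOnly(String hostOrUrl) {
        if (!hostOrUrl.contains("://")) {
            return hostOrUrl; // already a plain host name or IP address
        }
        String host = URI.create(hostOrUrl).getHost();
        if (host == null) {
            throw new IllegalArgumentException("No host in: " + hostOrUrl);
        }
        return host;
    }

    public static void main(String[] args) {
        System.out.println(hostOnly("https://www.mymachine.com/rabbitmq")); // www.mymachine.com
        System.out.println(hostOnly("192.168.198.100"));                    // 192.168.198.100
    }
}
```

The returned value is what you would hand to factory.setHost(...); the port and TLS settings still have to be configured separately.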

Question:

I'm attempting to connect my Coldfusion app to CloudAMQP's RabbitMQ service. I've been able to create the java object, but when I attempt to create a newConnection(), it fails miserably. I'm thinking it may have something to do with my config? Here's how I've mapped AMQP's settings (right) to my code (left). I'm basically following Luis Majano's example code on github (lmajano/messaging-polyglot) which he refers to in his video Down the RabbitMQ Hole with ColdFusion

NOTE: I will rotate the password after posting, so these credentials won't work. Seems like the prudent thing to do :)

When I run this code I'm able to create a factory successfully. The writeDump(factory) code outputs the following.

NOTE: the newConnection() method

Now, when I attempt to actually create a connection factory.newConnection() like so...

it fails! Here is the result of the dump within the catch writeDump(err)

Any idea why it would be failing on the factory.newConnection() method call?


Answer:

Set the vhost:

factory.setVirtualHost("vhost");

The vhost is the same as the username for shared cloudamqp instances.

Question:

I am new to RabbitMQ. This is my first program, but it is giving an error. My code is:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
public class Send {

    private final static String QUEUE_NAME = "hello";



    public static void main(String[] argv)
            throws java.io.IOException, TimeoutException {

        ConnectionFactory factory =  new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection  =  factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null) ;
        String message = null;
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();


    }

}

and my receiver class is like

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Receive {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO Auto-generated method stub
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException {
              String message = new String(body, "UTF-8");
              System.out.println(" [x] Received '" + message + "'");
            }
          };
          channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

Now when I run the Send class, it fails with this error:

Exception in thread "main" java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:676)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:722)
    at Send.main(Send.java:19)

I don't understand this error. I have also installed the RabbitMQ server, but I don't know how to start an instance of it. Can anyone help me solve this error? Do I need to change anything in the Eclipse configuration?


Answer:

I think you need to specify the port explicitly (the default AMQP port is 5672). Also note that RabbitMQ's AMQP listener does not accept plain HTTP requests, so make sure you are not trying to reach it that way.

Question:

I am trying to call setAutomaticRecoveryEnabled on my connection factory. However, I get a compile error: the method setAutomaticRecoveryEnabled is not recognized on the factory object.

import com.rabbitmq.client.ConnectionFactory;

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setAutomaticRecoveryEnabled(true); // unable to recognize the
//method setAutomaticRecoveryEnabled. 

pom dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>2.8.4</version>
</dependency>

Thanks for taking the time to read this and give some advice.


Answer:

I changed the dependency version to 3.5.1:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.1</version>
</dependency>

This worked.

Question:

I am creating a Spring Boot WebSocket application using STOMP and RabbitMQ. I have installed Erlang and RabbitMQ on Windows and started rabbitmq-plugins.bat, but I get the error below when trying to start the application. I have not been able to log in to RabbitMQ yet. Please let me know if there is any other info I can provide.

2020-03-11 17:21:21.076  INFO 4528 --- [           main] c.h.e.a.EnterprisedashboardApplication   : Started EnterprisedashboardApplication in 28.562 seconds (JVM running for 30.314)
2020-03-11 17:21:23.058  INFO 4528 --- [ient-loop-nio-1] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection failure in session _system_: Failed to connect: Connection refused: no further information: /127.0.0.1:61613

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:61613
Caused by: java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_181]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_181]
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_181]

build.gradle

plugins {
    id 'org.springframework.boot' version '2.2.5.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}
group = 'com.hyb.enterprisedashboard'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
            mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-security'
    implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    compile ('com.fasterxml.jackson.core:jackson-databind:2.9.8')
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-reactor-netty'

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.security:spring-security-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'

}

test {
    useJUnitPlatform()
}

WebSocketBrokerConfig.java

package com.heb.enterprisedashboard.api.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketBrokerConfig.class);

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {

        config.setApplicationDestinationPrefixes("/app")
          .enableStompBrokerRelay("/topic","/queue")
          .setRelayHost("127.0.0.1")
          .setRelayPort(61613)
          .setClientLogin("guest")
          .setClientPasscode("guest");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/dashboardsocket")
            .withSockJS();
    }
}

UPDATE: I had installed Erlang as a regular user. I will reinstall it using an admin account. This needs intervention from office IS support since I am working on an office computer. I will update if that helps.


Answer:

Installed Erlang server in Windows as Administrator and that resolved the issue.

Question:

I'm a little confused about the automatic recovery of the connections in RabbitMQ java client.

Consider the following code

ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
Connection connection = factory.newConnection();
Channel chan_obj = connection.createChannel();

As per the RabbitMQ documentation, if the connection drops, automatic recovery will re-create the connection, re-create the channel, and so on. I'm confused about what happens to 'chan_obj' while RabbitMQ recovers: will the channel created on the recovered connection be usable through the same 'chan_obj' object? Another question: during the recovery process, will I get a notification that the channel has closed, hopefully followed by another notification like 'channel can be used again'?


Answer:

To cite the documentation of Connection.createChannel():

Create a new channel, using an internally allocated
channel number. If automatic connection recovery is
enabled, the channel returned by this method will
be Recoverable.

See also AutorecoveringConnection and AutorecoveringChannel.
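
One way to picture why chan_obj stays usable: with recovery enabled, the object handed back is a wrapper (AutorecoveringChannel) that holds the real channel as a delegate and swaps it for a fresh one once the connection has been recovered. The sketch below is a simplified, standard-library-only model of that idea, not the client's actual code. RawChannel, RecoverableHandle and the Runnable-based listener are hypothetical names used only for illustration; in the real client the comparable notification hook is Recoverable.addRecoveryListener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

// Hypothetical stand-in for a low-level channel that dies with its connection.
interface RawChannel {
    String publish(String message);
}

// Simplified model of a recoverable channel: callers keep one reference,
// while the underlying delegate is replaced after a recovery.
class RecoverableHandle {
    private final Supplier<RawChannel> channelFactory;
    private final List<Runnable> recoveryListeners = new CopyOnWriteArrayList<>();
    private volatile RawChannel delegate;

    RecoverableHandle(Supplier<RawChannel> channelFactory) {
        this.channelFactory = channelFactory;
        this.delegate = channelFactory.get();
    }

    void addRecoveryListener(Runnable listener) {
        recoveryListeners.add(listener);
    }

    // Called by the (hypothetical) recovery machinery once the connection is back.
    void recover() {
        delegate = channelFactory.get();
        recoveryListeners.forEach(Runnable::run);
    }

    String publish(String message) {
        return delegate.publish(message);
    }
}

class RecoverableHandleDemo {
    public static void main(String[] args) {
        int[] generation = {0};
        RecoverableHandle chanObj = new RecoverableHandle(() -> {
            int id = ++generation[0];
            return message -> "channel#" + id + " sent " + message;
        });
        chanObj.addRecoveryListener(() -> System.out.println("channel can be used again"));

        System.out.println(chanObj.publish("a"));  // channel#1 sent a
        chanObj.recover();                         // simulate a dropped and recovered connection
        System.out.println(chanObj.publish("b"));  // channel#2 sent b (same object, new delegate)
    }
}
```

The caller never replaces chanObj; only the delegate behind it changes, and the listener is where a "usable again" notification would arrive.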

Question:

I am using RabbitMQ with the Spring framework. There is an issue with my queues: when my RabbitMQ consumer is redeployed, the sudden disconnect leaves unacked messages behind.

<rabbit:listener-container id="MyListenerContainer"
    connection-factory="MyRabbitConsumerConnectionFactory"
    prefetch="100"
    concurrency="5"
    acknowledge="manual"
    auto-startup="true">
    <rabbit:listener queues="MyRabbitQueue" ref="MyConsumer"/>
</rabbit:listener-container>

<rabbit:queue id="MyRabbitQueue"
              name="MyRabbitQueue"
              declared-by="MyConsumerRabbitAdmin"
              auto-delete="false"
              durable="true"
              exclusive="false"/>

<rabbit:admin id="MyConsumerRabbitAdmin"
              connection-factory="MyRabbitConsumerConnectionFactory"
              auto-startup="true"/>

MyConsumer implements the ChannelAwareMessageListener interface. How can I call basicRecover(true) when the connection is created?

Thanks


Answer:

Try to play with com.rabbitmq.client.ConnectionFactory:

/**
 * Enables or disables <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>.
 * @param automaticRecovery if true, enables connection recovery
 * @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
 */
public void setAutomaticRecoveryEnabled(boolean automaticRecovery) {
    this.automaticRecovery = automaticRecovery;
}

It is false by default in the 3.x Java client; version 4.0.0 and later enable it by default.

Question:

I am trying to send messages between two local PCs (Windows OS) connected over a LAN. The RabbitMQ server is installed on the PC at 10.100.94.25, and I am trying to create a connection from the PC at 10.100.94.28. Pinging the RabbitMQ server PC (10.100.94.25) from the client PC (10.100.94.28) in the Windows command prompt succeeds, but creating a connection from code does not work. Please check my code and error log below. The error reports a connection timeout. What should I do?

I have successfully sent and received messages on the same PC; that is, when I use localhost as the host, it works perfectly. So what small detail am I missing for remote access?

connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("shoshi");
connectionFactory.setPassword("shoshi");
connectionFactory.setHost("10.100.94.25");
connectionFactory.setPort(5672);
connection = connectionFactory.newConnection(); // this is line 451

error:

May 01, 2016 6:00:35 PM com.chat.UI initRabbitMQ
SEVERE: null
java.net.ConnectException: Connection timed out: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:714)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:760)
    at com.chat.UI.initRabbitMQ(UI.java:451)
    at com.chat.UI.<init>(UI.java:48)
    at com.chat.UI$8.run(UI.java:405)
    at java.awt.event.InvocationEvent.dispatch(InvocationEvent.java:251)
    at java.awt.EventQueue.dispatchEventImpl(EventQueue.java:733)
    at java.awt.EventQueue.access$200(EventQueue.java:103)
    at java.awt.EventQueue$3.run(EventQueue.java:694)
    at java.awt.EventQueue$3.run(EventQueue.java:692)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.security.ProtectionDomain$1.doIntersectionPrivilege(ProtectionDomain.java:76)
    at java.awt.EventQueue.dispatchEvent(EventQueue.java:703)
    at java.awt.EventDispatchThread.pumpOneEventForFilters(EventDispatchThread.java:242)
    at java.awt.EventDispatchThread.pumpEventsForFilter(EventDispatchThread.java:161)
    at java.awt.EventDispatchThread.pumpEventsForHierarchy(EventDispatchThread.java:150)
    at java.awt.EventDispatchThread.pumpEvents(EventDispatchThread.java:146)
    at java.awt.EventDispatchThread.pumpEvents(EventDispatchThread.java:138)
    at java.awt.EventDispatchThread.run(EventDispatchThread.java:91)

Answer:

I suspected that either port 5672 was not open or a firewall was blocking it. Thanks to cantSleepNow and Thomas for their guidance.

So, what did I do? I checked the port using telnet; the command is telnet IP_ADDRESS PORT_NUMBER. If telnet is not recognized as a command, you have to enable it first. To install Telnet on Windows, follow these instructions:

  1. Click Start then select Control Panel.
  2. Select Programs and Features.
  3. Select Turn Windows features on or off.
  4. Select the Telnet Client option.
  5. Click OK.

I got a response with telnet localhost 5672, but not with telnet 10.100.94.25 5672.

Also, from another post I learned that:

  • connection refused means that nothing is running on that port
  • accepted means that something is running on that port
  • timeout means that a firewall is blocking access

And my error log says java.net.ConnectException: Connection timed out: connect

So now I was sure that it was a firewall issue. I then opened up Erlang for remote access by following a tutorial, and now it is working.

To find your PC's IP address, run ipconfig in CMD.
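
The telnet check above can also be done from Java, which helps when telnet is not available on the machine. This is a minimal sketch using only the standard library; PortProbe and its Result enum are hypothetical names, and the classification rests on the same rule of thumb as the list above (refused: nothing listening; timeout: most likely a firewall), so treat the result as a hint rather than proof.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

class PortProbe {
    enum Result { OPEN, REFUSED, TIMEOUT, OTHER }

    // Tries a plain TCP connect, like "telnet host port", and classifies the outcome.
    static Result probe(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return Result.OPEN;
        } catch (SocketTimeoutException e) {
            // Our own timeout expired before the connect completed
            return Result.TIMEOUT;
        } catch (ConnectException e) {
            // Usually "Connection refused"; an OS-level "Connection timed out"
            // is also reported as ConnectException, so check the message.
            String msg = String.valueOf(e.getMessage()).toLowerCase();
            return msg.contains("timed out") ? Result.TIMEOUT : Result.REFUSED;
        } catch (IOException e) {
            // Unknown host, no route to host, and similar problems
            return Result.OTHER;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5672;
        System.out.println(host + ":" + port + " -> " + probe(host, port, 3000));
    }
}
```

Running it once against localhost on the server PC and once against 10.100.94.25 from the client PC mirrors the two telnet commands used above.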

Question:

I have a CachingConnectionFactory with multiple addresses. When one broker goes down, it connects with the second. Now, when the broker comes up again, I need to destroy existing connections and reconnect to it. But CachingConnectionFactory doesn't have any start, stop methods, instead has only destroy, which might render the factory unusable(?).

Config:

<bean id="testConnFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="addresses" value="rabbit1,rabbit2" />
    <property name="cacheMode" value="CONNECTION" />
    <property name="connectionCacheSize" value="${connection.cache.size}" />
</bean>

Is there any way to do this programmatically?


Answer:

Calling destroy() is safe; the connection(s) will be reset and re-established the next time a component wants one.

Bear in mind, though, that this will impact any in-process operations.

We should probably add a less scary method, such as resetConnection() like we have with the Spring JMS connection factory.

Question:

Let me begin by saying that I just started to dabble with AMQP.

I want to consume/pull data from queue. I'm using Spring's libs (spring-boot-starter-amqp) in order to make things easier. I have a listener class with method annotated with @RabbitListener where I set queue. Everything else is configured via properties:

    rabbitmq:
      username: user
      password: password
      virtual-host: virtual-host
      port: 5672
      host: host
      queue: _316_
      listener:
        simple:
          retry:
            enabled: true
            initial-interval: 1000
            max-attempts: 8
            max-interval: 10000
            multiplier: 2.0
            stateless: true
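
For reference, the retry settings above imply the wait times computed below, assuming they behave like a standard capped exponential backoff (start at initial-interval, multiply after each attempt, never exceed max-interval). This is a sketch of the arithmetic only, not Spring's implementation, and RetrySchedule is a hypothetical name. Note that the listener.simple.retry properties govern retries of message delivery to the listener, not reconnection to the broker.

```java
import java.util.ArrayList;
import java.util.List;

class RetrySchedule {
    // Returns the waits (ms) between attempts: maxAttempts tries means maxAttempts - 1 waits.
    static List<Long> waits(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        double next = initialInterval;
        for (int i = 1; i < maxAttempts; i++) {
            result.add(Math.min((long) next, maxInterval));
            next = Math.min(next * multiplier, maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the configuration above
        System.out.println(waits(1000, 2.0, 10000, 8));
        // prints [1000, 2000, 4000, 8000, 10000, 10000, 10000]
    }
}
```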

Everything works fine until I make the host unavailable for a while. When that happens, the connection is dropped and attempts are made to reestablish it. After the connection is reestablished, the listener does not start pulling messages again. After the application is restarted everything is fine, but I'm sure it can be configured so that the consumer keeps restarting, or at least tries to after the connection is reestablished (this is what I'd expect).

After the connection has been dropped, the following can be found in the logs:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-PgBSeymWBfsghwdUYr5asA (_316_); Consumer@22ead351: tags=[[amq.ctag-PgBSeymWBfsghwdUYr5asA]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer WARN Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@22ead351: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host:5672]

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:564)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1201)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1046)\n\tat java.base/java.lang.Thread.run(Thread.java:835)\nCaused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer","message":"Consumer received fatal exception on startup

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Waiting for workers to finish.

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Successfully waited for workers to finish.

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat 

Then connection attempt is made and we're in a loop:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED 

Until connection is reestablished:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Created new connection: rabbitConnectionFactory#69d3cf7e:16/SimpleConnection@3931e0ad [delegate=amqp://user@host, localPort= 50574]

And nothing else happens - no messages are being consumed.

UPDATE: Followed suggestion and turned on DEBUG logging.

When app is starting we're:

  1. starting listener container

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Starting Rabbit listener container.

  2. creating the connection

  3. starting consumer

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@3daf03d8: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

  4. creating channel and starting to consume

org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Creating cached Rabbit Channel from AMQChannel(amqp://user@host,1)

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG ConsumeOK: Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Started on queue '_316_' with tag amq.ctag-uG8_iXcNaknFjBIGM-91Tg: Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Storing delivery for consumerTag: 'amq.ctag-uG8_iXcNaknFjBIGM-91Tg' with deliveryTag: '1' in Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Received message: (Body:'[B@4b817fae(byte[117])' MessageProperties [headers={}, contentLength=0, redelivered=true, receivedExchange=, receivedRoutingKey=_316_, deliveryTag=1, consumerTag=amq.ctag-uG8_iXcNaknFjBIGM-91Tg, consumerQueue=_316_])

This goes on until connection drops:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-uG8_iXcNaknFjBIGM-91Tg (_316_); Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp:///user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp:///user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

Channel is shutdown and user somehow gets deleted as log says:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

Issue with connection driver followed by exception being thrown:

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.support.ConsumerCancelledException: null\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:499)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048)\n\tat java.base/java.lang.Thread.run(Thread.java:835

Consumer raises exception and it says that processing can r-e-s-t-a-r-t

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Consumer raised exception, processing can restart if the connection factory supports it

Restarting happens:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

Channels are being closed:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Closing cached Channel: AMQChannel(amqp://user@host,1)

New consumer is starting:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

We're attempting to connect which ends up with WARN and AUTHENTICATION failure, (because previous log said that user was deleted?):

An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat

ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat

Consumer that tried to start:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup

And it (consumer) gets cancelled:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Cancelling Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

Channel is closed and container is stopping:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

And then container is shutting down:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Shutting down Rabbit listener container

We're waiting for workers to finish, it's successful, then we're trying to connect again, the same SOCKET_CLOSED is being logged over and over again.

Then the host is brought back and the connection is reestablished. A cached Rabbit channel is created and nothing else happens.

I'd assume that issue is that container was shut down and never came back to life, hence there're no consumers.

WHAT WORKED:

I created a class with a "listening" method that accepts ListenerContainerConsumerFailedEvent. That class has a RabbitListenerEndpointRegistry (a bean that Boot conveniently created for me), and whenever that method is called I check whether the listenerContainer is running; if not, I start it (the check is most likely redundant).

@EventListener
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
    var listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(MessageListener.RABBIT_LISTENER_ID);
    if (!listenerContainer.isRunning()) {
        listenerContainer.start();
    }
}

Answer:

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure

FatalListenerStartupException

Authentication failures are considered fatal and the container is immediately stopped; it is unlikely such situations will be corrected automatically.

Deleting a user that is currently in use is a rather unusual circumstance.

You could use an ApplicationListener bean or @EventListener method to listen for a ListenerContainerConsumerTerminatedEvent and try restarting the container after some time.
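
The "restart after some time" part could be sketched as follows. To keep the example self-contained it uses only the standard library: Restartable is a hypothetical stand-in for the isRunning() and start() methods of the listener container, and DelayedRestarter is a hypothetical helper. In a Spring application, restartLater(...) would be called from the @EventListener method handling ListenerContainerConsumerTerminatedEvent; that wiring is assumed here, not shown.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the parts of a listener container used here.
interface Restartable {
    boolean isRunning();
    void start();
}

class DelayedRestarter {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "container-restarter");
                thread.setDaemon(true); // do not keep the JVM alive just for restarts
                return thread;
            });

    // Schedules one restart attempt; the container is started only if it is still stopped.
    ScheduledFuture<?> restartLater(Restartable container, long delay, TimeUnit unit) {
        return scheduler.schedule(() -> {
            if (!container.isRunning()) {
                container.start();
            }
        }, delay, unit);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}

class DelayedRestarterDemo {
    public static void main(String[] args) throws Exception {
        AtomicBoolean running = new AtomicBoolean(false);
        Restartable container = new Restartable() {
            @Override public boolean isRunning() { return running.get(); }
            @Override public void start() { running.set(true); }
        };
        DelayedRestarter restarter = new DelayedRestarter();
        restarter.restartLater(container, 200, TimeUnit.MILLISECONDS).get(); // wait for the attempt
        System.out.println("running after restart attempt: " + running.get());
        restarter.shutdown();
    }
}
```

If the broker still refuses the login when the attempt runs, the container would presumably stop again and publish another event, so each event leads to one more delayed attempt rather than a tight retry loop.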

Question:

I am a newbie who is trying to integrate qpid with Apache Camel. I need to write java code to read and write from a queue using qpid.

So first I downloaded the JMS example from the Qpid website. The code that I am trying to run is:

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        try {
            // The configuration for the Qpid InitialContextFactory has been supplied in
            // a jndi.properties file in the classpath, which results in it being picked
            // up automatically by the InitialContext constructor.
            Context context = new InitialContext();

            ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
            Destination queue = (Destination) context.lookup("myQueueLookup");

            Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer messageProducer = session.createProducer(queue);
            MessageConsumer messageConsumer = session.createConsumer(queue);

            TextMessage message = session.createTextMessage("Hello world!");
            messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
            TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L);

            if (receivedMessage != null) {
                System.out.println(receivedMessage.getText());
            } else {
                System.out.println("No message received within the given timeout!");
            }

            connection.close();
        } catch (Exception exp) {
            System.out.println("Caught exception, exiting.");
            exp.printStackTrace(System.out);
            System.exit(1);
        }
    }

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}

This depends on the following jndi.properties file:

# Set the InitialContextFactory class to use
java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory
# Define the required ConnectionFactory instances
# connectionfactory.<JNDI-lookup-name> = <URI>
connectionfactory.myFactoryLookup = amqp://localhost:5672

# Configure the necessary Queue and Topic objects
# queue.<JNDI-lookup-name> = <queue-name>
# topic.<JNDI-lookup-name> = <topic-name>
queue.myQueueLookup = queue
topic.myTopicLookup = topic

Now I understand that in order for this to work, I need something known as a broker service. After some research, I found out that I can use RabbitMQ for this purpose, so I downloaded it on my Windows machine and am trying to connect to it at localhost:5672.

But when I run my code, I get this error:

2017-05-04 11:28:29,329 [main           ] - ERROR JmsConnection                  - Failed to connect to remote at: amqp://localhost:5672
Caught exception, exiting.
javax.jms.JMSException: An existing connection was forcibly closed by the remote host
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:86)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:108)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:172)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:204)
    at org.apache.qpid.jms.example.HelloWorld.main(HelloWorld.java:48)
Caused by: java.io.IOException: An existing connection was forcibly closed by the remote host
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

Why does this error occur, even though this port is definitely listening on my local machine? I am a newbie with JMS, so any guidance will be greatly appreciated :)


Answer:

As Tim mentioned, at minimum you would also have needed to ensure the broker's experimental AMQP 1.0 plugin was loaded, which you don't mention doing.

However, in this case it might not make much difference. I didn't have much success using the JMS client or some other AMQP 1.0 clients against RabbitMQ previously due to an issue I reported which stops them in their tracks when creating consumers and producers: https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/34

You mentioned researching things and deciding to use RabbitMQ, which sounds like you aren't tied to an existing server solution. If so, there are other servers that support AMQP 1.0 and that the JMS client is routinely used against: you could try ActiveMQ, ActiveMQ Artemis, Qpid for Java broker, Qpid C++ broker, or Qpid Dispatch router (doesn't work on Windows, which you mention using), amongst others.
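
To illustrate why the plugin matters: every AMQP connection opens with an 8-byte protocol header, and the Qpid JMS client announces AMQP 1.0 while RabbitMQ speaks AMQP 0-9-1 by default. A broker that does not support the announced version typically drops the socket, which would be consistent with the "forcibly closed" error above. The following is a minimal sketch using only the JDK; the class and method names are hypothetical and not part of any client library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch only: classifies the 8-byte protocol header that opens an AMQP connection.
final class AmqpProtocolHeader {

    // "AMQP" 0 0 9 1: header sent by AMQP 0-9-1 clients (RabbitMQ's default protocol).
    static final byte[] AMQP_0_9_1 = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};

    // "AMQP" 0 1 0 0: header sent by AMQP 1.0 clients (protocol id 0 = plain AMQP).
    static final byte[] AMQP_1_0 = {'A', 'M', 'Q', 'P', 0, 1, 0, 0};

    static String describe(byte[] header) {
        if (header == null || header.length != 8
                || !"AMQP".equals(new String(header, 0, 4, StandardCharsets.US_ASCII))) {
            return "not an AMQP header";
        }
        if (Arrays.equals(header, AMQP_0_9_1)) {
            return "AMQP 0-9-1";
        }
        // In AMQP 1.0 the fifth byte is a protocol id (0 = AMQP, 2 = TLS, 3 = SASL),
        // followed by major, minor and revision numbers.
        if (header[5] == 1 && header[6] == 0 && header[7] == 0) {
            return "AMQP 1.0 (protocol id " + header[4] + ")";
        }
        return "unknown AMQP version";
    }

    public static void main(String[] args) {
        System.out.println(describe(AMQP_0_9_1));
        System.out.println(describe(AMQP_1_0));
    }
}
```

Comparing the header a client sends with the one the broker expects is a quick way to confirm a protocol mismatch before looking at credentials or firewalls.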

Question:

I tried using RabbitMQ, and while creating the connection I got the exception below, shown together with the program output. I am not sure what went wrong. The factory is created fine, and the hostname, port, username and password are set correctly, but the connection fails every time.

In main 
 parametres setted , now creating the connection
The host name and other params are localhost
The host name and other params are admin
The host name and other params are admin
The host name and other params are 5672
 setted all the params


Exception in thread "main" java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:347)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
    at com.TestSend.main(TestSend.java:39)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.lang.NoClassDefFoundError: org/apache/commons/io/input/ProxyInputStream
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:300)
    ... 3 more
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/io/input/ProxyInputStream
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.rabbitmq.client.impl.AMQImpl.readMethodFrom(AMQImpl.java:3205)
    at com.rabbitmq.client.impl.CommandAssembler.consumeMethodFrame(CommandAssembler.java:93)
    at com.rabbitmq.client.impl.CommandAssembler.handleFrame(CommandAssembler.java:158)
    at com.rabbitmq.client.impl.AMQCommand.handleFrame(AMQCommand.java:87)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:89)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:500)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.io.input.ProxyInputStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 18 more

The code I wrote was

 package com;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import com.rabbitmq.client.ConnectionFactory;

    public class TestSend {

        public static void main(String[] args) throws IOException {

            System.out.println("In main ");

            // Setting up connection
            ConnectionFactory factory;
            factory = new ConnectionFactory();
            String host = "localhost";
            String password = "admin";
            String user = "admin";
            String port = "5672";

            System.out.println(" parametres setted , now creating the connection");

            factory.setHost(host);
            factory.setPassword(password);
            factory.setUsername(user);
            factory.setPort(Integer.parseInt(port));
            factory.setRequestedHeartbeat(30);

            System.out.println("The host name and other params are " + factory.getHost());

            System.out.println("The host name and other params are " + factory.getUsername());
            System.out.println("The host name and other params are " + factory.getPassword());
            System.out.println("The host name and other params are " + factory.getPort());
            System.out.println("Setted all the params");
            Connection connection;


            connection = factory.newConnection();

            System.out.println("Establishing the connection " + host);

            String s = "Hello";

            Channel channel; // This channel sends the file
            channel = connection.createChannel();
            System.out.println("declaring Queue now");
            channel.queueDeclare("nav", true, false, false, null);
            channel.basicPublish("", "nav", null, s.getBytes());

            System.out.println(" seems like everything has been sent to the queue ");
            System.out.println("Closing the connection");
            channel.close();
            connection.close();

        }
    }

Answer:

I just found the solution to this. I added the commons-io-2.5.jar file to the project, and after rebuilding, it worked.

The jar was obtained from the link : http://commons.apache.org/proper/commons-io/download_io.cgi
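
A NoClassDefFoundError like the one above means a class was missing from the runtime classpath. As a rough sketch (the helper class below is hypothetical), you could check for the class before opening the connection, so that a missing jar shows up as a clear message rather than a wrapped connection error:

```java
// Sketch only: reports whether a class can be located on the classpath.
final class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the stack trace above; it ships in the commons-io jar.
        String needed = "org.apache.commons.io.input.ProxyInputStream";
        System.out.println(needed + " available: " + isOnClasspath(needed));
    }
}
```

If the check prints false, the jar is not on the classpath the application actually runs with, even if the project compiles.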

Question:

I know Spring AMQP supports failover well, but my question is: how can I load-balance the connections between the consumer side and the cluster nodes of a RabbitMQ broker?

Any response will be greatly appreciated.

Regards.


Answer:

According to the RabbitMQ Clustering Guide:

A client can connect as normal to any node within a cluster. If that node fails and the rest of the cluster survives, then the client should notice the closed connection, and should be able to reconnect to some surviving member of the cluster. Generally, it's not advisable to bake in node hostnames or IP addresses into client applications: this introduces inflexibility and will require client applications to be edited, recompiled and redeployed should the configuration of the cluster change or the number of nodes in the cluster change. Instead, we recommend a more abstracted approach: this could be a dynamic DNS service which has a very short TTL configuration, or a plain TCP load balancer, or some sort of mobile IP achieved with pacemaker or similar technologies. In general, this aspect of managing the connection to nodes within a cluster is beyond the scope of RabbitMQ itself, and we recommend the use of other technologies designed specifically to solve these problems.

Also see this answer: RabbitMQ client load balancing.

And a quote from here:

Create a load balance in front of it and map the backend MQ instance. You can choose either HAProxy or Apache or Nginx or any hardware load balancer you use in your organization.

If the servers are running in AWS inside a VPC, then choose internal load balancer. Update the application to point to the load balancer end point.

As you can see, there is plenty of information on the Internet on the matter. Admittedly, I haven't given my own opinion here, but it doesn't look too difficult to achieve the desired solution.

Just configure a load balancer in front of the Rabbit nodes and have the clients use its URL.

Question:

When creating the project with Spring Initializr, I added the RabbitMQ dependency, but later removed it after deciding not to use it. Now I get a connection refused error, only once, after I run the application. It does not affect the functionality of the REST API, which works very well, but no one wants errors in their app even if they cause no problems. I cleaned and rebuilt the project, and the error is still there. It may be related to the actuator health check, but I could not find a way out. Here is the exception message:

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:61) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:510) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:751) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2068) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2048) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.getVersion(RabbitHealthIndicator.java:49) ~[spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.amqp.RabbitHealthIndicator.doHealthCheck(RabbitHealthIndicator.java:44) ~[spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82) ~[spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:81) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:38) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:108) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateHealth(HealthEndpointSupport.java:119) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:105) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:83) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:70) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:75) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:65) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_131]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_131]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282) [spring-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:77) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:121) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:96) [spring-boot-actuator-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819) [na:1.8.0_131]
at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801) [na:1.8.0_131]
at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468) [na:1.8.0_131]
at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) [na:1.8.0_131]
at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) [na:1.8.0_131]
at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) [na:1.8.0_131]
at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829) [na:1.8.0_131]
at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:346) [na:1.8.0_131]
at sun.rmi.transport.Transport$1.run(Transport.java:200) [na:1.8.0_131]
at sun.rmi.transport.Transport$1.run(Transport.java:197) [na:1.8.0_131]
at java.security.AccessController.doPrivileged(Native Method) [na:1.8.0_131]
at sun.rmi.transport.Transport.serviceCall(Transport.java:196) [na:1.8.0_131]
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568) [na:1.8.0_131]
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826) [na:1.8.0_131]
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683) [na:1.8.0_131]
at java.security.AccessController.doPrivileged(Native Method) [na:1.8.0_131]
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682) [na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_131]
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_131]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_131]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_131]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_131]
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_131]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_131]
at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_131]
at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1113) ~[amqp-client-5.7.3.jar:5.7.3]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1063) ~[amqp-client-5.7.3.jar:5.7.3]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.connect(AbstractConnectionFactory.java:526) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:473) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 50 common frames omitted

Answer:

  1. Make sure you have really removed "spring-boot-starter-amqp" from your pom.xml/build.gradle file.
  2. Post your complete dependencies section, so we can find out whether another dependency still pulls in "messaging".

Question:

I have a problem connecting to RabbitMQ via Apache Camel on Spring Boot 2.

I did the following:

My dependencies:

implementation "org.apache.camel:camel-spring-boot-starter:${camelVersion}"
implementation "org.apache.camel:camel-jackson-starter:${camelVersion}"
implementation "org.apache.camel:camel-core:${camelVersion}"
implementation "org.apache.camel:camel-rabbitmq-starter:${camelVersion}"
implementation "org.springframework.boot:spring-boot-starter-amqp"

Application.yaml

spring:
      rabbitmq:
      dynamic: true
      host: 192.168.1.1
      port: 5672
      username: X
      password: Y

And I have following route:

@Component
public class BasicRoute extends RouteBuilder {

@Override
public void configure() throws Exception {

    from("direct:loggerQueue")
            .id("loggerQueue")
            .to("rabbitmq://TEST-QUEUE.exchange?queue=TEST-QUEUE.queue&autoDelete=false&connectionFactory=#rabbitConnectionFactory")
            .end();
}

}

In the end, I still get the following error:

2019-03-06 12:46:05.766 WARN 19464 --- [ restartedMain] o.a.c.c.rabbitmq.RabbitMQProducer : Failed to create connection. It will attempt to connect again when publishing a message. java.net.ConnectException: Connection refused: connect

The connection itself seems OK; I tested it. Something is wrong with rabbitConnectionFactory.

I don't know what I am doing wrong.


Answer:

The problem appears to be that RabbitMQComponent is expecting to find a connection factory of type com.rabbitmq.client.ConnectionFactory.

However, the Spring Boot auto-configuration creates a connection factory of type org.springframework.amqp.rabbit.connection.CachingConnectionFactory.

So whenever the RabbitMQComponent looks up the connection factory by that specific type, the lookup returns null, because CachingConnectionFactory does not subclass the RabbitMQ ConnectionFactory. The component then fails to use the host name and other configuration parameters specified in your application.yml.

You should also see the following in your log if you have debug level set:
2019-12-15 17:58:53.631 DEBUG 48710 --- [           main] o.a.c.c.rabbitmq.RabbitMQComponent       : Creating RabbitMQEndpoint with host null:0 and exchangeName: asterix
2019-12-15 17:58:55.927 DEBUG 48710 --- [           main] o.a.c.c.rabbitmq.RabbitMQComponent       : Creating RabbitMQEndpoint with host null:0 and exchangeName: asterix-sink

EDIT: The CachingConnectionFactory is configured with the required Rabbit connection factory as part of the auto-configuration. However, you need to point Camel at that underlying factory.

Therefore, you need to add a @Bean to disambiguate.

@Configuration
@RequiredArgsConstructor
public class CamelConfig {

  private final CachingConnectionFactory rabbitConnectionFactory;

  @Bean
  com.rabbitmq.client.ConnectionFactory rabbitSourceConnectionFactory() {
    return rabbitConnectionFactory.getRabbitConnectionFactory();
  }
}

and in your endpoint configuration:

rabbitmq:asterix?connectionFactory=#rabbitSourceConnectionFactory

Note that the # is optional, as it gets stripped out within the code when it is trying to find the rabbit connection factory bean.

In your application.yml, configure the connection parameters (the url is no longer included in the endpoint URI).

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

Question:

I am using cloudamqp message service for trying out MQs (RabbitMQ in this case). I am trying to push a message into the Queue from a Java class but it always says:

Connection Refused

Also System.getenv returns null (I am stuck with that also).

Below is the code and exception:

    String uri = System.getenv("amqp://xxuserxx:password_xxxxxxxxxx@Cloudamqhost/vhost");
        if (uri == null) uri = "amqp://guest:guest@localhost";
    ConnectionFactory factory = new ConnectionFactory();
    try {
        factory.setUri(uri);
    } catch (URISyntaxException e) {
        e.printStackTrace();
    } catch (NoSuchAlgorithmException e) {
        e.printStackTrace();
    } catch (KeyManagementException e) {
        e.printStackTrace();
    }

    //Recommended settings
    factory.setRequestedHeartbeat(30);
    factory.setConnectionTimeout(60000);

    Connection connection = null;
    try {
        connection = factory.newConnection(); //Getting Null Pointer here
    } catch (TimeoutException e) {
        e.toString();

Stacktrace:

Exception in thread "main" java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:62)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:134)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:997)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:956)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:914)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:1068)

When I am catching generic Exception, it is catching null pointer exception:

java.lang.NullPointerException @connection = factory.newConnection()


Answer:

String uri = 
System.getenv("amqp://xxuserxx:password_xxxxxxxxxx@Cloudamqhost/vhost");
if (uri == null) uri = "amqp://guest:guest@localhost";

This is strange. Either you set the URI as a constant:

String uri = "amqp://xxuserxx:password_xxxxxxxxxx@Cloudamqhost/vhost";

or you read the URI from an environment variable by its name:

String uri = System.getenv("AMQP_CONNECTION_STRING");
if (uri == null) uri = "amqp://guest:guest@localhost";
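
The point is that System.getenv takes the name of a variable, not its value. A minimal, self-contained sketch of that lookup with a fallback follows; the variable name AMQP_CONNECTION_STRING is the one used above, while the class and method names are hypothetical.

```java
import java.net.URI;
import java.util.Map;

// Sketch only: looks up an AMQP URI by environment variable name, with a fallback.
final class AmqpUriResolver {

    static String resolve(Map<String, String> env, String variableName, String fallback) {
        String value = env.get(variableName);
        // Treat an unset or blank variable as "not configured".
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }

    public static void main(String[] args) {
        String uri = resolve(System.getenv(), "AMQP_CONNECTION_STRING",
                "amqp://guest:guest@localhost");
        // Print only the host, so credentials never end up in the logs.
        System.out.println("Connecting to host " + URI.create(uri).getHost());
    }
}
```

The resolved string can then be passed to factory.setUri(uri) as in the question.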

Question:

I have 2 identical RabbitMQ servers, and I want to publish an event to one of them (processed exactly once), failing over to the other in case of a publishing failure.

  • Spring Cloud Stream has 1 binder for each server.
  • MQ servers have same exchanges and durable queues configured, but queues don't have HA policy.

The questions are:

  1. What is the best design to do this? Preferably by making changes in high level Cloud Stream configuration, not diving into Spring AMQP.
  2. How can I hook into asynchronous publishing and its result? Preferably not by making it synchronous. Override some bean?
  3. Can RabbitMQ HA help in any way? As I understand it, the whole durable queue is present only on a single node to preserve the order of messages (actually I don't need the order). So if I configure HA and the node with the durable HA queue fails/stops, will processing and publishing crash?

Answer:

  1. See the boot documentation: spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect. The connection factory will automatically fail over.

  2. If you set the producer errorChannelEnabled you will get returned messages in the error channel; this needs a connection factory configured for returns. There is no current mechanism to get async positive acks, unless you use Spring AMQP directly to publish messages.

  3. With HA, a new master node is selected for the queue(s) hosted by the failed node. While the queue is hosted on one node, it is copied to the others.
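
To make point 1 more concrete, here is a conceptual sketch of what "a comma-separated list of addresses with failover" amounts to: parse the list, then try each address in order until one accepts a connection. This is not Spring AMQP's actual implementation; the class and method names are hypothetical, and it uses only the JDK.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Sketch only: illustrates ordered failover over a list of broker addresses.
final class BrokerAddresses {

    static final int DEFAULT_AMQP_PORT = 5672;

    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Parses "host1:5672,host2" into host/port pairs; a missing port falls back to 5672.
    static List<HostPort> parse(String addresses) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                result.add(new HostPort(trimmed, DEFAULT_AMQP_PORT));
            } else {
                result.add(new HostPort(trimmed.substring(0, colon),
                        Integer.parseInt(trimmed.substring(colon + 1))));
            }
        }
        return result;
    }

    // Returns the first address that accepts a TCP connection, or null if none does.
    static HostPort firstReachable(List<HostPort> candidates, int timeoutMillis) {
        for (HostPort candidate : candidates) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(candidate.host, candidate.port), timeoutMillis);
                return candidate;
            } catch (IOException e) {
                // This node is unreachable; fall through to the next address.
            }
        }
        return null;
    }
}
```

Note that this only covers choosing a node to connect to. It says nothing about whether a message already handed to the failed node was delivered, which is what publisher confirms and returns address.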

Question:

I am implementing it in Java and want to have a constructor for an endpoint:

public EndPoint(final String endPointName, final String host, final int port){
    this.name=endPointName;
    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(host);
    factory.setPort(port);
    connection = factory.newConnection();

    channel = connection.createChannel();
    channel.queueDeclare(name, false, false, false, null);  

}   

And I receive EOFException:connection refused even when I set the arguments "localhost" and 15672 which are considered to be default as far as I understand. The problem is with the method setPort(): without it everything works perfectly.

Why do I think that 15672 port is correct? Without setPort() I can open my browser Rabbitmq helper at localhost:15672 and see my created queues.

How can I set the port without an exception?


Answer:

The default port of RabbitMQ's operational protocol is 5672; port 15672 is the default port for the management plugin (the "GUI").
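
As a small illustration, the default ports can be captured in a guard that rejects the management port before it reaches ConnectionFactory.setPort. The class below is a hypothetical helper, not part of the RabbitMQ client; 5671 is the default port for AMQP over TLS.

```java
// Sketch only: names RabbitMQ's default ports and rejects the management port.
final class RabbitPorts {

    static final int AMQP = 5672;             // AMQP 0-9-1, used by the Java client
    static final int AMQP_TLS = 5671;         // AMQP 0-9-1 over TLS
    static final int MANAGEMENT_HTTP = 15672; // management plugin (HTTP API and web UI)

    static String describe(int port) {
        switch (port) {
            case AMQP:
                return "AMQP";
            case AMQP_TLS:
                return "AMQP over TLS";
            case MANAGEMENT_HTTP:
                return "management HTTP";
            default:
                return "non-default port";
        }
    }

    // Returns the port unchanged unless it is the management port.
    static int requireAmqpPort(int port) {
        if (port == MANAGEMENT_HTTP) {
            throw new IllegalArgumentException(
                    "Port 15672 serves the management UI over HTTP; use 5672 for AMQP");
        }
        return port;
    }

    public static void main(String[] args) {
        System.out.println(describe(5672));
        System.out.println(describe(15672));
    }
}
```

In the constructor from the question, this would be used as factory.setPort(RabbitPorts.requireAmqpPort(port)).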

Question:

I'm writing a small program that crawls all the URLs received from a RabbitMQ queue. I tested it with about 6000 URLs, and after about half of them had been processed, the client stopped without an exception and all the connections to the RabbitMQ server were lost. I also checked the log and found something like this:

=WARNING REPORT==== 7-Apr-2016::10:40:50 ===
closing AMQP connection <0.32373.9> (192.168.55.1:55716 -> 192.168.55.100:5672):
connection_closed_abruptly

I'm using this connection factory setting:

ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.55.100");
        factory.setUsername(params.username);
        factory.setPassword(params.password);
        factory.setVirtualHost(params.virtualHost);
        factory.setAutomaticRecoveryEnabled(true);
        factory.setRequestedHeartbeat(2);

Do you have some ideas about this? Regards!


Answer:

You have to configure RabbitMQ and your OS to handle more connections.

Read this: https://www.rabbitmq.com/networking.html

Erlang VM I/O Thread Pool

Erlang runtime uses a pool of threads for performing I/O operations asynchronously. The size of the pool is configured via the +A VM command line flag, e.g. +A 128. We highly recommend overriding the flag using the RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS environment variable:

RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+A 128"

and also this: https://www.rabbitmq.com/production-checklist.html

Open File Handles Limit

Operating systems limit maximum number of concurrently open file handles, which includes network sockets. Make sure that you have limits set high enough to allow for expected number of concurrent connections and queues.

Question:

I'm running a stock RabbitMQ install on MacOS. The server starts up fine with just 'rabbitmq-server'.

Using the Java API I can easily connect to RabbitMQ with "localhost" as the host like this:

val factory = new ConnectionFactory()
factory.setHost( "localhost" )
val connection = factory.newConnection()

However, when I try a different IP (e.g. the actual IP of my machine) I get a connection refused error.

Exception in thread "main" java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:615)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:639)
    at com.gwz.Junk$delayedInit$body.apply(Junk.scala:8)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at com.gwz.Junk$.main(Junk.scala:5)
    at com.gwz.Junk.main(Junk.scala)

My ifconfig shows inet addresses like these:

inet 127.0.0.1 netmask 0xff000000
inet 172.16.240.21 netmask 0xffffff00 broadcast 172.16.240.255
inet 192.168.59.3 netmask 0xffffff00 broadcast 192.168.59.255
inet 10.0.0.125 netmask 0xffffff00 broadcast 10.0.0.255

The loopback 127.0.0.1 works. The others, however, don't. I need one of the other 3 to work.

Do I need to do something else to allow connection by my local IP?


Answer:

There may be more than one solution, but what I found was this:

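First, open up loopback_users in the classic-format config file, rabbitmq.config (the Erlang-term syntax below is not valid in the newer rabbitmq.conf format):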

[{rabbit, [{loopback_users, []}]}].

Then I put my local machine's IP address in rabbitmq-env.conf (wherever it's installed on your machine):

NODE_IP_ADDRESS=10.0.1.45

This defaulted to localhost for me, so these two changes together allowed my guest account to be accessed with a non-localhost IP.

Question:

I am new to RabbitMQ and am trying out a simple example. Below is my Java source:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
}

And I am getting the error below:

Exception in thread "main" java.net.ConnectException: Connection refused: connect

at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:588)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:612)
at .Send.main(Send.java:15)

Any suggestions please?


Answer:

Do you have your RabbitMQ server installed and running?

Also do you have the rabbitmq-client.jar and its dependencies on the classpath?

Try running this from your terminal:

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
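
A "Connection refused" from newConnection() generally means nothing is accepting TCP connections on the target host and port. Before involving the client library, a plain socket probe can confirm whether a listener is there. The following is a minimal sketch using only the JDK; the class name PortProbe, the method isListening, and the 2-second timeout are illustrative assumptions and not part of the RabbitMQ client.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host: all treated as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        // 5672 is the default AMQP port used in the question.
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5672;
        System.out.println(host + ":" + port + " listening: " + isListening(host, port, 2000));
    }
}
```

If this reports false for localhost:5672, the broker is not listening there and the Java client cannot connect regardless of classpath setup.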

Question:

I am using the Spring AMQP library. How do I configure connection timeouts and socket timeouts in CachingConnectionFactory? I assume it must internally use http://www.rabbitmq.com/javadoc/com/rabbitmq/client/ConnectionFactory.html#setConnectionTimeout(int)


Answer:

We have an open JIRA to expose some more of these attributes, which should be in the next release.

In the meantime, you can wire up a Rabbit ConnectionFactory and provide it to the CachingConnectionFactory - see the documentation.

Question:

I have a simple Spring Cloud Stream project that I am trying to connect to RabbitMQ. It says it's connected, but it's not working. Did I do something wrong in the code?

Application.properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8080

HelloBinding interface

package com.gateway.cloudstreamproducerrabbitmq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface HelloBinding {

    @Output("greetingChannel")
    MessageChannel greeting();
}

ProducerController

package com.gateway.cloudstreamproducerrabbitmq;

import com.gateway.cloudstreamproducerrabbitmq.HelloBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    private MessageChannel greet;

    public ProducerController(HelloBinding binding) {
        greet = binding.greeting();
    }

    @GetMapping("/greet/{name}")
    public void publish(@PathVariable String name) {
        String greeting = "Hello, " + name + "!";
        Message<String> msg = MessageBuilder.withPayload(greeting)
                .build();
        this.greet.send(msg);
    }
}

And lastly, I have @EnableBinding(HelloBinding.class) on the main class that starts the application.


Answer:

To set up Spring Cloud Stream with the RabbitMQ binder implementation:

1. Add these dependencies to your pom.xml:

 <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
 </dependency>
 <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
 </dependency>
 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

2. Define the binding in your application.properties/yaml:

spring:
    cloud:
        stream:
            bindings:
                greetingChannel:
                    destination: test.greeting
                    group: queue
            rabbit:
                bindings:
                    greetingChannel:
                        producer:
                            transacted: true # optional

3. Add @EnableBinding(HelloBinding.class).

4. Inject the binding and use it:

helloBinding.greeting().send(MessageBuilder
                .withPayload(...)
                .build());

5. Set up the RabbitMQ connection properties.

Question:

I have installed RabbitMQ 3.8.3 on Windows 10 and I can see it is running as a Windows service. When I try to access http://localhost:15672/ it is unreachable.

I have enabled the RabbitMQ management plugin from the sbin directory with: rabbitmq-plugins enable rabbitmq_management

But still, http://localhost:15672/ is unreachable.

I am getting the following error in my Java service:

org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

I have also run the command to see if anything is running on port 5672:

Command : netstat -ano | find "5672"

Response : TCP 0.0.0.0:25672 0.0.0.0:0 LISTENING 2900

How do I fix this?


Answer:

The management UI can be accessed using a Web browser at http://{node-hostname}:15672/

The management plugin is included in the RabbitMQ distribution. Like any other plugin, it must be enabled before it can be used.

Please refer to the URL below: https://www.rabbitmq.com/management.html

Question:

Our system has a module which uses the Spring framework with the spring-rabbit component. The module started failing Saturday evening (19-JAN-2019). In the logs, the following error was popping up:

[2019-01-19 17:50:07.458] INFO   com.mcm.spring.SpringMain                         ============ SPRING START ============ 
[2019-01-19 17:50:07.708] INFO   com.mcm.spring.MongoApplicationContext            Requested resource: META-INF/spring/application-context.xml 
[2019-01-19 17:50:07.926] INFO   com.mcm.spring.MongoApplicationContext            Requested resource: classpath:/META-INF/spring/amqp.xml 
[2019-01-19 17:50:08.846] ERROR  com.mcm.spring.SpringMain                         Failed to initialize Spring context from location: META-INF/spring/application-context.xml org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Failed to import bean definitions from URL location [classpath:/META-INF/spring/amqp.xml]
Offending resource: class path resource [META-INF/spring/application-context.xml]; nested exception is org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 10 in XML document from class path resource [META-INF/spring/amqp.xml] is invalid; nested exception is org.xml.sax.SAXParseException; lineNumber: 10; columnNumber: 91; cvc-complex-type.2.4.c: The matching wildcard is strict, but no declaration can be found for element 'rabbit:connection-factory'.

At first I thought maybe the spring framework was updated so I checked the URLs referenced in the module's .xml definition file, the header of which is provided here:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/rabbit  https://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
>

    <rabbit:connection-factory id="connectionFactory" host="${rabbitmqHost}"
                               username="${rabbitmqUser}" password="${rabbitmqPassword}"/>
...

but the referenced URLs seemed unmodified. What ended up fixing the problem was changing the referenced spring-rabbit-1.0.xsd, so we changed it from:

https://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd

to

http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd

The unsettling part about this problem is that it appears to only affect some of the hosts in our system and not others. And even though I had fixed the underlying problem, I would like to understand what the root cause of the problem is.

EDIT:

If I run the program with -Djavax.net.debug=SSL I see it fails while processing the certificate associated with the springframework.org site.

trigger seeding of SecureRandom
done seeding SecureRandom
Ignoring unavailable cipher suite: TLS_DHE_DSS_WITH_AES_256_GCM_SHA384
Ignoring unavailable cipher suite: TLS_RSA_WITH_AES_256_CBC_SHA
Ignoring unavailable cipher suite: TLS_DHE_RSA_WITH_AES_256_GCM_SHA384
Ignoring unavailable cipher suite: TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA
Ignoring unavailable cipher suite: TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384
Ignoring unavailable cipher suite: TLS_RSA_WITH_AES_256_CBC_SHA256
Ignoring unavailable cipher suite: TLS_DHE_DSS_WITH_AES_256_CBC_SHA
Ignoring unavailable cipher suite: TLS_ECDH_ECDSA_WITH_AES_256_GCM_SHA384
Ignoring unavailable cipher suite: TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384
Ignoring unavailable cipher suite: TLS_RSA_WITH_AES_256_GCM_SHA384
Ignoring unavailable cipher suite: TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384
Ignoring unavailable cipher suite: TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384
Ignoring unavailable cipher suite: TLS_ECDH_RSA_WITH_AES_256_CBC_SHA
Ignoring unavailable cipher suite: TLS_ECDH_RSA_WITH_AES_256_GCM_SHA384
Ignoring unavailable cipher suite: TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384
Ignoring unavailable cipher suite: TLS_DHE_RSA_WITH_AES_256_CBC_SHA256
Ignoring unavailable cipher suite: TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA
Ignoring unavailable cipher suite: TLS_DHE_DSS_WITH_AES_256_CBC_SHA256
Ignoring unavailable cipher suite: TLS_DHE_RSA_WITH_AES_256_CBC_SHA
Ignoring unavailable cipher suite: TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA
Ignoring unavailable cipher suite: TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
Allow unsafe renegotiation: false
Allow legacy hello messages: true
Is initial handshake: true
Is secure renegotiation: false
main, setSoTimeout(0) called
Ignoring unsupported cipher suite: TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 for TLSv1
Ignoring unsupported cipher suite: TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256 for TLSv1
Ignoring unsupported cipher suite: TLS_RSA_WITH_AES_128_CBC_SHA256 for TLSv1
Ignoring unsupported cipher suite: TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256 for TLSv1
Ignoring unsupported cipher suite: TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256 for TLSv1
Ignoring unsupported cipher suite: TLS_DHE_RSA_WITH_AES_128_CBC_SHA256 for TLSv1
Ignoring unsupported cipher suite: TLS_DHE_DSS_WITH_AES_128_CBC_SHA256 for TLSv1
Ignoring unsupported cipher suite: TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 for TLSv1.1
Ignoring unsupported cipher suite: TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256 for TLSv1.1
Ignoring unsupported cipher suite: TLS_RSA_WITH_AES_128_CBC_SHA256 for TLSv1.1
Ignoring unsupported cipher suite: TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256 for TLSv1.1
Ignoring unsupported cipher suite: TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256 for TLSv1.1
Ignoring unsupported cipher suite: TLS_DHE_RSA_WITH_AES_128_CBC_SHA256 for TLSv1.1
Ignoring unsupported cipher suite: TLS_DHE_DSS_WITH_AES_128_CBC_SHA256 for TLSv1.1
%% No cached client session
*** ClientHello, TLSv1.2
RandomCookie:  GMT: 1548179066 bytes = { 222, 245, 173, 48, 133, 200, 171, 99, 126, 94, 203, 91, 117, 191, 77, 23, 240, 62, 47, 106, 76, 44, 254, 213, 202, 252, 233, 120 }
Session ID:  {}
Cipher Suites: [TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256, TLS_RSA_WITH_AES_128_CBC_SHA256, TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256, TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256, TLS_DHE_RSA_WITH_AES_128_CBC_SHA256, TLS_DHE_DSS_WITH_AES_128_CBC_SHA256, TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_128_CBC_SHA, TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA, TLS_ECDH_RSA_WITH_AES_128_CBC_SHA, TLS_DHE_RSA_WITH_AES_128_CBC_SHA, TLS_DHE_DSS_WITH_AES_128_CBC_SHA, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256, TLS_DHE_RSA_WITH_AES_128_GCM_SHA256, TLS_DHE_DSS_WITH_AES_128_GCM_SHA256, TLS_ECDHE_ECDSA_WITH_3DES_EDE_CBC_SHA, TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA, SSL_RSA_WITH_3DES_EDE_CBC_SHA, TLS_ECDH_ECDSA_WITH_3DES_EDE_CBC_SHA, TLS_ECDH_RSA_WITH_3DES_EDE_CBC_SHA, SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA, SSL_DHE_DSS_WITH_3DES_EDE_CBC_SHA, TLS_ECDHE_ECDSA_WITH_RC4_128_SHA, TLS_ECDHE_RSA_WITH_RC4_128_SHA, SSL_RSA_WITH_RC4_128_SHA, TLS_ECDH_ECDSA_WITH_RC4_128_SHA, TLS_ECDH_RSA_WITH_RC4_128_SHA, SSL_RSA_WITH_RC4_128_MD5, TLS_EMPTY_RENEGOTIATION_INFO_SCSV]
Compression Methods:  { 0 }
Extension elliptic_curves, curve names: {secp256r1, sect163k1, sect163r2, secp192r1, secp224r1, sect233k1, sect233r1, sect283k1, sect283r1, secp384r1, sect409k1, sect409r1, secp521r1, sect571k1, sect571r1, secp160k1, secp160r1, secp160r2, sect163r1, secp192k1, sect193r1, sect193r2, secp224k1, sect239k1, secp256k1}
Extension ec_point_formats, formats: [uncompressed]
Extension signature_algorithms, signature_algorithms: SHA512withECDSA, SHA512withRSA, SHA384withECDSA, SHA384withRSA, SHA256withECDSA, SHA256withRSA, SHA224withECDSA, SHA224withRSA, SHA1withECDSA, SHA1withRSA, SHA1withDSA, MD5withRSA
Extension server_name, server_name: [type=host_name (0), value=www.springframework.org]
***
main, WRITE: TLSv1.2 Handshake, length = 239
main, READ: TLSv1.2 Handshake, length = 87
*** ServerHello, TLSv1.2
RandomCookie:  GMT: 1548179066 bytes = { 15, 226, 52, 211, 193, 89, 176, 195, 88, 215, 84, 65, 175, 26, 247, 122, 240, 146, 131, 10, 113, 181, 82, 4, 58, 133, 166, 64 }
Session ID:  {234, 226, 58, 64, 56, 115, 122, 212, 167, 56, 115, 117, 62, 243, 183, 68, 35, 203, 216, 169, 109, 121, 123, 165, 61, 176, 173, 151, 66, 226, 234, 185}
Cipher Suite: TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256
Compression Method: 0
Extension renegotiation_info, renegotiated_connection: <empty>
Extension ec_point_formats, formats: [uncompressed]
***
%% Initialized:  [Session-1, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256]
** TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256
main, READ: TLSv1.2 Handshake, length = 2244
*** Certificate chain
chain [0] = [
[
  Version: V3
  Subject: CN=ssl385749.cloudflaressl.com, OU=PositiveSSL Multi-Domain, OU=Domain Control Validated
  Signature Algorithm: SHA256withECDSA, OID = 1.2.840.10045.4.3.2

  Key:  Sun EC public key, 256 bits
  public x coord: 66837509185678763302911536120437768078260771437122799932673434543093661334104
  public y coord: 99553009056106338955190616002251795226782944025455477626102184647747111863758
  parameters: secp256r1 [NIST P-256, X9.62 prime256v1] (1.2.840.10045.3.1.7)
  Validity: [From: Fri Jan 18 19:00:00 EST 2019,
               To: Sun Jul 28 19:59:59 EDT 2019]
  Issuer: CN=COMODO ECC Domain Validation Secure Server CA 2, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
  SerialNumber: [    c63a11c6 0ff58b5a b3b40da5 48daf711]

Certificate Extensions: 10
[1]: ObjectId: 1.3.6.1.4.1.11129.2.4.2 Criticality=false
Extension unknown: DER encoded OCTET string =
0000: 04 81 F5 04 81 F2 00 F0   00 76 00 BB D9 DF BC 1F  .........v......
0010: 8A 71 B5 93 94 23 97 AA   92 7B 47 38 57 95 0A AB  .q...#....G8W...
0020: 52 E8 1A 90 96 64 36 8E   1E D1 85 00 00 01 68 65  R....d6.......he
0030: B7 E2 E3 00 00 04 03 00   47 30 45 02 20 0C F3 98  ........G0E. ...
0040: A7 86 90 18 E7 C7 4D 58   06 0E 9C 08 5E 58 85 FD  ......MX....^X..
0050: 1C BF 0C 74 25 6F 1C 7A   D2 5B 3B F4 6A 02 21 00  ...t%o.z.[;.j.!.
0060: F0 DD 45 D0 F3 58 67 34   C7 58 45 2D 77 5F 48 69  ..E..Xg4.XE-w_Hi
0070: 45 46 72 53 E6 B0 B6 9C   64 A6 BE 69 C1 87 AA F6  EFrS....d..i....
0080: 00 76 00 74 7E DA 83 31   AD 33 10 91 21 9C CE 25  .v.t...1.3..!..%
0090: 4F 42 70 C2 BF FD 5E 42   20 08 C6 37 35 79 E6 10  OBp...^B ..75y..
00A0: 7B CC 56 00 00 01 68 65   B7 E3 3C 00 00 04 03 00  ..V...he..<.....
00B0: 47 30 45 02 20 5F 56 94   7E E7 60 93 CF 4A 46 DF  G0E. _V...`..JF.
00C0: F7 FE 6D E4 28 88 CA 35   59 45 33 EC 99 38 D7 71  ..m.(..5YE3..8.q
00D0: BC 3F 54 0B DB 02 21 00   8A 12 AD AA CC 83 8F 46  .?T...!........F
00E0: B4 4C 3A 7F BD 85 05 E6   92 FE 4F F5 AC 5B AA 80  .L:.......O..[..
00F0: BE E9 FE D5 D7 B0 DB A1                            ........


[2]: ObjectId: 1.3.6.1.5.5.7.1.1 Criticality=false
AuthorityInfoAccess [
  [
   accessMethod: caIssuers
   accessLocation: URIName: http://crt.comodoca4.com/COMODOECCDomainValidationSecureServerCA2.crt
, 
   accessMethod: ocsp
   accessLocation: URIName: http://ocsp.comodoca4.com
]
]

[3]: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 40 09 61 67 F0 BC 83 71   4F DE 12 08 2C 6F D4 D4  @.ag...qO...,o..
0010: 2B 76 3D 96                                        +v=.
]
]

[4]: ObjectId: 2.5.29.19 Criticality=true
BasicConstraints:[
  CA:false
  PathLen: undefined
]

[5]: ObjectId: 2.5.29.31 Criticality=false
CRLDistributionPoints [
  [DistributionPoint:
     [URIName: http://crl.comodoca4.com/COMODOECCDomainValidationSecureServerCA2.crl]
]]

[6]: ObjectId: 2.5.29.32 Criticality=false
CertificatePolicies [
  [CertificatePolicyId: [1.3.6.1.4.1.6449.1.2.2.7]
[PolicyQualifierInfo: [
  qualifierID: 1.3.6.1.5.5.7.2.1
  qualifier: 0000: 16 1D 68 74 74 70 73 3A   2F 2F 73 65 63 75 72 65  ..https://secure
0010: 2E 63 6F 6D 6F 64 6F 2E   63 6F 6D 2F 43 50 53     .comodo.com/CPS

]]  ]
  [CertificatePolicyId: [2.23.140.1.2.1]
[]  ]
]

[7]: ObjectId: 2.5.29.37 Criticality=false
ExtendedKeyUsages [
  serverAuth
  clientAuth
]

[8]: ObjectId: 2.5.29.15 Criticality=true
KeyUsage [
  DigitalSignature
]

[9]: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
  DNSName: ssl385749.cloudflaressl.com
  DNSName: *.springframework.org
  DNSName: springframework.org
]

[10]: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 62 C4 60 99 6C BD 49 62   A0 23 61 F1 FF 8E 3D 19  b.`.l.Ib.#a...=.
0010: 68 DB 1D 1C                                        h...
]
]

]
  Algorithm: [SHA256withECDSA]
  Signature:
0000: 30 44 02 20 20 B3 4F C7   83 7F BA 2D F5 C1 C6 09  0D.  .O....-....
0010: A2 2A 32 C0 CF 3D B1 F8   9D FF 32 A5 A0 35 B8 FF  .*2..=....2..5..
0020: 0B D9 5D B9 02 20 28 52   41 C1 80 02 7F 4B 43 24  ..].. (RA....KC$
0030: 45 21 35 FD BE D9 8C 13   73 AF 98 0E DC 67 C4 5E  E!5.....s....g.^
0040: B7 D3 3A BE E7 E6                                  ..:...

]
chain [1] = [
[
  Version: V3
  Subject: CN=COMODO ECC Domain Validation Secure Server CA 2, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
  Signature Algorithm: SHA384withECDSA, OID = 1.2.840.10045.4.3.3

  Key:  Sun EC public key, 256 bits
  public x coord: 1003745160476881206339073530943807232389873597117160669404019647835895530218
  public y coord: 112735960696801970978259026239805217413696993678636841464359769702732092974253
  parameters: secp256r1 [NIST P-256, X9.62 prime256v1] (1.2.840.10045.3.1.7)
  Validity: [From: Wed Sep 24 20:00:00 EDT 2014,
               To: Mon Sep 24 19:59:59 EDT 2029]
  Issuer: CN=COMODO ECC Certification Authority, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
  SerialNumber: [    5b25ce69 07c42655 66d3390c 99a954ad]

Certificate Extensions: 8
[1]: ObjectId: 1.3.6.1.5.5.7.1.1 Criticality=false
AuthorityInfoAccess [
  [
   accessMethod: caIssuers
   accessLocation: URIName: http://crt.comodoca.com/COMODOECCAddTrustCA.crt
, 
   accessMethod: ocsp
   accessLocation: URIName: http://ocsp.comodoca4.com
]
]

[2]: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 75 71 A7 19 48 19 BC 9D   9D EA 41 47 DF 94 C4 48  uq..H.....AG...H
0010: 77 99 D3 79                                        w..y
]
]

[3]: ObjectId: 2.5.29.19 Criticality=true
BasicConstraints:[
  CA:true
  PathLen:0
]

[4]: ObjectId: 2.5.29.31 Criticality=false
CRLDistributionPoints [
  [DistributionPoint:
     [URIName: http://crl.comodoca.com/COMODOECCCertificationAuthority.crl]
]]

[5]: ObjectId: 2.5.29.32 Criticality=false
CertificatePolicies [
  [CertificatePolicyId: [2.5.29.32.0]
[]  ]
  [CertificatePolicyId: [2.23.140.1.2.1]
[]  ]
]

[6]: ObjectId: 2.5.29.37 Criticality=false
ExtendedKeyUsages [
  serverAuth
  clientAuth
]

[7]: ObjectId: 2.5.29.15 Criticality=true
KeyUsage [
  DigitalSignature
  Key_CertSign
  Crl_Sign
]

[8]: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 40 09 61 67 F0 BC 83 71   4F DE 12 08 2C 6F D4 D4  @.ag...qO...,o..
0010: 2B 76 3D 96                                        +v=.
]
]

]
  Algorithm: [SHA384withECDSA]
  Signature:
0000: 30 65 02 31 00 AC 68 47   25 80 13 4F 13 56 C0 A2  0e.1..hG%..O.V..
0010: 37 09 97 5A 50 C4 E7 ED   B4 61 CB 28 8A 0A 11 32  7..ZP....a.(...2
0020: A6 E2 71 DF 11 01 89 6F   07 7A 20 66 6B 18 D0 B9  ..q....o.z fk...
0030: 2E 43 F7 52 6F 02 30 12   85 7C 8E 13 66 92 04 BA  .C.Ro.0.....f...
0040: 9A 45 09 94 4A 30 61 D1   49 DC 6F EB E7 2D C9 89  .E..J0a.I.o..-..
0050: CF 1E 6A 7C EC 85 CE 30   25 59 BA 81 70 34 B8 34  ..j....0%Y..p4.4
0060: 7F E7 01 D1 E2 CB 52                               ......R

]
***
%% Invalidated:  [Session-1, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256]
main, SEND TLSv1.2 ALERT:  fatal, description = certificate_unknown
main, WRITE: TLSv1.2 Alert, length = 2
main, called closeSocket()
main, handling exception: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

Answer:

It was the old Java version, which was unable to validate the site's latest SSL certificate.
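
A likely reason the http vs. https change mattered: Spring resolves namespace XSDs offline through META-INF/spring.schemas mappings shipped in its jars, keyed by the exact schema URL. If the URL in xsi:schemaLocation has no matching key, the XML parser falls back to downloading the schema, and on an old JRE that download can fail the TLS handshake as shown in the debug output, which would also explain why only some hosts were affected. The sketch below imitates that lookup with a plain Properties object. The mapping entry, the resource path org/example/..., and the class name SchemaLookup are illustrative assumptions, not the contents of any particular spring-rabbit jar, and this is not Spring's own resolver code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class SchemaLookup {

    // Illustrative mapping in spring.schemas style: schema URL -> classpath resource.
    // The ':' in the key is escaped because ':' separates key and value in .properties files.
    private static final String MAPPINGS =
            "http\\://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd="
            + "org/example/spring-rabbit-1.0.xsd\n";

    // Returns the local resource for a schema URL, or null if no mapping exists
    // (in which case a parser would have to fetch the schema over the network).
    public static String resolveLocally(String schemaUrl) {
        Properties schemas = new Properties();
        try {
            schemas.load(new StringReader(MAPPINGS));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return schemas.getProperty(schemaUrl);
    }

    public static void main(String[] args) {
        String http = "http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd";
        String https = "https://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd";
        System.out.println("http  -> " + resolveLocally(http));
        System.out.println("https -> " + resolveLocally(https));
    }
}
```

With only an http key present, the https URL finds no local mapping, so the lookup is exact-match on the scheme as well as the path.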

Question:

I am trying to use the RabbitMQ Web STOMP plugin with Spring Boot. I have started the RabbitMQ server with port 15674 exposed for the http/web-stomp protocol. When I run the Spring Boot project, I get the error below:

o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection failure in session system: Transport failure: java.lang.IllegalArgumentException: No enum constant org.springframework.messaging.simp.stomp.StompCommand.HTTP/1.1 400 Bad Request

io.netty.handler.codec.DecoderException: java.lang.IllegalArgumentException: No enum constant org.springframework.messaging.simp.stomp.StompCommand.HTTP/1.1 400 Bad Request

Below are my pom.xml dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.2.2.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>0.8.2.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.0.33.Final</version>
    </dependency>
</dependencies>

I am using the class below as the WebSocket configuration:

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

   @Override
   public void configureMessageBroker(MessageBrokerRegistry registry) {
       registry.setApplicationDestinationPrefixes("/app")
               .enableStompBrokerRelay("/topic")
               .setRelayHost("localhost")
               .setRelayPort(15674)
               .setClientLogin("guest")
               .setClientPasscode("guest");
   }

   @Override
   public void registerStompEndpoints(StompEndpointRegistry registry) {
       registry.addEndpoint("/websocket").withSockJS();

   }
}

Below is a snapshot from my RabbitMQ Web plugin which shows the exposed ports

Can anyone help with this?


Answer:

Your relay port is wrong. Look at your plugin config in that screenshot: the STOMP port is 61613, which is also exactly the default in StompBrokerRelayRegistration:

public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {

    private String relayHost = "127.0.0.1";

    private int relayPort = 61613;

    private String clientLogin = "guest";

    private String clientPasscode = "guest";

    private String systemLogin = "guest";

    private String systemPasscode = "guest";

I'm not sure why you decided to use the http/web-stomp plugin for your application: https://www.rabbitmq.com/web-stomp.html

What the relay needs here is the STOMP broker itself. The Spring application acts as a WebSocket proxy in front of it. The RabbitMQ Web STOMP plugin is meant for WebSocket clients that connect to the broker directly; it is not meant for a server-side relay to a STOMP broker.
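
The odd-looking exception in the question follows from this mismatch: port 15674 speaks HTTP/WebSocket, so when the relay sends a raw STOMP frame the broker answers with an HTTP status line, and the STOMP decoder tries to read that line as a command name. The sketch below reproduces that failure mode with a hypothetical Command enum standing in for Spring's StompCommand. It only illustrates the enum lookup failing and is not Spring's actual decoder.

```java
public class StompCommandLookup {

    // Hypothetical stand-in for a STOMP command enum (a subset of server frames).
    enum Command { CONNECTED, MESSAGE, RECEIPT, ERROR }

    // Treats the first line of a frame as a command name.
    // Returns a success marker, or the exception message if the lookup fails.
    public static String parseFirstLine(String firstLine) {
        try {
            return "parsed " + Command.valueOf(firstLine);
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A real STOMP reply starts with a command such as CONNECTED.
        System.out.println(parseFirstLine("CONNECTED"));
        // An HTTP endpoint replies with a status line instead, which is not a command.
        System.out.println(parseFirstLine("HTTP/1.1 400 Bad Request"));
    }
}
```

Pointing the relay at the plain STOMP port (61613 by default) avoids the HTTP reply altogether.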

Question:

I've detected many "Broken Pipe" or "Connection refused" errors in my application using RabbitMQ Java Driver amqp-client version 5.1.2. RabbitMQ Server version is 3.7.3.

To throttle the incoming messages, I just put a Thread.sleep(2000) into the DefaultConsumer, with autoAck=false. Sure, I could set channel.basicQos(..), but I need to throttle on more than just "how many messages do I currently have".

However, this pseudo code leads to java.net.SocketException:

// register new connection
// register new channel / consumer for receiving messages which waits for 2 seconds on each handleDelivery
// sleep for 60 seconds (main-thread) and let the consumer do its job
// register new channel (for writing)

And another case would be that after sleeping several minutes in handleDelivery, a connection is reset / broken pipe / etc. But let's focus on the first case with the pseudo code.

This leads to the following stacktrace when creating the second channel, which also uses the first connection.

java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177)
    at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:559)
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:127)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:447)
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:429)
    at com.rabbitmq.client.impl.AMQChannel.quiescingRpc(AMQChannel.java:346)
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:337)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:277)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
    at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
    at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176)
    at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:542)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:108)
    at myapp.rabbitmq.RabbitMQPool.registerChannel(RabbitMQPool.java:232)
    at myapp.rabbitmq.RabbitMQPool.registerChannel(RabbitMQPool.java:200)
    at myapp.rabbitmq.RabbitMQPool.registerWriteOnlyChannel(RabbitMQPool.java:185)
    at myapp.rabbitmq.RabbitMQPool.registerWriteOnlyChannel(RabbitMQPool.java:181)
    at myapp.MyMainClass.start(MyMainClass.java:110)
    at myapp.MyMainClass.main(MyMainClass.java:46)

However, when I don't wait 2 seconds in each handleDelivery, the second channel is created just fine. Why?

These are the parameters for creating a new connection:

ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setHost("xxx.xxx.xxx.xxx");
factory.setUsername("user");
factory.setPassword("pass");
factory.setAutomaticRecoveryEnabled(true);
factory.setConnectionTimeout(0);
Connection connection = factory.newConnection();

And the code that builds the channels:

if (!connection.isOpen()) {
    // this fires at the second time when first channel consumes with `Thread.sleep(2000)`
}
channel = connection.createChannel();
channel.queueDeclare(channelName, true, false, false, null);
if (consumer != null) { // only the first time
    DefaultConsumer queueingConsumer = new DefaultConsumer(channel) {
        @Override
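        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            try {
                Thread.sleep(2000); // simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }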
        ... some more @Overrides
    };
    channel.basicConsume(channelName, autoAck, queueingConsumer);
}

The RabbitMQ Server logs don't show any errors, and no other errors are thrown. RabbitMQ is connected via LAN, and I reproduced the error dozens of times in both configurations with the same outcome.

The problems began after I upgraded the driver from version 3.5.6 to 5.1.2 and RabbitMQ Server from 3.5.6 to 3.7.3 (on a fresh OS, too). I thought the cause might be that QueueingConsumer was deprecated and that I now have to use DefaultConsumer.


Answer:

I think I found the solution.

Pausing the consumer thread with Thread.sleep(...) doesn't seem to be a good idea.

When I use basicCancel(...) on the channel, everything works well: no channel is closed and no exception is raised.
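
One way to avoid sleeping inside the delivery callback is to hand the slow work to a separate thread pool so the callback returns quickly. The following is a minimal sketch using only JDK classes, not the fix the answer describes. SlowWorkOffloader and onDelivery are hypothetical names, and the 200 ms sleep stands in for real processing. In a real consumer, onDelivery would be called from handleDelivery.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the RabbitMQ client.
class SlowWorkOffloader {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final AtomicInteger processed = new AtomicInteger();

    // Intended to be called from handleDelivery(...); queues the work and returns immediately.
    void onDelivery(byte[] body) {
        String message = new String(body, StandardCharsets.UTF_8);
        workers.submit(() -> process(message));
    }

    // Runs on a worker thread, so sleeping here does not block the caller of onDelivery.
    private void process(String message) {
        try {
            Thread.sleep(200); // stands in for slow processing of `message`
            processed.incrementAndGet();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int processedCount() {
        return processed.get();
    }

    // Stops accepting new work and waits for queued work; true if everything finished in time.
    boolean shutdown(long timeoutMillis) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With manual acknowledgements, the ack would have to move into the worker as well. How that interacts with the channel is not covered by this sketch.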

Question:

I am currently running a basic RabbitMQ topic publisher that publishes every 3 seconds.

My class looks like this:

import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    @Scheduled(fixedRate = 3000)
    public void publish(String[] argv)
                  throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

I expect that every time the publish method runs, it publishes and then closes both the channel and the connection, so that a new channel and connection don't get stuck in memory every 3 seconds.

However, when I look at my RabbitMQ admin interface (at the overview page), the Global Counts section shows that both the total number of connections and channels keep on increasing.

Eventually, my application crashes due to Socket limit and Memory limit being reached.

So it seems that close() doesn't release the channel and connection; they stay in memory and eventually consume all of it. What is the proper method to call on the channel and connection to make sure this doesn't happen?


Answer:

"However, when I look at my RabbitMQ admin interface (at the overview page), the Global Counts section shows that both the total number of connections and channels keep on increasing."

I think you have some problem with your connections.

Check your TCP connections in your OS, for example:

1. netstat -anp | grep :5672 | grep ESTABLISHED | wc -l

Also check your connections using the command line tool:

2. rabbitmqctl list_connections

If you see a lot of connections in 1 and 2, it means that you are not closing the connections/channels in the right way.

If you need to handle a lot of connections, you could increase the file descriptor limit, as described for example at https://www.rabbitmq.com/install-debian.html:

With systemd (Recent Linux Distributions)

On distributions that use systemd, the OS limits are controlled via a configuration file at /etc/systemd/system/rabbitmq-server.service.d/limits.conf, for example:

[Service]
LimitNOFILE=300000

EDIT

The code you posted is fine; close() is the right way to close the connection.

You should check whether your real code actually executes it.

Also check whether the log contains entries like these:

=INFO REPORT==== 22-Aug-2017::09:23:28 ===
connection <0.383.0> ([::1]:60590 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'

=INFO REPORT==== 22-Aug-2017::09:23:37 ===
closing AMQP connection <0.383.0> ([::1]:60590 -> [::1]:5672, vhost: '/', user: 'guest')

The "closing AMQP connection" entry shows that the connection was closed.
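
One thing worth checking in the real code: if anything throws between opening the connection and the close() calls, both close() calls are skipped. try-with-resources avoids that. The sketch below uses stand-in classes (FakeResource and PublishOnce are hypothetical) so it runs without a broker. Recent versions of the Java client appear to declare Connection and Channel as closeable resources, but verify that for your version before relying on it.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Connection/Channel so the sketch runs without a broker.
class FakeResource implements AutoCloseable {
    private final String name;
    private final List<String> log;

    FakeResource(String name, List<String> log) {
        this.name = name;
        this.log = log;
        log.add("open " + name);
    }

    @Override
    public void close() {
        log.add("close " + name);
    }
}

class PublishOnce {
    // Opens both resources, optionally fails while "publishing", and returns the event log.
    static List<String> run(boolean failDuringPublish) {
        List<String> log = new ArrayList<>();
        try (FakeResource connection = new FakeResource("connection", log);
             FakeResource channel = new FakeResource("channel", log)) {
            if (failDuringPublish) {
                throw new IllegalStateException("publish failed");
            }
            log.add("published");
        } catch (IllegalStateException e) {
            // Both resources are already closed by the time this block runs.
            log.add("error: " + e.getMessage());
        }
        return log;
    }
}
```

Resources are closed in reverse order of creation (channel first, then connection), whether or not the body throws.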

Question:

I have a problem when the RabbitMQ client attempts to recover a connection after a network interface failure. I create a connection to RMQ and emulate a network interface failure (sudo ifdown enp0s3 on CentOS).

After the requested heartbeat timeout I get an UnknownHostException. That is expected, because the RMQ address is not in my /etc/hosts.

But when I bring the network interface back up (sudo ifup enp0s3), I keep getting this exception again and again. The connection doesn't recover after the network interface failure.

Java version: 1.8.0_60
amqp-client: 3.5.6

Code:

    ConnectionFactory factory = new ConnectionFactory();
    Connection conn = null;
    Channel channel = null;

    factory.setUsername(USERNAME);
    factory.setPassword(PASSWORD);
    factory.setVirtualHost(VIRTUAL_HOST);
    factory.setHost(HOST);
    factory.setPort(PORT);
    factory.setRequestedHeartbeat(4);
    // auto-recovery
    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(5000);

    try {
        conn = factory.newConnection();
        channel = conn.createChannel();
    } catch (Exception e) {
        e.printStackTrace();
        return;
    }

    while (true) {
        try {
            Thread.sleep(5000);
            System.out.println(String.format("Connection is opened: %s", conn.isOpen()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Exception:

Caught an exception during connection recovery!

java.net.UnknownHostException: rmq.dev
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at com.rabbitmq.client.impl.FrameHandlerFactory.create(FrameHandlerFactory.java:32)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:34)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:476)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:444)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:53)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.java:383)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:576)
    at java.lang.Thread.run(Thread.java:745)
Connection is opened: false, channel is opened: false

Can anybody help me? Why can't Java resolve the hostname after a network failure?


Answer:

The recovery problem is fixed by periodically resolving the host with InetAddress.getByName(HOST) outside amqp-client. But I don't understand why this resolution doesn't work inside the InetSocketAddress constructor. Any ideas? Is there a better solution?
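
The periodic resolution described above could be sketched roughly as follows. This is only an illustration of the workaround, not an explanation of why recovery fails. HostResolver and resolveWithRetry are hypothetical names, and the attempt count and delay are arbitrary.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;

// Hypothetical helper, not part of amqp-client.
class HostResolver {
    // Calls InetAddress.getByName up to `attempts` times, pausing between failed tries.
    // Returns an empty Optional if the host never resolves.
    static Optional<InetAddress> resolveWithRetry(String host, int attempts, long delayMillis) {
        for (int i = 0; i < attempts; i++) {
            try {
                return Optional.of(InetAddress.getByName(host));
            } catch (UnknownHostException e) {
                if (i == attempts - 1) {
                    break; // no point in sleeping after the last attempt
                }
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return Optional.empty();
    }
}
```

A caller might invoke HostResolver.resolveWithRetry(HOST, 5, 1000) from a scheduled task and log whether the result is present.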

Question:

In my Java program, messages are sent over RabbitMQ queues as shown below:

if (!con.isConnected()) {
    log.error("Not connected !!!");
    return false;
}
con.getChannel().basicPublish("", queueName, MessageProperties.PERSISTENT_BASIC, bytes);

  1. I deleted the queues via the RabbitMQ management GUI plugin.
  2. I tried to send a message to a deleted queue.

Result: the queues were deleted from the RabbitMQ GUI, but when I try to send a message to a deleted queue, the connection is still alive (con.isConnected() == true). I need a way to detect that a queue has been deleted, so that I don't send any messages to it.

Note: After deleting the queue, I do not restart RabbitMQ.

Channel creation:

 channel = connection.createChannel();
 channel.queueDeclare(prop.getQueueName(), true, false, false, null);

Example code for channel, queue, and exchange creation:

    ConnectionFactory cf = new ConnectionFactory();
    cf.setUsername("guest");
    cf.setPassword("guest");
    cf.setHost("localhost");
    cf.setPort(5672);
    cf.setAutomaticRecoveryEnabled(true);
    cf.setConnectionTimeout(10000);
    cf.setNetworkRecoveryInterval(10000);
    cf.setTopologyRecoveryEnabled(true);
    cf.setRequestedHeartbeat(5);
    Connection connection = cf.newConnection();

    channel = connection.createChannel();
    channel.queueDeclare("test", true, false, false, null);
    channel.exchangeDeclare("testExchange", "direct",true);
    channel.queueBind("test", "testExchange", "testRoutingKey");

    connection.addShutdownListener(new ShutdownListener() {

        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            System.out.println("test"+cause);
        }
    });

Sending a message:

    channel.basicPublish("testExchange", "testRoutingKey", null, messageBodyBytes);

Answer:

From the RabbitMQ Google group:

Messages in AMQP 0-9-1 are not published to queues; they are published to exchanges, from where they are routed to a queue (or another exchange) or not. [1] basic.publish is a completely asynchronous protocol method by design: there is no response for it unless you ask for it [2]. Messages that are unroutable can be returned to the publisher if you define a return listener and publish with the mandatory flag set to true. Note that publisher confirms and the mandatory flag/returns are orthogonal and one does not imply the other.

Defining a return listener and setting the mandatory flag to true solved my problem. If a message is not routed, I can catch it with the ReturnListener and add it to my persisted queue, to be sent again when the system becomes active.
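
The "catch and resend later" part could look roughly like the sketch below. It is an in-memory stand-in for the persisted queue and uses only JDK classes. ReturnedMessageBuffer, onReturn, and retry are hypothetical names. The assumption is that onReturn would be called from the return listener, and that the publisher callback wraps the actual publish call and reports whether it succeeded.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiPredicate;

// Hypothetical in-memory buffer for messages the broker returned as unroutable.
class ReturnedMessageBuffer {
    private static final class Pending {
        final String routingKey;
        final byte[] body;

        Pending(String routingKey, byte[] body) {
            this.routingKey = routingKey;
            this.body = body;
        }
    }

    private final Deque<Pending> pending = new ArrayDeque<>();

    // Intended to be called from the return listener for each returned message.
    synchronized void onReturn(String routingKey, byte[] body) {
        pending.addLast(new Pending(routingKey, body.clone()));
    }

    synchronized int size() {
        return pending.size();
    }

    // Re-sends buffered messages in order and stops at the first failure.
    // Returns the number of messages the publisher accepted.
    synchronized int retry(BiPredicate<String, byte[]> publisher) {
        int sent = 0;
        while (!pending.isEmpty()) {
            Pending next = pending.peekFirst();
            if (!publisher.test(next.routingKey, next.body)) {
                break;
            }
            pending.removeFirst();
            sent++;
        }
        return sent;
    }
}
```

Messages stay in the buffer until the publisher accepts them, so a failed retry loses nothing. A real implementation would need durable storage to survive a restart.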

Question:

I've looked at the tutorial for RabbitMQ RPC. The client and server basically do the same thing. Assume I have 2 or more computers that want to consume from a queue. I wonder how they know about each other if I just pass my own network hostname/IP to the ConnectionFactory. Does this example work on different machines? (I cannot test it because of config issues.)


Answer:

Yes, it works on different machines. If you want to connect from a different computer just pass a different Host to your ConnectionFactory.

If you can't connect from a different machine due to configuration problems, perhaps you are using the guest:guest user, which can only connect from localhost? https://www.rabbitmq.com/access-control.html

Question:

I have created a simple console application that listens to messages on RabbitMQ. It works fine, no problems there. But how do I close the connection? I've been googling around a lot, but I didn't find any clear answers. The closest I got to an answer is this SO question: What is the best way to safely end a java application with running RabbitMQ consumers, but the answer omits the most important part: the code!

Here is my code:

package com.company;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

interface ObjectRecievedListener {
    void objectRecieved(Object obj);
}

public class RabbitMQReceiver extends RabbitMQBase
{

    ArrayList<DefaultConsumer> consumers = new ArrayList<>();

    private List<ObjectRecievedListener> listeners = new ArrayList<>();
    private final Connection connection;
    private final Channel channel;

    public RabbitMQReceiver(RabbitMQProperties properties) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(properties.getHost());
        factory.setPassword(properties.getPassword());
        factory.setUsername(properties.getLogin());

        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(properties.getInboundQueueName(), true, false, false, null);
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");


                try {
                    doWork(message);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        consumers.add((DefaultConsumer) consumer);

        boolean autoAck = false;
        channel.basicConsume(properties.getInboundQueueName(), autoAck, consumer);
    }

    public void addListener(ObjectRecievedListener listener) {
        listeners.add(listener);
    }

    public void stop() {
        try {
            channel.close();
            connection.close();
        } catch (Exception e) {
        }
    }

    private void doWork(String message) {
        Object obj = getObjectFromXML(message);

        for (ObjectRecievedListener l : listeners)
            l.objectRecieved(obj);
        stop();
    }
}

But it doesn't stop just because I called stop().

So in short: how do I close the connection to RabbitMQ?


Answer:

The RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.


When you say "it doesn't stop" have you confirmed that the stop() method is actually called? Is your code hanging on one of the close() methods?

The channel instance will have a basicCancel method that you could call to cancel the current consumer before closing the channel and connection. To be honest, closing the channel will cancel your consumer so I doubt this is the root cause of the issue.