Hot questions for using RabbitMQ with Spring WebSocket

Question:

I am using the durable subscription feature of the RabbitMQ STOMP plugin. According to the documentation, when a client reconnects (resubscribes) with the same id, it should receive all the queued-up messages. However, I am not getting anything back, even though the messages are queued up on the server side. Below is the code that I am using:

RabbitMQ Version : 3.6.0

Client code:

var sock;
var stomp;
var messageCount = 0;
var stompConnect = function() {

    sock = new SockJS(options.url);
    stomp = Stomp.over(sock);

    stomp.connect({}, function(frame) {
        debug('Connected: ', frame);
        console.log(frame);

        var id = stomp.subscribe('<url>' + options.source + "." + options.type + "." + options.id, function(d) {
            console.log(messageCount);
            messageCount = messageCount + 1;
        }, {'auto-delete' : false, 'persistent' : true , 'id' : 'unique_id', 'ack' : 'client'});
    }, function(err) {
        console.log(err);
        debug('error', err, err.stack);
        setTimeout(stompConnect, 10000); // retry after 10 seconds (setTimeout takes milliseconds)
    });
};

Server Code:

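import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    // host, username and password are fields set elsewhere (not shown in the question)

    @Override
    public void configureMessageBroker(final MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("<endpoint>", "<endpoint>")
                .setRelayHost(host)
                .setSystemLogin(username).setSystemPasscode(password)
                .setClientLogin(username).setClientPasscode(password);
    }

    @Override
    public void registerStompEndpoints(final StompEndpointRegistry registry) {
        registry.addEndpoint("<endpoint>").setAllowedOrigins("*").withSockJS();
    }
}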

Steps I am executing:

  • Run the script on the client side; it sends a subscribe request.
  • A queue is created on the server side (named stomp-subscription-*), all messages are pushed to that queue, and the client is able to stream them.
  • Kill the script, which results in a disconnection. The server logs show that the client has disconnected, and messages start queuing up.
  • Run the script again with the same id. It manages to connect to the server, but no messages are returned. The message count on the queue stays the same (also, the RabbitMQ admin console doesn't show any consumer for that queue).
  • After 10 seconds, the connection is dropped and the following is printed in the client logs:

Whoops! Lost connection to < url >

  • The server shows the same thing (i.e. client disconnected). As shown in the client code, the client tries to re-establish the connection after 10 seconds, and then the same cycle repeats.

I have tried the following things:

  1. Removed the 'ack' : 'client' header. This results in all the messages being drained from the queue, but none of them reaches the client. I had added this header after going through an SO answer.
  2. Added d.ack(); in the callback, before incrementing messageCount. This results in an error on the server side, as it tries to ack the message after the session is closed (due to the disconnection).

Also, in some cases, when I reconnect while the number of queued-up messages is less than 100, I do get all the messages. Once it crosses 100, however, nothing happens (not sure whether this has anything to do with the problem).

I don't know whether the problem exists at server or client end. Any inputs?


Answer:

Finally, I was able to find (and fix) the issue. We are using nginx as a proxy, and it had proxy_buffering set to on (the default value). This is what the nginx documentation says about it:

When buffering is enabled, nginx receives a response from the proxied server as soon as possible, saving it into the buffers set by the proxy_buffer_size and proxy_buffers directives.

Because of this, the messages were being buffered (delayed), which caused the disconnection. Bypassing nginx made the problem go away; we then disabled proxy buffering, and it now seems to work fine even through the nginx proxy.
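For reference, a minimal sketch of what such an nginx location block might look like. The location path /ws/ and the backend address are placeholders, not taken from the original setup. Only proxy_buffering off reflects the fix described above; the Upgrade and Connection headers are the usual settings for proxying WebSocket connections and are included here as an assumption about the rest of the configuration.

```nginx
location /ws/ {
    proxy_pass http://127.0.0.1:8080;   # hypothetical backend address

    # Pass the WebSocket upgrade handshake through to the backend
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";

    # The fix: forward responses as they arrive instead of buffering them
    proxy_buffering off;
}
```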

Question:

I'm sending a message to a channel but have to modify it for each client.

Anyone with experience on how to do that?


Answer:

There you go.

Spring offers interceptors for the inbound and outbound channels. Add an interceptor and you can catch every incoming and outgoing message and change it as needed.

First your config:

...

@Autowired
private InboundMessagesChannelInterceptor inboundMessagesChannelInterceptor;

@Autowired
private OutboundMessagesChannelInterceptor outboundMessagesChannelInterceptor;

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
    registration.interceptors(inboundMessagesChannelInterceptor);
}

@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
    registration.interceptors(outboundMessagesChannelInterceptor);
}

...

and your interceptor:

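import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

@Component
public class OutboundMessagesChannelInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // modify your message as needed
        return message;
    }

}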

That's it.
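To make "modify it for each client" a bit more concrete, here is a rough sketch of the per-client rewrite step on its own, written as plain Java so that it runs without Spring. The class name PerClientRewriter, the {session} placeholder convention, and the choice to key on the session id are all assumptions for illustration, not part of the original answer. Inside preSend, the session id could be read with SimpMessageHeaderAccessor.getSessionId(message.getHeaders()), and the rewritten payload wrapped again with MessageBuilder.createMessage(newPayload, message.getHeaders()).

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: rewrites an outbound payload for one WebSocket session. */
public class PerClientRewriter {

    // Placeholder token the sender puts into the broadcast text (an assumed convention).
    private static final String TOKEN = "{session}";

    /**
     * Returns a copy of the payload with the placeholder replaced by the session id.
     * Payloads on the client outbound channel are usually byte arrays, hence byte[].
     */
    public static byte[] rewrite(byte[] payload, String sessionId) {
        if (payload == null || sessionId == null) {
            return payload; // nothing to rewrite, or no session to rewrite it for
        }
        String text = new String(payload, StandardCharsets.UTF_8);
        return text.replace(TOKEN, sessionId).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] broadcast = "hello {session}".getBytes(StandardCharsets.UTF_8);
        byte[] personal = rewrite(broadcast, "abc123");
        System.out.println(new String(personal, StandardCharsets.UTF_8)); // prints "hello abc123"
    }
}
```

Keeping the rewrite in a small static method like this means the interceptor itself stays a thin adapter, and the per-client logic can be checked without starting a broker.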