Hot questions for using Transmission Control Protocol in ActiveMQ

Question:

I'm using STOMP and ActiveMQ to listen for messages from the LAN and publish them to another application.

For testing, I implemented this with a TCP connection, but I need to use the WebSocket protocol.

My ActiveMQ broker is already configured to use WebSocket; see the configuration below:

<!--
    The transport connectors expose ActiveMQ over a given protocol to
    clients and other brokers. For more information, see:

    http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61623?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

But when I use the ws connection, it does not work for me:

SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd.HH:mm:ss.SSS");

String user = env("ACTIVEMQ_USER", "admin");
String password = env("ACTIVEMQ_PASSWORD", "password");
String host = env("ACTIVEMQ_HOST", "localhost");
int port = Integer.parseInt(env("ACTIVEMQ_PORT", "61623"));
String destination = arg(args, 0, "/topic/event");
String protocol = "ws://";


StompJmsConnectionFactory factory = new StompJmsConnectionFactory();
factory.setBrokerURI(protocol + host + ":" + port);

Connection connection = factory.createConnection(user, password);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = new StompJmsDestination(destination);

MessageConsumer consumer = session.createConsumer(dest);

I looked for examples of a WS connection using the StompJmsConnectionFactory class, but I only found examples that use a TCP connection.

Has anyone already implemented something like this?

Thanks


Answer:

I have used ActiveMQ with STOMP and WebSockets to get data from a browser. The configuration that worked for me is quite similar, except for the following:

  1. In my code I used String protocol = "tcp://";. It is the message broker that communicates over WebSockets (with a browser?). Your Java application communicates with the message broker over TCP.

  2. I used the Apollo message broker engine with this configuration:

    <connector id="tcp" bind="tcp://0.0.0.0:61613" connection_limit="64">
     <detect protocols="openwire stomp" />
    </connector>
    <connector id="ws"  bind="ws://0.0.0.0:61623"  connection_limit="16">
     <detect protocols="stomp" />
    </connector>
    
  3. I called connection.start(); at the end, after the MessageConsumer had been created.
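
Points 1 and 3 can be sketched in plain Java. This is only an illustration under stated assumptions: the class StompBrokerUri and its tcpUri helper are hypothetical names, and port 61613 is the STOMP-over-TCP port that appears in both configurations above. The StompJms calls are shown only as comments, because they need the StompJms library and a running broker.

```java
// Hypothetical helper: builds the broker URI a Java STOMP client would use.
final class StompBrokerUri {
    private StompBrokerUri() {}

    /** Returns a tcp:// URI; the ws:// connector is left to browser clients. */
    static String tcpUri(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return "tcp://" + host + ":" + port;
    }

    public static void main(String[] args) {
        // 61613 is the STOMP/TCP port from the configurations above (assumption:
        // the broker runs on localhost).
        String uri = tcpUri("localhost", 61613);
        System.out.println(uri);

        // With the StompJms library on the classpath, the order from point 3 would be:
        //   factory.setBrokerURI(uri);
        //   Connection connection = factory.createConnection(user, password);
        //   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //   MessageConsumer consumer = session.createConsumer(dest);
        //   connection.start(); // started last, after the consumer exists
    }
}
```

The only change relative to the code in the question is the scheme and port passed to setBrokerURI, plus moving connection.start() to the end.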

Question:

I see the WARN messages below in the active.log file very often. I don't understand exactly what each warning means. Can someone please help me understand them and how to fix them?

1, 2017-06-07 11:11:12,051 | WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: /local/apps/apache-activemq-5.14.3/data only has 5924 mb of usable space. - resetting to maximum available disk space: 5924 mb | org.apache.activemq.broker.BrokerService | main

2, 2017-06-07 11:36:02,358 | WARN | Transport Connection to: tcp://10.235.454.23:59053 failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///10.123.343.345:59053@61616
2017-06-07 11:37:58,441 | WARN | Transport Connection to: tcp://10.123.345.768:46840 failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///10.240.123.234:46840@61616

3, 2017-06-07 11:41:54,376 | WARN | persistent="false", ignoring configured persistenceAdapter: KahaDBPersistenceAdapter[/local/apps/apache-activemq-5.14.3/data/kahadb] | org.apache.activemq.broker.BrokerService | main


Answer:

To resolve the first warning, set the broker's temporary store limit to a value that is actually available on disk. This value is set by the tempUsage limit in the broker's systemUsage configuration, in apache-activemq-5.9.0-bin\apache-activemq-5.9.0\conf\activemq.xml:

<systemUsage>
   <systemUsage>
      <memoryUsage>
          <memoryUsage limit="64 mb"/>
      </memoryUsage>
      <storeUsage>
         <storeUsage limit="100 gb"/>
      </storeUsage>
      <tempUsage>
         <tempUsage limit="50 gb"/>
      </tempUsage>
   </systemUsage>
</systemUsage>

While the broker may perform fine at first in this condition, it can lead to unexpected results such as IOExceptions, rather than triggering Producer Flow Control, when the resources are exhausted. It is recommended to fix this situation by adjusting the usage limits appropriately.

Root Cause

The broker has requested more disk resources than are available. Check the free disk space on the partition where ActiveMQ is located before setting the tempUsage limit.
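
The comparison behind the first warning can be sketched as follows. This is a simplified illustration, not ActiveMQ's own code: the class TempStoreCheck and its methods are hypothetical names, and the numbers come from the log line in the question (51200 mb configured, 5924 mb usable).

```java
import java.io.File;

// Simplified illustration of the check behind the first warning; not ActiveMQ's own code.
final class TempStoreCheck {
    private TempStoreCheck() {}

    /** Returns the limit the broker can actually honour: the smaller of the two values. */
    static long effectiveLimitMb(long configuredMb, long usableMb) {
        return Math.min(configuredMb, usableMb);
    }

    /** Usable space, in MB, of the partition holding the given directory. */
    static long usableMb(File dir) {
        return dir.getUsableSpace() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // Values from the log line: 51200 mb configured, 5924 mb usable.
        System.out.println(effectiveLimitMb(51200, 5924));
        // Pass the broker's data directory as the first argument; "." is only a fallback.
        File dataDir = new File(args.length > 0 ? args[0] : ".");
        System.out.println(usableMb(dataDir) + " mb usable in " + dataDir.getAbsolutePath());
    }
}
```

Running it against the broker's data directory before editing activemq.xml gives an upper bound for the tempUsage limit.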

ActiveMQ provides a mechanism to tune memory usage per destination:

<policyEntry queue=">" producerFlowControl="false" memoryLimit="256mb"/>

The configuration above uses the wildcard character > in the queue name (> recursively matches any destination starting from that name), so it applies to all queues. You can also set a limit for an individual destination.

Per-destination: to set a memory limit on a single destination, define a policy entry for it. For example, the following entry limits the memory used by the FOO.BAR.TEST topic to 5 MB:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="FOO.BAR.TEST" memoryLimit="5MB" />
        </policyEntries>
    </policyMap>
</destinationPolicy>
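
To make the wildcard behaviour mentioned above concrete, here is a minimal sketch of how such patterns select destinations. It is not the broker's implementation: the class DestinationWildcard is a hypothetical name, and the sketch assumes the usual ActiveMQ conventions where "." separates name segments, "*" matches exactly one segment, and ">" matches everything that follows.

```java
// Simplified matcher for ActiveMQ-style destination wildcards; not the broker's implementation.
final class DestinationWildcard {
    private DestinationWildcard() {}

    /**
     * "." separates name segments, "*" matches exactly one segment,
     * ">" matches everything that follows (in this sketch, also nothing at all).
     */
    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true; // recursive wildcard: accept the remainder
            }
            if (i >= d.length) {
                return false; // pattern is longer than the destination
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal segment differs
            }
        }
        return p.length == d.length; // no trailing unmatched segments
    }

    public static void main(String[] args) {
        System.out.println(matches(">", "FOO.BAR.TEST"));
        System.out.println(matches("FOO.>", "FOO.BAR.TEST"));
        System.out.println(matches("FOO.*", "FOO.BAR.TEST"));
    }
}
```

A policy entry with queue=">" therefore applies to every queue, while an entry naming FOO.BAR.TEST applies to that single destination only.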

Question:

I am new to the topic/subscriber model and am trying to use it to publish a message to a topic and receive it in a subscribing client.

When I read about this model, one article said that the connection between the producer and the subscriber is not terminated until the producer or the subscriber goes down. So I used Wireshark to check the connection and see how many packets are transferred between producer and subscriber, and how often. I got the following result.

For my own understanding, can anyone explain what is happening on the network? Are the packets being sent to keep the connection alive? If so, they are sent every 10 seconds; is it possible to increase the interval from 10 seconds to 30 seconds?

The reason I ask is that keeping one client's connection alive costs around 190 bytes every 10 seconds. With around 3500 clients, I think this may cause network issues.

I am using Java 6 with ActiveMQ to explore this.

Thanks in advance.


Answer:

You can play around with the maxInactivityDuration setting on the broker and client transports to increase the timeout to a point where the pings are sent at a period you feel comfortable with. Keep in mind that the longer the inactivity duration, the longer it can take for the broker to notice that a client connection has dropped if the socket does not close in a way that sends a FIN packet.

The client and broker send keep-alive frames to each other only when the connection is idle, not while other message traffic is on the wire.
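
To put the numbers from the question in perspective, here is a rough sketch of the idle traffic for 3500 clients at different ping periods, together with a helper that appends the wireFormat.maxInactivityDuration option (in milliseconds) to a broker URI. The class KeepAliveEstimate, the URI, and the 60000 ms value are illustrative assumptions. The exact relationship between maxInactivityDuration and the ping period is not stated above, so the period is treated as an input.

```java
// Back-of-the-envelope estimate of keep-alive traffic; all names are illustrative.
final class KeepAliveEstimate {
    private KeepAliveEstimate() {}

    /** Aggregate idle traffic in bytes per second across all clients. */
    static double bytesPerSecond(int clients, int bytesPerPing, int intervalSeconds) {
        return (double) clients * bytesPerPing / intervalSeconds;
    }

    /** Appends the wireFormat.maxInactivityDuration option (milliseconds) to a broker URI. */
    static String withMaxInactivity(String brokerUri, long millis) {
        String separator = brokerUri.contains("?") ? "&" : "?";
        return brokerUri + separator + "wireFormat.maxInactivityDuration=" + millis;
    }

    public static void main(String[] args) {
        // 3500 clients, about 190 bytes per keep-alive exchange (figures from the question).
        System.out.println(bytesPerSecond(3500, 190, 10)); // pings every 10 seconds
        System.out.println(bytesPerSecond(3500, 190, 30)); // pings every 30 seconds
        // Hypothetical value; choose one that suits your failure-detection needs.
        System.out.println(withMaxInactivity("tcp://localhost:61616", 60000));
    }
}
```

At a 10-second period the total is about 66.5 KB per second across all clients, and only while connections are idle. When the option is written into activemq.xml rather than a Java string, the & separator must be escaped as &amp;, as in the transport connector URIs earlier on this page.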