Hot questions for using Transmission Control Protocol (TCP) in Akka

Question:

I tried to extend the TCP example from the docs into a simple chat room: several clients connect to a server, one client sends a string, and the server broadcasts that string to all clients. My code is shown below. It doesn't work; can someone tell me why?

The program behaves incorrectly: with 2 clients connected to the server, when one sends a message, the sender gets 2 messages back and the other client gets nothing.

Main.java

public class Main {
  public static void main(String[] args) {
    akka.Main.main(new String[] { Server.class.getName()});
  }
}

Server.java

public class Server extends UntypedActor {

  final ActorRef manager = Tcp.get(getContext().system()).manager();

  public static Props props(ActorRef manager) {
    return Props.create(Server.class, manager);
  }

  @Override
  public void preStart() throws Exception {
    final ActorRef tcp = Tcp.get(getContext().system()).manager();
    tcp.tell(TcpMessage.bind(getSelf(), new InetSocketAddress("0.0.0.0", 8888), 100), getSelf());
  }

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof Bound) {
      manager.tell(msg, getSelf());

    } else if (msg instanceof CommandFailed) {
      getContext().stop(getSelf());

    } else if (msg instanceof Connected) {
      final Connected conn = (Connected) msg;
      manager.tell(conn, getSelf());
      final ActorRef handler = getContext().actorOf(Props.create(SimplisticHandler.class));
      getContext().system().eventStream().subscribe(handler, Notification.class);
      getSender().tell(TcpMessage.register(handler), getSelf());
    }
  }

}

SimplisticHandler.java

public class SimplisticHandler extends UntypedActor {
  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof Received) {

      final ByteString data = ((Received) msg).data();
      System.out.println(data);
      getContext().system().eventStream().publish(new Notification(getSender(), getSelf(), 1, data));

    } else if (msg instanceof ConnectionClosed) {

      getContext().stop(getSelf());

    } else if (msg instanceof Notification) {

      Notification noti = (Notification)msg;
      // TODO why doesn't the statement below broadcast?
      if (noti.id == 1) 
        noti.sender.tell(TcpMessage.write((ByteString)(noti.obj)), getSelf());

    }
  }
}

Notification.java

public class Notification {
    public final ActorRef sender;
    public final ActorRef receiver;
    public final int id;
    public final Object obj;

    public Notification(ActorRef sender, ActorRef receiver, int id, Object obj) {
      this.sender = sender;
      this.receiver = receiver;
      this.id = id;
      this.obj = obj;
    }
}

Answer:

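You set Notification.sender to the TCP connection actor of the client that sent the message. Every SimplisticHandler is subscribed to the event stream, so each one receives the Notification and writes the data to that same connection actor: with 2 clients, the sender gets the message twice and the other client gets nothing.

What you need to do is pass the connection actor (getSender() in the Connected branch of Server) to SimplisticHandler when you create it. Each SimplisticHandler should then always write to its own TCP connection actor instead of noti.sender.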
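
The routing difference can be reproduced without Akka. The following is a minimal plain-Java model, not the Akka implementation: Connection, Note, Handler and deliveries are hypothetical stand-ins for the TCP connection actor, Notification, SimplisticHandler and the event stream, introduced only to show why the sender receives every copy and what changes once each handler writes to its own connection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the chat room's routing; no Akka involved.
// All names here are hypothetical stand-ins, not Akka API.
class BroadcastModel {

  /** Stand-in for one client's TCP connection actor: records what is written to it. */
  static final class Connection {
    final List<String> written = new ArrayList<>();
    void write(String data) { written.add(data); }
  }

  /** Stand-in for Notification: the data plus the connection it arrived on. */
  static final class Note {
    final Connection sender;
    final String data;
    Note(Connection sender, String data) { this.sender = sender; this.data = data; }
  }

  /** Stand-in for SimplisticHandler; one instance per connected client. */
  static final class Handler {
    final Connection own;      // the connection this handler was created for
    final boolean writeToOwn;  // false reproduces the behaviour from the question
    Handler(Connection own, boolean writeToOwn) {
      this.own = own;
      this.writeToOwn = writeToOwn;
    }

    void onNotification(Note note) {
      if (writeToOwn) {
        own.write(note.data);          // fix: each handler serves its own client
      } else {
        note.sender.write(note.data);  // bug: each handler replies to the original sender
      }
    }
  }

  /** Client 0 sends one message; returns how many messages each client receives. */
  static int[] deliveries(int clients, boolean writeToOwn) {
    List<Handler> handlers = new ArrayList<>();
    for (int i = 0; i < clients; i++) {
      handlers.add(new Handler(new Connection(), writeToOwn));
    }
    Note note = new Note(handlers.get(0).own, "hello");
    for (Handler h : handlers) {
      h.onNotification(note);  // the event stream delivers to every subscriber
    }
    int[] counts = new int[clients];
    for (int i = 0; i < clients; i++) {
      counts[i] = handlers.get(i).own.written.size();
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(deliveries(2, false))); // prints [2, 0]
    System.out.println(Arrays.toString(deliveries(2, true)));  // prints [1, 1]
  }
}
```

In the Akka code, the `own` field would correspond to the connection ActorRef handed to SimplisticHandler at creation time, and the write would be a TcpMessage.write sent to that ref.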

Question:

I connect to a TCP service using akka-tcp. Under highly concurrent requests, the service crashes with this exception:

2018/03/28 13:28:56 +0430 - [INFO] - from application in application-akka.actor.default-dispatcher-30 
CommandFailed(Write(ByteString(123, 34, 116, 121, 112, 101, 34, 58, 34, 110, 101, 119, 79, 114, 100, 101, 114, 34, 44, 34, 109, 101, 115, 115, 97, 103, 101, 73, 100, 34, 58, 34, 49, 99, 52, 54, 55, 50, 49, 97, 45, 102, 101, 51, 97, 45, 52, 100, 50, 48, 45, 57, 100, 98, 102, 45, 102, 98, 50, 56, 101, 97, 56, 52, 100, 48, 55, 56, 34, 44, 34, 97, 99, 99, 111, 117, 110, 116, 78, 117, 109, 98, 101, 114, 34, 58, 34, 49, 54, 56, 56, 48, 49, 49, 53, 55, 50, 53, 32, 32)... and [261] more,NoAck(null))) because of Dropping write because queue is full

I tried increasing the read and write socket buffers on my server according to this article:

https://www.cyberciti.biz/faq/linux-tcp-tuning/

but the problem still exists.


Answer:

The problem was caused by the socket buffer size on the server that hosts the TCP service. After increasing the buffer size on both servers (mine and the one hosting the TCP service), the problem was resolved. See this:

https://www.cyberciti.biz/faq/linux-tcp-tuning/
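
One way to check whether a larger buffer is actually available to a process is to request it on a socket and read back what the operating system reports. This is a minimal sketch using only java.net.Socket, not Akka. The class name SocketBufferCheck and the 4 MiB request are illustrative assumptions. The requested size is only a hint: the operating system may adjust or cap it, which is where system-wide limits of the kind the linked article tunes would come into play.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

// Hypothetical helper for inspecting effective socket buffer sizes.
class SocketBufferCheck {

  /**
   * Requests the given send and receive buffer sizes on an unconnected socket
   * and returns {send, receive} as reported back by the operating system.
   */
  static int[] effectiveBufferSizes(int requestedBytes) {
    try (Socket socket = new Socket()) {  // unconnected; options may be set before connect
      socket.setSendBufferSize(requestedBytes);
      socket.setReceiveBufferSize(requestedBytes);
      return new int[] { socket.getSendBufferSize(), socket.getReceiveBufferSize() };
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int requested = 4 * 1024 * 1024;  // arbitrary example value, not a recommendation
    int[] actual = effectiveBufferSizes(requested);
    System.out.println("requested=" + requested
        + " send=" + actual[0] + " receive=" + actual[1]);
  }
}
```

If the reported values stay well below the requested size, the limit is likely being applied by the operating system rather than by the application. Running the same check on both machines shows whether each side picked up the new settings.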