Hot questions for using Transmission Control Protocol in SocketChannel

Question:

We use java NIO on client side to communicate with a server. What happens in case of TCP retransmission:

will we hang on write operation till we get ACK or will we return immediately?

(I understand that IO is async and we are getting responses asynchronously, but what about ACKs?)


Answer:

It does not wait for the ACK.

A TCP write is complete from the API's point of view when the data has been completely transferred into the sender's socket send buffer. What happens after that is completely asynchronous to, and undetectable by, the sending application, other than via close() with a positive LINGER timeout in blocking mode.
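
A minimal sketch of this behaviour, assuming a loopback connection whose accepting side never reads. The class name, method name, and the 64 MiB cap are hypothetical choices made for illustration. In non-blocking mode each write() returns as soon as the bytes have been copied into the socket send buffer, and returns 0 once the buffers are full. The return value reports bytes buffered locally and says nothing about what the peer has acknowledged or consumed.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class WriteReturnsEarlyDemo {

    // Arbitrary safety cap (an assumption) so the loop always terminates.
    private static final long MAX_BYTES = 64L * 1024 * 1024;

    /** Writes to a peer that never reads and returns how many bytes write() accepted. */
    static long bytesAcceptedWithoutReader() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) { // accepted, but never read from
                client.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(8192);
                long total = 0;
                while (total < MAX_BYTES) {
                    chunk.clear();               // offer a full chunk of zero bytes
                    int n = client.write(chunk); // returns immediately in non-blocking mode
                    if (n == 0) {
                        break;                   // buffers are full, nothing more accepted
                    }
                    total += n;
                }
                return total;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Buffered " + bytesAcceptedWithoutReader() + " bytes with no reader");
    }
}
```

The exact byte count depends on the operating system's buffer sizes, so it will differ between machines.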

Question:

I have a functioning client-server apparatus which can successfully connect and send messages to each other using NIO.

Right now my only confusion is how I'm supposed to continue reading when socketChannel.read() returns zero.

I have a protocol that sends the first 4 bytes as the number of incoming bytes to expect. Even with that amount, I'm running into a potential issue.

However, during testing there are times when I might read something like:

5 // Read 5 bytes when calling socketChannel.read()
0 // Read 0 bytes when calling socketChannel.read() immediately after

When I hit the zero, I assumed that I was done reading and need to wait for more data to come to me.

When I do this, however, OP_READ doesn't seem to be triggered when I call selectNow() again later on. I checked the key and it has its readyOps() and interestOps() set to 1 (which is OP_READ), but the selector does not recognize that it's time to read again.

I found that if I continue looping to read, I might get something like:

5 // socketChannel.read()
0 // socketChannel.read()
7 // socketChannel.read() (Done since I have all my bytes)
0
0
0
...

I'm confused here because this means one of:

  • There is no data there, so the 0 available is legitimate, but then when the rest of the data comes in, the selector refuses to return the key with selectNow()

  • The data is all there, but for some reason returns 0 on reading.

Am I supposed to re-register the channel after a selectNow() returns it as an active key? (Though I didn't have to between switching from OP_CONNECT to OP_READ... so I'm guessing not). I feel like blindly circling in a loop is dangerous and will waste processing cycles.

Am I just supposed to keep polling them? That makes me confused at when OP_READ actually fires then.


Answer:

This was due to an error on my part, where I did not call .clear() on the bytebuffer that reads. This causes it to return 0 read even though the data has streamed in.

This example may also be of use to people who want to see how a simple client works (though with really bad exception handling). There is no guarantee this will work properly and may likely have issues since it was designed to be a quick and dirty test.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Test {

    public static final int PORT = 22222;

    public static void main(String[] args) throws IOException {
        Thread s = new Thread(new Server());
        Thread c = new Thread(new Client());
        s.start();
        c.start();
    }
}

class Client implements Runnable {

    public Selector selector;

    public SocketChannel sc;

    public Client() throws IOException {
        selector = Selector.open();
    }

    @Override
    public void run() {
        try {
            sc = SocketChannel.open();
            sc.socket().setTcpNoDelay(true);
            sc.configureBlocking(false);
            SelectionKey k = sc.register(selector, SelectionKey.OP_CONNECT);
            boolean firstConnect = sc.connect(new InetSocketAddress("localhost", Test.PORT));
            if (firstConnect) {
                System.out.println("Connected on first connect, de-registering OP_CONNECT");
                k.interestOps(SelectionKey.OP_READ);
            }

            while (true) {
                int keys = selector.selectNow();
                if (keys > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isConnectable()) {
                            boolean finishConnectResult = sc.finishConnect();
                            key.interestOps(SelectionKey.OP_READ);
                            System.out.println("Finished connection: " + finishConnectResult);
                        }

                        if (key.isReadable()) {
                            ByteBuffer bb = ByteBuffer.allocate(2);
                            int bytesRead = 0;
                            while ((bytesRead = sc.read(bb)) > 0) {
                                bb.flip();
                                System.out.println(bytesRead + " bytes read");
                                System.out.println(bb.get() + ", " + bb.get());
                                bb.clear(); // Required: without clear() the buffer has no space left, so the next read() returns 0.
                            }
                            System.out.println("Last bytes read value = " + bytesRead);
                            System.exit(0);
                        }
                    }
                }

                Thread.sleep(5);
            }
        } catch (Exception e) { 
            e.printStackTrace();
            throw new RuntimeException(e); // keep the original exception as the cause
        }
    }
}

class Server implements Runnable {

    public Selector selector;

    public SocketChannel sc;

    public Server() throws IOException {
        selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(Test.PORT));
        ssc.register(selector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void run() {
        boolean notSentData = true;
        try {
            while (true) {
                int keys = selector.selectNow();
                if (keys > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isAcceptable()) {
                            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                            sc = ssc.accept();
                            if (sc != null) {
                                sc.configureBlocking(false);
                                sc.socket().setTcpNoDelay(true); // Required in my application
                                sc.register(selector, SelectionKey.OP_WRITE);
                                System.out.println("Server accepted connection");
                            } else {
                                System.out.println("Got null connection");
                            }
                        }
                    }
                }

                if (sc != null && notSentData) {
                    ByteBuffer bb = ByteBuffer.allocate(4);
                    bb.put(new byte[]{ 1, 2, 3, -1});
                    bb.flip();
                    int wrote = sc.write(bb);
                    System.out.println("Wrote " + wrote + " bytes");
                    notSentData = false;
                }

                Thread.sleep(5);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e); // keep the original exception as the cause
        }
    }
}
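
The effect of the missing clear() can be reproduced without a network. The following is a minimal sketch, assuming an in-memory channel built with Channels.newChannel stands in for the SocketChannel; the class and method names are hypothetical. It records the return value of each read() on a 2-byte buffer fed with the same 4 bytes the server above sends.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

public class BufferClearDemo {

    /** Returns the value of every read() call until one returns 0 or -1. */
    static List<Integer> readCounts(boolean clearAfterUse) throws IOException {
        byte[] payload = {1, 2, 3, -1};
        List<Integer> counts = new ArrayList<>();
        try (ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(payload))) {
            ByteBuffer bb = ByteBuffer.allocate(2);
            int n;
            do {
                n = ch.read(bb);
                counts.add(n);
                if (n > 0) {
                    bb.flip();                 // switch to draining mode
                    while (bb.hasRemaining()) {
                        bb.get();              // consume what was read
                    }
                    if (clearAfterUse) {
                        bb.clear();            // make the whole buffer writable again
                    }
                }
            } while (n > 0);
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readCounts(false)); // prints [2, 0]
        System.out.println(readCounts(true));  // prints [2, 2, -1]
    }
}
```

Without clear(), position equals limit after the buffer is drained, so the second read() has no room and returns 0 even though two bytes are still waiting. With clear(), all four bytes are read and the final -1 marks the end of the in-memory stream.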

Question:

I am using one SocketChannel in 2 threads, one thread for sending the data and another for receiving the data.

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(ip,port));
socketChannel.configureBlocking(false);

Thread 1: uses the above socketchannel to write the data

Thread 2: uses the same socketchannel to read the data

I am not using any selectors with the socketchannel as I need the write and read to be asynchronous (using 2 different threads)

PROBLEM: When the connection is lost, the socketchannel.write() and socketchannel.read() operation does not throw any error. It just blocks the operation.

I need to detect the connection loss.

I tried using the heartbeat method in Thread 2 but because the read operation just blocks, this method did not work. Is there any other way to detect the connection loss without using the heartbeat in a new Thread?

Is it possible to throw error while writing/reading if there is connection loss?

Thanks in advance.

EDIT:

Thread 1:

public void run() {
  socketChannel = SendAndReceivePacketUtil.createConnection(ip, port);
  socketChannel.configureBlocking(false);

  RecTask task = new RecTask(socketChannel);
  Thread recThread = new Thread(task);
  recThread.start();

  while(true)
  {
     byte[] data= getDataFromQueue(ip);
     if(data!= null) {
         //print(new String(data));
         sendPacket(data, socketChannel);
     }
   }
}

Thread 2: (RecTask)

public void run() {
  while(true) {
    byte[] data = receivePacket(socketChannel);
    //print(new String(data));
  }
}

Both Thread 1 & 2 have try-catch-finally blocks. finally closes the socketchannel.

sendPacket:

int dataSent = 0;
while (dataSent < data.length) {
    long n = socketChannel.write(buf);
        if (n < 0) {
            throw new Exception();
        }
        dataSent += (int) n;
 }

receivePacket:

int dataRec = 0;
byte[] data = new byte[length];
ByteBuffer buffer = ByteBuffer.wrap(data);

while (dataRec < length) {
    long n = socketChannel.read(buffer);
    if (n < 0) {
        throw new Exception();
    }
    dataRec += (int) n;
}       
return data;

I send and receive data continuously. But as soon as the connection is lost, nothing prints and the code just gets stuck. It's an Android Wi-Fi Direct application. To simulate connection loss I just switch off the Wi-Fi module.


Answer:

"I am not using any selectors with the socketchannel as I need the write and read to be asynchronous (using 2 different threads)"

That's not a reason to avoid a Selector. In fact it's rather difficult to write correct non-blocking NIO code without a Selector.

"PROBLEM: When the connection is lost, the socketchannel.write() and socketchannel.read() operation does not throw any error. It just blocks the operation."

No, it doesn't. You're in non-blocking mode. It either returns a positive integer, or zero, or throws an exception. Which is it?

"I tried using the heartbeat method in Thread 2 but because the read operation just blocks, this method did not work."

The read operation does not block in non-blocking mode.

"Is there any other way to detect the connection loss without using the heartbeat in a new Thread?"

The only reliable way to detect connection loss in TCP is to write to the connection. Eventually this will throw IOException: connection reset. But it won't happen the first time after the connection loss, due to buffering, retries, etc.

"Is it possible to throw error while writing/reading if there is connection loss?"

That's what happens.

There is something seriously wrong with this question. Either the code you posted isn't the real code or it isn't behaving as you described. You need to post more of it, e.g. your read and write code.
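
A minimal sketch of how the possible results of a non-blocking read() could be told apart, assuming a loopback connection where the accepting side closes immediately. The class, enum, and method names are hypothetical. Besides a positive count, zero, or an exception, read() returns -1 when the peer has closed the connection in an orderly way, which the receivePacket loop in the question treats the same as an error.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ReadOutcomeDemo {

    enum Outcome { DATA, NO_DATA_YET, PEER_CLOSED, FAILED }

    /** Performs one read() and classifies the result. */
    static Outcome readOnce(SocketChannel ch, ByteBuffer buf) {
        try {
            int n = ch.read(buf);
            if (n > 0) {
                return Outcome.DATA;        // bytes were transferred into buf
            }
            if (n == 0) {
                return Outcome.NO_DATA_YET; // nothing available right now (or buf is full)
            }
            return Outcome.PEER_CLOSED;     // -1: end of stream
        } catch (IOException e) {
            return Outcome.FAILED;          // e.g. connection reset or timed out
        }
    }

    /** The server side closes at once; the client waits for readiness and reads. */
    static Outcome demo() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                server.accept().close();    // orderly close by the peer
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_READ);
                selector.select(5000);      // wait up to 5 s for the close to arrive
                return readOnce(client, ByteBuffer.allocate(16));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

A loop that spins on NO_DATA_YET without a Selector burns CPU and never sees a silently dropped link, since that only surfaces once a write has failed.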

Question:

The client connects to the server and then enters the following loop. It takes one line of input from the console, sends it to the server. If the input line is a single ‘x’ character, the client exits.

The server

At first, it prints out its own socket address to the screen. When it receives a connection, it prints the peer’s socket address to the screen (so you can check who connected to the server), then enters the following loop. It receives a line of message from the client, prints the received message to the screen.

Non-blocking mode

The client should use buffers and a non-blocking socket channel.

Blocking mode

The server should use buffers and a blocking socket channel.

The issue

The issue that I am having is that the buffer, in the server side, is not flushing and is printing the old input with the new input. Example:

Message received: hello

Message received: helloworld

Message received: helloworldthis

Message received: helloworldthisis

Message received: helloworldthisis Edwin

And it should be printing: hello

world

this

is

Edwin.

This is the server source code:

public class EchoServer_1 
{
    //  private static CharBuffer buffer = CharBuffer.allocate(1024);
    private static ByteBuffer buffer = ByteBuffer.allocate(1024);
    private static StringBuffer reqString = new StringBuffer();
    public static void main(String[] args) throws IOException
    {

        ServerSocketChannel serverSocketChannel = null;
        int serverPort = 10007;

        try{
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress (serverPort));
            //Blocking mode
            serverSocketChannel.configureBlocking(true);

            System.out.println("Port number: " + serverPort);
            System.out.println("Waiting for connection ..");

            SocketChannel sc = serverSocketChannel.accept();

            System.out.println ("Incoming connection from: " 
                    + sc.socket().getRemoteSocketAddress( ));

            // Read message from client
            while (true)
            {   
                buffer.clear();
                sc.read(buffer);
                buffer.flip();
                while(buffer.hasRemaining()) {
                    char c = (char) buffer.get();
                    if (c == '\r' || c == '\n') break;
                    reqString.append(c);             
                }

                log("Message received: " + reqString);

            }
        }catch (IOException ex)
        {
            if (serverSocketChannel != null)
                serverSocketChannel.close();
            System.err.println("Client has disconnected");
        }
    }
    private static void log(String str) {
        System.out.println(str);
    }
}

This is the client source code:

public class EchoClient_1 
{

    private static Scanner stdIn = new Scanner(System.in);
    //  private static CharBuffer buffer = CharBuffer.allocate(1024);
    private static ByteBuffer buffer = ByteBuffer.allocate(256);

    public static void main(String[] args) throws IOException
    {
        String hostname = new String ("127.0.0.1");
        int port = 10007;

        try{
            InetSocketAddress address = new InetSocketAddress(hostname, port);
            SocketChannel sC = SocketChannel.open(address);
            sC.configureBlocking(false);

            log("Local address: " + hostname + " connecting to Server on port " + port
                    + "...");
            String userInput;

            System.out.println("Enter lines of characters: " );

            while ((userInput = stdIn.nextLine()) != null)
            {
                //Checks if user wants to exit
                if (userInput.equalsIgnoreCase("x")) 
                {

                    buffer.put (userInput.getBytes());
                    buffer.flip();
                    sC.write(buffer);
                    buffer.clear();
                    System.out.println("Closing ..");
                    sC.close();
                    break;
                }

                buffer.put (userInput.getBytes());
                buffer.flip();  
                sC.write(buffer);
                buffer.clear();                             
            }

            sC.close();

        }catch (IOException ex)
        {
        }
    }

    private static void log(String str) {
        System.out.println(str);
    }
}

Answer:

You are never clearing reqString. You should do that after you log it.
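
A minimal sketch of that fix, assuming each array in the input list stands in for the bytes delivered by one sc.read(buffer) call. The class and method names are hypothetical, and StringBuilder replaces the StringBuffer from the question. The accumulator is reset with setLength(0) right after the message is recorded.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MessageAccumulatorDemo {

    /** Turns each simulated read into one message, resetting the accumulator every time. */
    static List<String> collect(List<byte[]> reads) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder reqString = new StringBuilder();
        List<String> messages = new ArrayList<>();
        for (byte[] chunk : reads) {
            buffer.clear();
            buffer.put(chunk);            // stands in for sc.read(buffer)
            buffer.flip();
            while (buffer.hasRemaining()) {
                char c = (char) buffer.get();
                if (c == '\r' || c == '\n') {
                    break;
                }
                reqString.append(c);
            }
            messages.add(reqString.toString()); // the original logs the message here
            reqString.setLength(0);             // the fix: start the next message empty
        }
        return messages;
    }

    public static void main(String[] args) {
        List<byte[]> reads = Arrays.asList(
                "hello".getBytes(StandardCharsets.US_ASCII),
                "world".getBytes(StandardCharsets.US_ASCII),
                "this".getBytes(StandardCharsets.US_ASCII));
        System.out.println(collect(reads)); // prints [hello, world, this]
    }
}
```

Like the original server, this treats every read as exactly one message. TCP does not guarantee that, so a real protocol would need a delimiter or a length prefix.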

Question:

I'm still learning NIO and came up with a simple Ping/Pong application. My client is written in NIO. The server is the telnet-endpoint of Memcached. The client is constantly issuing stats\r\n and the server responds with some statistics.

This setup is working in a non-blocking way with SocketChannel and Selector.select()

The part that I do not fully understand is the error case where the connection between client and server gets into a limbo state or the server goes away without closing the connection (during testing I simulate that with an iptables DROP rule):

I end up (on OSX and Linux) with the following stacktrace on the read operation after a successful select() operation:

java.io.IOException: Operation timed out
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_73]
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_73]
  at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_73]
  at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_73]
  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_73]

The code in use looks something like this:

    Selector selector = Selector.open();
    SocketChannel memcachedClient = SocketChannel.open(MEMCACHED_HOST);
    memcachedClient.configureBlocking(false);

    int ops = memcachedClient.validOps();
    memcachedClient.register(selector, ops, null);

    while (true) {
        LOGGER.info("Masterloop - Start");
        //selector.select();
        selector.select(1 * 1000);
        LOGGER.debug("Masterloop - After select");

        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> selectedKeysIterator = selectedKeys.iterator();
        LOGGER.debug("Found Selectors {}", selectedKeys.size());

        while (selectedKeysIterator.hasNext()) {
            SelectionKey key = selectedKeysIterator.next();

            LOGGER.info("Key: w={} r={} v={} - {}",
                    new Object[] { key.isWritable(), key.isReadable(), key.isValid(), key.toString() });

            if (key.isReadable()) {
                read(key);
            } else if (key.isWritable()) {
                write(key);
            }

            selectedKeysIterator.remove();
        }
    }

What is strange to me are the two observed behaviors depending on the usage of selector.select() and selector.select(timeout):

Selector.select() blocks and fails

Without a timeout, the Selector.select() call blocks for ~50 seconds after I let all packets DROP (or unplug the network cable) and comes back with a single key:

2016-07-12 11:51:55 INFO  App:88 - Masterloop - Start
2016-07-12 11:52:49 DEBUG App:92 - Masterloop - After select
2016-07-12 11:52:49 DEBUG App:96 - Found Selectors 1
2016-07-12 11:52:49 INFO  App:101 - Key: w=false r=true v=true - sun.nio.ch.SelectionKeyImpl@2e817b38

and I end up with the stacktrace above in my read() method:

private static void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    ByteBuffer readBuffer = ByteBuffer.allocate(8192);

    int numRead;
    try {
        numRead = socketChannel.read(readBuffer); // Exception is thrown here
    } catch (IOException e) {
        LOGGER.warn(e.getMessage(), e);
        key.cancel();
        socketChannel.close();
        return;
    }
    // do something with readBuffer
    key.interestOps(SelectionKey.OP_WRITE);
}

Selector.select(timeout) returns but throws an IOException later, too

With a timeout (e.g. 1000 ms), the Selector.select(1000) call returns with zero keys after I let all packets DROP (or unplug the network cable). That happens constantly as my while-loop continues to run. After ~45 seconds the then-active loop iteration returns a single readable key, which again ends with an IOException on read():

2016-07-12 11:54:24 INFO  App:113 - ================
2016-07-12 11:54:24 INFO  App:88 - Masterloop - Start
2016-07-12 11:54:25 DEBUG App:92 - Masterloop - After select
2016-07-12 11:54:25 DEBUG App:96 - Found Selectors 0
2016-07-12 11:54:25 INFO  App:113 - ================
2016-07-12 11:54:25 INFO  App:88 - Masterloop - Start
2016-07-12 11:54:26 DEBUG App:92 - Masterloop - After select
2016-07-12 11:54:26 DEBUG App:96 - Found Selectors 0
2016-07-12 11:54:26 INFO  App:113 - ================
...
2016-07-12 11:55:16 INFO  App:113 - ================
2016-07-12 11:55:16 INFO  App:88 - Masterloop - Start
2016-07-12 11:55:17 DEBUG App:92 - Masterloop - After select
2016-07-12 11:55:17 DEBUG App:96 - Found Selectors 0
2016-07-12 11:55:17 INFO  App:113 - ================
2016-07-12 11:55:17 INFO  App:88 - Masterloop - Start
2016-07-12 11:55:18 DEBUG App:92 - Masterloop - After select
2016-07-12 11:55:18 DEBUG App:96 - Found Selectors 1
2016-07-12 11:55:18 INFO  App:101 - Key: w=false r=true v=true - sun.nio.ch.SelectionKeyImpl@2e817b38
2016-07-12 11:55:18 WARN  App:30 - Operation timed out
java.io.IOException: Operation timed out
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_73]
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_73]
  at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_73]
  at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_73]
  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_73]

Question

I somewhat understand the behavior in general: It is impossible to judge if the server went away or the connection is still active without actively probing it.

Looking at Wireshark I see a bunch of retransmissions (which makes sense as the server never ACKed my client's request), and after the 8th retransmission the IOException happens. But how can I influence this timeout? Is that only on the TCP level? If so, which settings (kernel?) control it? I played with some settings but they seem to work only together with KEEP_ALIVE. Can't I get feedback from NIO earlier (say after 10 seconds) that something went wrong?

Thanks!


Answer:

  1. The operation that has timed out is a prior send. This is delivered to the read() immediately, not after a timeout.
  2. You aren't distinguishing between select success (returns > 0) and select timeout (returns zero).
  3. When you get (1), the select will have returned > 0 with the affected channel marked as readable: however what is there to be read is not data but the error. (It can also be end of stream, if the peer has disconnected.)
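
The points above can be sketched as follows. This is a minimal illustration, assuming a java.nio.channels.Pipe stands in for the socket connection so that it runs without a network; the class and method names are hypothetical. It checks the return value of select(timeout) before touching the selected keys, and treats a readable key as something that may deliver data, end of stream, or a pending error.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

public class SelectResultDemo {

    /** One pass of a select loop; returns a short description of what happened. */
    static String selectOnce(Selector selector, long timeoutMillis) throws IOException {
        int ready = selector.select(timeoutMillis);
        if (ready == 0) {
            return "timeout"; // no key became ready (timeout or wakeup): nothing to process
        }
        StringBuilder result = new StringBuilder();
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isValid() && key.isReadable()) {
                ReadableByteChannel ch = (ReadableByteChannel) key.channel();
                try {
                    int n = ch.read(ByteBuffer.allocate(8192));
                    result.append(n < 0 ? "eof" : "read " + n);
                } catch (IOException e) {
                    // "Readable" may mean a pending error, such as a timed-out earlier send.
                    key.cancel();
                    ch.close();
                    result.append("error");
                }
            }
        }
        return result.toString();
    }

    static String[] demo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);
            String idle = selectOnce(selector, 100);   // nothing written yet
            sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
            String ready = selectOnce(selector, 1000); // data is now available
            return new String[] {idle, ready};
        }
    }

    public static void main(String[] args) throws IOException {
        String[] r = demo();
        System.out.println(r[0] + " / " + r[1]);
    }
}
```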

Question:

I am working on a client for an Android device running Android 4.4.2 and I have found some strange behavior from it. Most likely it is waiting for more data before building a packet, but I do not want that, so I am trying to set the option TCP_NODELAY. However, I cannot find any such function or variable.

How do I set the SocketChannel to be in no delay mode? This is my current code for creating the socket channel.

sockChannel = SocketChannel.open(new InetSocketAddress(socket_ip_, socket_port_));

I have tried sockChannel.socket().setTcpNoDelay(true);, but I have noticed no change. Am I on the right track? Am I missing something?

Thank you for your help.


Answer:

If anyone finds this, I had the correct solution.

sockChannel.socket().setTcpNoDelay(true); worked. The reason I didn't see any change in behaviour was that someone had renamed the app and had forgotten to document the change, so I was starting the old program every time.

To access the socket options and functionality you can request the socket by writing .socket().
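
A minimal sketch of setting and verifying the option, assuming an unconnected channel is enough for the demonstration; the class and method names are hypothetical. The option is set through socket() and read back both through the socket and through SocketChannel.getOption with StandardSocketOptions.TCP_NODELAY. The getOption/setOption methods exist on SocketChannel since Java 7; on Android they may require a newer API level than the 4.4.2 device in the question, so treat that path as an assumption there.

```java
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

public class NoDelayDemo {

    /** Enables TCP_NODELAY via socket() and reads it back in two ways. */
    static boolean[] enableNoDelay() throws IOException {
        try (SocketChannel ch = SocketChannel.open()) {
            // Note the method name: setTcpNoDelay, not setTCPNoDelay.
            ch.socket().setTcpNoDelay(true);

            boolean viaSocket = ch.socket().getTcpNoDelay();
            // Same option, read through the channel API (Java 7+).
            boolean viaChannel = ch.getOption(StandardSocketOptions.TCP_NODELAY);
            return new boolean[] {viaSocket, viaChannel};
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] r = enableNoDelay();
        System.out.println("socket(): " + r[0] + ", getOption: " + r[1]);
    }
}
```

Reading the option back is a quick way to confirm that the running build actually contains the change.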