Hot questions on using the Transmission Control Protocol in streaming

Question:

I've written an app that streams audio over TCP from a client to a server, but it does not work, i.e. there is no audible output. Could you check my code and tell me what's wrong with it?

Client:

public void startStreaming() {

    Thread streamThread = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                int minBufSize = AudioRecord.getMinBufferSize(sampleRate, channelConfig, audioFormat);
                Log.d("VS", "Buffer initialised, size " + minBufSize);

                // read raw PCM into a byte buffer so it can be written to the stream directly
                byte[] buffer = new byte[minBufSize];

                final InetAddress destination = InetAddress.getByName(target.getText().toString());
                port = Integer.parseInt(target_port.getText().toString());

                Socket socket = new Socket(destination, port);
                DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
                Log.d("VS", "Socket connected");

                if (minBufSize != AudioRecord.ERROR_BAD_VALUE) {
                    recorder = new AudioRecord(MediaRecorder.AudioSource.MIC, sampleRate, channelConfig, audioFormat, minBufSize);
                    Log.d("VS", "Recorder initialized");
                }

                if (recorder.getState() == AudioRecord.STATE_INITIALIZED) {
                    Log.d("VS", "Recorder working");
                    recorder.startRecording();
                }

                while (status) {
                    // read PCM data from the MIC into the buffer
                    int bufferReadResult = recorder.read(buffer, 0, buffer.length);

                    // send only the bytes that were actually read
                    dos.write(buffer, 0, bufferReadResult);
                    dos.flush();
                }

            } catch (UnknownHostException e) {
                e.printStackTrace();
            } catch (IOException e) {
                Log.e("VS", "IOException: " + e.getMessage());
            }
        }
    });
    streamThread.start();
}

Server:

public void startStreaming() {

    Thread streamThread = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                int minBufSize = 1024; // recorder.getMinBufferSize(sampleRate, channelConfig, audioFormat);

                ServerSocket serverSocket = new ServerSocket(50005);
                byte[] buffer = new byte[minBufSize];

                if (minBufSize != AudioRecord.ERROR_BAD_VALUE) {
                    speaker = new AudioTrack(AudioManager.STREAM_MUSIC, sampleRate, channelConfig, audioFormat, minBufSize, AudioTrack.MODE_STREAM);
                    speaker.play();
                    Log.d("VR", "speaker playing...");
                }

                Log.d("VR", "" + status);
                socket = serverSocket.accept();
                DataInputStream dis = new DataInputStream(new BufferedInputStream(socket.getInputStream()));

                while (status) {
                    // read raw PCM bytes from the stream and hand them to the AudioTrack
                    int n = dis.read(buffer, 0, buffer.length);
                    if (n == -1)
                        break;
                    speaker.write(buffer, 0, n);
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });
    streamThread.start();
}

Any help would be appreciated.


Answer:

The answers are in the comments: for both the server and the client, the read()/write() loops had to be implemented properly. In particular, raw PCM must be sent and received as bytes, and each write should use the number of bytes actually read.
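As a sketch of what "implemented better" means here: if the recorder delivers 16-bit samples as a short[], they must be converted to bytes before DataOutputStream.write() will accept them (there is no write(short[]) overload). A minimal little-endian conversion in plain Java, independent of the Android classes above; the names PcmUtil, shortsToBytes and sendChunk are illustrative:

```java
import java.io.DataOutputStream;
import java.io.IOException;

public class PcmUtil {
    // Convert 16-bit PCM samples to little-endian bytes so they can be
    // written to a DataOutputStream (which has no write(short[]) overload).
    static byte[] shortsToBytes(short[] samples, int count) {
        byte[] out = new byte[count * 2];
        for (int i = 0; i < count; i++) {
            out[i * 2]     = (byte) (samples[i] & 0xFF);        // low byte
            out[i * 2 + 1] = (byte) ((samples[i] >> 8) & 0xFF); // high byte
        }
        return out;
    }

    // Example write loop step: only the samples actually read are sent.
    static void sendChunk(DataOutputStream dos, short[] buffer, int samplesRead) throws IOException {
        byte[] bytes = shortsToBytes(buffer, samplesRead);
        dos.write(bytes, 0, bytes.length);
        dos.flush();
    }
}
```

The receiving side then simply reads bytes and passes them to AudioTrack.write() with the count returned by read().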

Question:

I would like a Spring Integration implementation of a stream reader. Another application (outside of Java) sends streams of data (delimited by dollar signs) to port 9999, where this server listens.

First I made sure the stream was streaming by connecting to it with telnet 127.0.0.1 9999.

Then I created a simple Java application with the following method, which is currently working.

public void readStream() throws IOException{
    Scanner s = null;
    try {
        Socket skt = new Socket("localhost", 9999);
        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(
                        skt.getInputStream()));
        s = new Scanner(bufferedReader);
        s.useDelimiter("[$]");
        System.out.println(s);
        while (s.hasNext()) {
            System.out.println("----------------------");
            System.out.println(s.next());
        }
    } finally {
        if (s != null) {
            s.close();
        }
    }
}

Now, I would like to implement this in the Spring Integration framework. I looked at https://github.com/spring-projects/spring-integration-samples/tree/master/basic/tcp-client-server and http://docs.spring.io/autorepo/docs/spring-integration/2.0.0.M3/spring-integration-reference/html/stream.html. However, I am confused about where to start. What is needed to connect to the sending application? (I'm really new to the Spring Framework.)

The difficulty for me lies in the terminology. Should I create a TCP inbound gateway, or a receiving channel adapter? Or is it outbound, because I'm requesting the stream?

EDIT after comments of Gary:

<bean class="org.springframework.integration.ip.tcp.serializer.ByteArraySingleTerminatorSerializer" id="deserializer1">
    <constructor-arg type="byte" value="$"/>
</bean>
<int-ip:tcp-connection-factory id="server" type="server" port="9999"
    deserializer="deserializer1"
/>
<int-ip:tcp-inbound-channel-adapter id="adapter" connection-factory="server" channel="channel1"/>
<int:channel id="channel1" />

Answer:

An inbound gateway is used when the server sends a reply to an inbound request. An inbound channel adapter (<int-ip:tcp-inbound-channel-adapter>) is for one-way integration only: the client sends data and does not receive replies.

You would need a server connection factory, configured to use a ByteArraySingleTerminatorSerializer configured with your $ delimiter, in the deserializer property.

Please use the latest documentation, not the old version that was linked in your question.
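Putting that together with the configuration from the question, a complete one-way server-side setup might look like the following sketch; the streamHandler bean and the channel names are illustrative assumptions, not part of the original question:

```xml
<bean id="deserializer" class="org.springframework.integration.ip.tcp.serializer.ByteArraySingleTerminatorSerializer">
    <constructor-arg type="byte" value="$"/>
</bean>

<int-ip:tcp-connection-factory id="server" type="server" port="9999"
    deserializer="deserializer"/>

<!-- one-way: data in, no reply -->
<int-ip:tcp-inbound-channel-adapter id="adapter"
    connection-factory="server" channel="channel1"/>

<int:channel id="channel1"/>

<!-- hand each $-delimited record to a POJO for processing -->
<int:service-activator input-channel="channel1" ref="streamHandler" method="handle"/>
```

The handle method of the hypothetical streamHandler bean would receive each delimited record as a byte[] payload.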

Question:

Thanks for reading, and sorry in advance for my bad English.

I'm working on webcam streaming with OpenCV. My final goal is a Skype-like application, so I'm trying a basic 1:1 TCP model first.

In the 1:1 TCP model, after connecting, the client sends its real-time webcam frames and the server receives and displays them on its JPanel.

So far I can receive a single picture and display it on the JPanel; now I'm trying to receive successive frames. At first the problem was that the server-side socket seemed to wait until the client's input was finished, i.e. it never stopped waiting, because real-time frames are sent continuously. So I now send each frame's size before sending the frame itself, to escape the endless wait. But it doesn't work well: the client keeps sending frames, but the server doesn't receive them properly. For example, if the client sends frames of around 25k bytes, the server only receives 1 to 3 bytes per read, even though the buffer size is 512.

ClientThread.java

package client;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import javax.imageio.ImageIO;

import video.VideoCap;

public class ClientThread extends Thread
{

    String   serverIp;
    int      serverPort;
    Socket   socket;
    VideoCap videoCap;

    public ClientThread(Socket socket, String serverIp, int serverPort, VideoCap videoCap)
    {
        this.socket = socket;
        this.serverIp = serverIp;
        this.serverPort = serverPort;
        this.videoCap = videoCap;
    }

    public void run()
    {
        while (ClientUI.calling)
        {
            try
            {
                InputStream in = socket.getInputStream();
                DataInputStream dis = new DataInputStream(in);
                OutputStream out = socket.getOutputStream();
                DataOutputStream dos = new DataOutputStream(out);

                // receive
                int bufSize = dis.readInt();
                while (ClientUI.calling)
                {
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    ImageIO.write(videoCap.getOneFrame(), "jpg", baos);
                    InputStream inputImage = new ByteArrayInputStream(baos.toByteArray());

                    // frame size
                    dos.writeInt(baos.size());
                    out(inputImage, baos, bufSize);
                    Thread.sleep(5000);
                }

            }
            catch (IOException | InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }

    void out(InputStream in, OutputStream out, int bufSize)
    {
        long size = 0;
        try
        {
            byte[] buf = new byte[bufSize];
            int n;
            while ((n = in.read(buf)) > 0)
            {
                out.write(buf, 0, n);
                size += n;
                System.out.println("size: " + size);
            }
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }

        finally
        {
            System.out.println(getClass().getName() + " :: out >>> sent size: " + size);
        }
    }
}

ServerThread.java

package server;

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import javax.imageio.ImageIO;
import javax.swing.JPanel;

public class ServerThread extends Thread
{
    ServerSocket serverSocket;
    Socket       socket;
    JPanel       panel;
    byte[]       buf;

    public ServerThread(ServerSocket serverSocket, JPanel panel, int bufSize)
    {
        this.serverSocket = serverSocket;
        this.panel = panel;
        buf = new byte[bufSize];
    }

    public void run()
    {
        try
        {
            System.out.println("waiting for client");
            socket = serverSocket.accept();
            System.out.println("client accepted");
            InputStream in = socket.getInputStream();
            DataInputStream dis = new DataInputStream(in);
            OutputStream out = socket.getOutputStream();
            DataOutputStream dos = new DataOutputStream(out);
            dos.writeInt(buf.length);

            while (ServerUI.calling)
            {
                int frameSize = dis.readInt();
                ByteArrayOutputStream outImage = new ByteArrayOutputStream();
                long size = 0;
                int n;

                while (frameSize >= size)
                {
                    n = dis.read(buf);
                    if (n == -1)
                        break;
                    outImage.write(buf, 0, n);
                    size += n;
                    System.out.println(n);

                }

                InputStream inputImage = new ByteArrayInputStream(outImage.toByteArray());
                BufferedImage bufferedImage = ImageIO.read(inputImage);
                panel.getGraphics().drawImage(bufferedImage, 0, 0, null);
            }
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

}

Answer:

I changed DataOutput/InputStream to ObjectOutput/InputStream. I'm not sure why the original didn't work; I suspect a serialization problem, although raw bytes don't strictly need to be serialized, so I don't know exactly.

Anyhow, here is working code. Because of an AudioServer I split the work into two threads, so the previous code and the code below are quite different.

VideoServerThread.java

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.net.ServerSocket;
import java.net.Socket;
import javax.imageio.ImageIO;
import javax.swing.JPanel;
import common.Frame; // shared Frame class: both sides must (de)serialize the same class

public class VideoServerThread extends Thread
{
    private ServerSocket serverSocket;
    private int          videoServerPort;
    private Socket       socket;
    private JPanel       panel;
    private boolean      calling;

    public VideoServerThread(ServerSocket serverSocket, int videoServerPort, JPanel panel, boolean calling)
    {
        this.serverSocket = serverSocket;
        this.videoServerPort = videoServerPort;
        this.panel = panel;
        this.calling = calling;
    }

    @Override
    public void run()
    {
        System.out.println("Video Server opened!");
        try
        {
            serverSocket = new ServerSocket(videoServerPort);
            socket = serverSocket.accept();
            InputStream in = socket.getInputStream();
            ObjectInputStream ois = new ObjectInputStream(in);
            BufferedImage bufferedImage;
            InputStream inputImage;
            Frame f;
            while (calling)
            {
                f = (Frame) ois.readObject();
                inputImage = new ByteArrayInputStream(f.bytes);
                bufferedImage = ImageIO.read(inputImage);
                // draw once, scaled to the panel
                panel.getGraphics().drawImage(bufferedImage, 0, 0, panel.getWidth(), panel.getHeight(), null);
                bufferedImage.flush();
                inputImage.close();
            }
        }
        catch (IOException | ClassNotFoundException e)
        {
            e.printStackTrace();
        }
    }
}

VideoClientThread.java

import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.Socket;
import javax.imageio.ImageIO;
import common.Frame;
import video.VideoCap;

public class VideoClientThread extends Thread
{
    private final String formatType = "jpg";
    private VideoCap     videoCap;
    private Socket       socket;
    private String       ip;
    private int          port;
    private boolean      calling;

    public VideoClientThread(VideoCap videoCap, Socket socket, String ip, int port, boolean calling)
    {
        this.videoCap = videoCap;
        this.socket = socket;
        this.ip = ip;
        this.port = port;
        this.calling = calling;
    }

    public void run()
    {
        try
        {
            socket = new Socket(ip, port);
            socket.setSoTimeout(5000);
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            Frame f;
            BufferedImage bufferedImage;
            while (calling)
            {
                ByteArrayOutputStream fbaos = new ByteArrayOutputStream();
                bufferedImage = videoCap.getOneFrame();
                ImageIO.write(bufferedImage, formatType, fbaos);
                f = new Frame(fbaos.toByteArray());
                oos.writeObject(f);
                oos.flush();
                bufferedImage.flush();
                // Thread.sleep(33);
            }
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}
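As an aside, the original length-prefixed scheme can also be made to work without object serialization. The usual fix is DataInputStream.readFully(), which blocks until exactly the requested number of bytes has arrived, whereas a plain read() on a TCP stream may return as little as one byte per call. A minimal sketch, with illustrative class and method names:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FrameIO {
    // Write one length-prefixed frame.
    static void writeFrame(DataOutputStream dos, byte[] frame) throws IOException {
        dos.writeInt(frame.length);
        dos.write(frame);
        dos.flush();
    }

    // Read one length-prefixed frame. readFully() loops internally until
    // exactly frameSize bytes have arrived, so short TCP reads are harmless.
    static byte[] readFrame(DataInputStream dis) throws IOException {
        int frameSize = dis.readInt();
        byte[] frame = new byte[frameSize];
        dis.readFully(frame);
        return frame;
    }
}
```

With this framing, each readFrame() call returns exactly one JPEG image, ready for ImageIO.read().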

Question:

I have some raw PCM data for audio in a C# app. I need to get it into a Java application and playing as audio.

As a first step, I want the Java application to play the audio immediately as it is being streamed in (not saving a temporary file and playing that, playing it as it is received).

I am having problems formatting the PCM audio data in C# properly and sending it to the Java application. I am currently just using a simple TCP socket system to do so, calling the C# app (made in Unity) the "client" and the Java app the "server."

On the Unity C# client, I am using NatMic to get the raw PCM data for the Mic. This function is called many times per second as data comes in:

void IAudioProcessor.OnSampleBuffer(float[] sampleBuffer, int sampleRate, int channelCount, long timestamp)

I have a float array and two ints to describe the audio data.

How do I package this data up as bytes to be sent to Java?

I have a TCP set up working where I can send bytes to Java, and it seems to work fine when I test just sending simple string messages back and forth between the client and server.

But when I try to package up the audio and send it over, it never works. I always get an error like this:

javax.sound.sampled.UnsupportedAudioFileException: could not get audio input stream from input stream

Here is a snippet of how I was trying to play the audio on the Java side as a test (when the client connects, I run this once):

    inputStream = client.getInputStream();
    bInputStream = new BufferedInputStream(inputStream);
    Clip clip = AudioSystem.getClip();
    audioStream = AudioSystem.getAudioInputStream(bInputStream);
    clip.open(audioStream);
    clip.start();

How should I be packaging the float[], int, and int to send to the Java server so it will recognize it as an audio stream?


Answer:

First off, you will need to output the data with a SourceDataLine, not a Clip. A Clip cannot start playing until it has received the entire sound file.

The usual plan is to read the incoming data in whatever manner you want, convert it to the proper format, and write it in a loop with the SourceDataLine's write() method.

Something along the following lines should work for incoming data:

while (playerRunning)
{
    readBuffer = getInputPCM();
    audioBytes = convertToFollowAudioFormat(readBuffer);
    sourceDataLine.write(audioBytes, 0, audioBytes.length);
}

If your incoming data is floats, ranging from -1 to 1, and you are outputting in the basic "CD Quality" Java format (44100 fps, 16-bit, stereo, little endian), then the conversion might look like the following:

for (int i = 0, n = buffer.length; i < n; i++)
{
    buffer[i] *= 32767;
    audioBytes[i*2] = (byte) buffer[i];
    audioBytes[i*2 + 1] = (byte)((int)buffer[i] >> 8 );
}
return audioBytes;

Depending on the form of your PCM, you might have to do a bit more or less. For example, if you are reading ints or shorts, ranging from -32767 to 32767, then the multiplication step is not needed.

To get a SourceDataLine outputting "CD Quality" audio format on the default audio line:

AudioFormat audioFormat = new AudioFormat(AudioFormat.Encoding.PCM_SIGNED,
                44100, 16, 2, 4, 44100, false);
DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
SourceDataLine sdl = (SourceDataLine) AudioSystem.getLine(info);
sdl.open(audioFormat);
sdl.start();

As far as your getInputPCM method, I'm assuming you are able to present only PCM data in the stream and there is no need for further parsing. If you want to do the task of converting the PCM to bytes in C# prior to shipping, that is also perfectly valid.

There is more written about various output formats on the Java Tutorial's Sampled Package trail.

For the above, I'm mostly copying/editing code from my github project AudioCue, which is a sort of souped-up Clip.
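Tying the fragments above together, here is a small self-contained sketch of just the float-to-PCM16 conversion, which is the only part that can be shown without a live audio line; the class and method names are illustrative:

```java
public class Pcm16 {
    // Convert floats in [-1.0, 1.0] to 16-bit signed little-endian PCM bytes,
    // matching the "CD quality" AudioFormat shown above.
    static byte[] floatsToPcm16(float[] samples) {
        byte[] out = new byte[samples.length * 2];
        for (int i = 0; i < samples.length; i++) {
            int s = (int) (samples[i] * 32767); // scale to the 16-bit range
            out[i * 2]     = (byte) s;          // low byte first (little endian)
            out[i * 2 + 1] = (byte) (s >> 8);   // high byte
        }
        return out;
    }
}
```

The resulting bytes can then be handed straight to sourceDataLine.write(bytes, 0, bytes.length).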

Question:

I'm trying to make a Verticle that runs a TCP server, which reads XML asynchronously from incoming connections, stores all the XML it receives in a PriorityQueue, and then broadcasts XML messages from the queue.

I'm using a NetServer that reads incoming data from accepted NetSocket connections, but I don't know how to split the stream data into XML messages. If I use the standard handler, which converts Buffer to String, I get cut-off messages. I've managed to get it mostly working by creating a RecordParser that delimits on the XML end tag, but that's more of a band-aid: if a lot of bad data arrived before the XML message, it would be included too.

When I use straight Netty, its XMLFrameDecoder option works, but is there some way to achieve this in Vert.x?

Here's what I have so far

@Override
public void start(Future<Void> startFuture) {
    NetServerOptions options = new NetServerOptions()
            .setPort(mPort)
            .setTcpKeepAlive(true);
    mServer = vertx.createNetServer(options);
    mServer.connectHandler(new Handler<NetSocket>() {
        @Override
        public void handle(NetSocket netSocket) {
            netSocket.handler(RecordParser.newDelimited("</root>", new Handler<Buffer>() {
                @Override
                public void handle(Buffer buffer) {
                    String xml = buffer.getString(0, buffer.length()) + "</root>";
                    XMLMessage message = new XMLMessage (xml);
                    System.out.println(xml);
                }
            }));
        }
    });
    mServer.listen();
}

Let me know if you have any suggestions.


Answer:

There is no RecordParser in Vert.x that does the same job as Netty's XMLFrameDecoder.

It is possible though to integrate custom Netty servers with the Vert.x programming model.

Question:

I am using this example as my starting point.

In that example the server responds with the modified text, but in my case the server doesn't need to respond.

Looking at other similar questions, I am aware that I can either pass an empty ByteString or use filter(p -> false) to not send anything back. However, in that case the problem is that my whenComplete block doesn't get executed, i.e. the exception gets swallowed. Is there a way to avoid this? Help appreciated!

connections.runForeach(connection -> {
    System.out.println("New connection from: " + connection.remoteAddress());

    final Flow<ByteString, ByteString, NotUsed> echo = Flow.of(ByteString.class)
        .via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW))
        .map(ByteString::utf8String)
        .map(s -> s + "!!!\n")
        .map(ByteString::fromString);

    connection.handleWith(echo, mat);
}, mat).whenComplete((done, throwable) -> {
    // exception handling
});

Answer:

Your analysis is correct: as written, you are reacting when the server shuts down rather than to each individual connection.

Reacting on the individual connections completing would be done in the flow passed to the connection, something like this:

final Tcp tcp = Tcp.get(system);
tcp.bind("127.0.0.1", 6789).runForeach((connection) -> {

  final Flow<ByteString, ByteString, NotUsed> echo = Flow.of(ByteString.class)
    .via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW))
    .map(ByteString::utf8String)
    .map(s -> s + "!!!\n")
    .map(ByteString::fromString)
    .watchTermination((notUsed, completionStage) -> {
      completionStage.whenComplete((done, exception) -> {
        System.out.println("Connection from " + connection.remoteAddress() + " completed");
      });
      return notUsed;
    });

  connection.handleWith(echo, materializer);
}, materializer);

Question:

Is it possible to implement request-for-stream style client-server interactions at the TCP level in Netty? The protocol would essentially follow:

t0: tcp handshake

t1: client requests topic X

t2-tn: server sends latest enqueued update on topic X

I've seen a netty websockets example, but is it possible, and are there examples, to do this in netty at the tcp level?

Edit: I'm really not asking if this is the correct way to implement the feature, or if there is another solution involving installation of third party software, even if it would be the "right" thing to do. I am asking if this is possible to do using netty.

I would also be very interested to hear from any users of Netty's SCTP features; any metrics appreciated.


Answer:

Nat answered my question: adding the incoming channel to the sender's ChannelGroup in channelActive and removing it in channelInactive allows messages of arbitrary structure to be pushed to subscribing clients.
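For illustration, the pattern described above might be sketched as follows; topic handling is simplified to a single group, and the class name is an assumption:

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

// Each active channel joins a group; anything published for the topic
// is written to every current member of the group.
public class TopicSubscriptionHandler extends ChannelInboundHandlerAdapter {

    static final ChannelGroup subscribers =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        subscribers.add(ctx.channel());    // client subscribes on connect
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        subscribers.remove(ctx.channel()); // drop the client on disconnect
    }

    // Called by the producer side whenever a new update is enqueued.
    public static void publish(Object update) {
        subscribers.writeAndFlush(update);
    }
}
```

A real implementation would typically keep one ChannelGroup per topic, keyed by the topic name sent in the client's initial request.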