Hi,

I created a SocketChannel to a remote server to send and receive messages on Tomcat. To receive messages from the remote computer, I use a thread dedicated to this task (only this thread reads from the socket, nothing else).

When bytes arrive at the SocketChannel (I keep polling the SocketChannel in non-blocking mode for new data), I first read 4 bytes to get the length of the next message, then allocate a buffer and read that many bytes from the SocketChannel. Those bytes are then decoded and reconstructed into a message.

Below is my code for the receiving thread:

@Override
public void run() {

    while (true) { //Don't exit thread

        //Attempt to read the size of the incoming message
        ByteBuffer buf = ByteBuffer.allocate(4);

        int bytesread = 0;
        try {
            while (buf.remaining() > 0) {
                bytesread = schannel.read(buf);

                if (bytesread == -1) { //Socket was terminated

                } 

                if (quitthread) break;
            }

        } catch (IOException ex) {

        }

        if (buf.remaining() == 0) {
            //Read the header
            byte[] header = buf.array();
            int msgsize = (0xFF & (int)header[0]) + ((0xFF & (int)header[1]) << 8)
                    + ((0xFF & (int)header[2]) << 16) + ((0xFF & (int)header[3]) << 24);

            //Read the message coming from the pipeline
            buf = ByteBuffer.allocate(msgsize);
            try {
                while (buf.remaining() > 0) {
                    bytesread = schannel.read(buf);

                    if (bytesread == -1) { //Socket was terminated

                    }

                    if (quitthread) break;
                }
            } catch (IOException ex) {

            }

            parent.recvMessage(buf.array());
        }

        if (quitthread) {
            break;
        }
    }

}

The first bytes I receive from the SocketChannel are fine, and I successfully decode the message. However, the next time I read from the SocketChannel, the socket has skipped ahead about 100 bytes, so the wrong bytes are read and interpreted as the length, and everything after that is corrupted.

What is wrong with the code? No other thread is reading from the SocketChannel.

+1  A: 

Your parentheses are off. The code is:

(0xFF & ((int)header[1] << 8))

which is always 0 (the same goes for << 16 and << 24), because the shift clears the low 8 bits before the mask is applied. My guess is you meant:

((0xFF & ((int)header[1])) << 8)

This would lead to reading too few message bytes (rather than too many), which would also throw the stream out of synchronisation.
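To make the difference concrete, here is a minimal sketch that compares the two parenthesisations and decodes a 4-byte header. The class and method names are hypothetical, and the little-endian byte order is an assumption taken from the decoding in the question (header[0] is the least significant byte). ByteBuffer can do the same decoding without any manual shifting.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper class, for illustration only.
final class HeaderDecode {

    // Misplaced parentheses: the shift runs first, so the low 8 bits
    // are already zero when the 0xFF mask is applied. Always returns 0.
    static int shiftThenMask(byte b) {
        return 0xFF & ((int) b << 8);
    }

    // Intended form: mask to an unsigned 0..255 value, then shift.
    static int maskThenShift(byte b) {
        return (0xFF & (int) b) << 8;
    }

    // Manual little-endian decode of a 4-byte length header.
    static int decodeManually(byte[] h) {
        return (h[0] & 0xFF)
                | ((h[1] & 0xFF) << 8)
                | ((h[2] & 0xFF) << 16)
                | ((h[3] & 0xFF) << 24);
    }

    // The same decode, letting ByteBuffer handle the byte order.
    static int decodeWithByteBuffer(byte[] h) {
        return ByteBuffer.wrap(h).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }
}
```

With the misplaced parentheses, a 300-byte message (header bytes 0x2C 0x01 0x00 0x00) would be decoded as 44 bytes, leaving the remaining 256 bytes in the stream to be misread as the next header.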

Edit: now that you have fixed the above, I cannot see anything wrong. Could you tell us the relation between the length of the first message and the exact number of bytes that are eaten?

Based on the code shown, my only guess is that you edited out some behaviour that might influence the schannel. Is the schannel referenced anywhere else?

If the line:

ByteBuffer buf = ByteBuffer.allocate(4);

were outside the while loop, that would result in the behaviour you describe, but in your sample code it isn't.

rsp
Looks like it's an issue with decoding the header. It works fine now.
futureelite7
A: 

I presume that when you say you're polling the socket in non-blocking mode, you mean you're using the "standard" Selector.select() approach?

When select() returns and indicates that there's data available for reading from the socket, you should read only the bytes that are available before re-entering the call to select(). On a non-blocking channel, a read() that returns 0 indicates that no more bytes are available for immediate reading; a return value of -1 means end-of-stream, i.e. the peer has closed the connection. Hence I suspect your approach of attempting to completely fill the buffer before returning is incorrect. Even if it does work, your I/O thread will spin constantly while waiting for data to arrive. In particular, it looks like you're simply ignoring a return value of -1.

Consider re-architecting your code to use a finite state machine approach. For example, I've implemented this in the past using a 3-state model: IDLE, READ_MESSAGE_LENGTH and READ_MESSAGE.
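A minimal sketch of such a state machine might look like the following. The FrameReader class and its onReadable method are hypothetical names, the little-endian length header is an assumption taken from the question's decoding, and the code assumes the channel is in non-blocking mode so that read() returns 0 when nothing more is available.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the 3-state model; not the original implementation.
final class FrameReader {
    enum State { IDLE, READ_MESSAGE_LENGTH, READ_MESSAGE }

    private State state = State.IDLE;
    // Assumption: 4-byte little-endian length prefix, as in the question.
    private final ByteBuffer header =
            ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
    private ByteBuffer body;

    State state() {
        return state;
    }

    // Call when the channel is readable. Consumes whatever is available
    // and returns the messages completed so far, without spinning.
    List<byte[]> onReadable(ReadableByteChannel ch) throws IOException {
        List<byte[]> done = new ArrayList<>();
        while (true) {
            if (state == State.READ_MESSAGE) {
                if (body.hasRemaining()) {
                    int n = ch.read(body);
                    if (n == -1) throw new EOFException("closed mid-message");
                    if (n == 0) return done; // nothing more right now
                }
                if (!body.hasRemaining()) {
                    done.add(body.array());
                    body = null;
                    state = State.IDLE;
                }
            } else { // IDLE or READ_MESSAGE_LENGTH
                int n = ch.read(header);
                if (n == -1) throw new EOFException("end of stream");
                if (n == 0) return done; // nothing more right now
                state = State.READ_MESSAGE_LENGTH;
                if (!header.hasRemaining()) {
                    header.flip();
                    int size = header.getInt();
                    header.clear();
                    if (size < 0) throw new IOException("negative length " + size);
                    body = ByteBuffer.allocate(size);
                    state = State.READ_MESSAGE;
                }
            }
        }
    }
}
```

The reader keeps its partially filled buffers between calls, so the I/O thread can go back to select() whenever read() returns 0 and pick up where it left off on the next readiness event.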

Adamski
Since I have only one socket per thread, I don't know if using Selector.select() is necessary. In any case, -1 means that the end of the stream has been reached, so I should notify the parent. I just left out the handling for now. I will try using a plain Socket and see how it goes.
futureelite7
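For the plain blocking Socket route mentioned in the last comment, a minimal sketch could look like the following. The BlockingFrames class and readMessage method are hypothetical names, and the little-endian length header is again an assumption taken from the question's decoding. DataInputStream.readFully blocks until the requested bytes have arrived and throws EOFException if the stream ends first, so no polling loop is needed.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch of a blocking reader for length-prefixed messages.
final class BlockingFrames {

    // Reads one message. Returns null if the stream ends cleanly before
    // a new header starts; throws EOFException if it ends mid-message.
    static byte[] readMessage(DataInputStream in) throws IOException {
        int first = in.read();
        if (first == -1) {
            return null; // peer closed the connection between messages
        }
        byte[] header = new byte[4];
        header[0] = (byte) first;
        in.readFully(header, 1, 3);

        // Assumption: 4-byte little-endian length, as in the question.
        int size = ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN).getInt();
        if (size < 0) {
            throw new IOException("Negative message length: " + size);
        }
        byte[] body = new byte[size];
        in.readFully(body);
        return body;
    }
}
```

With a real connection, the stream would be created once as new DataInputStream(socket.getInputStream()) and readMessage called in a loop until it returns null.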