Oh, how I wish TCP were packet-based like UDP is! [see comments] But alas, that's not the case, so I'm trying to implement my own packet layer. The chain of events so far (ignoring writing packets) is listed below.

Oh, and my Packets are very simply structured: two unsigned bytes for length, and then byte[length] data. (I can't imagine if they were any more complex, I'd be up to my ears in if statements!)
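For concreteness, here is a minimal sketch of that wire format, assuming the length is sent big-endian (the original does not say which byte order is used). `FrameCodec`, `encode`, and `decodeLength` are hypothetical names introduced only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the original code: frames a payload as
// [2-byte unsigned big-endian length][payload bytes].
final class FrameCodec {
    static final int MAX_PAYLOAD = 0xFFFF; // largest value two unsigned bytes can hold

    static byte[] encode(byte[] payload) {
        if (payload.length > MAX_PAYLOAD) {
            throw new IllegalArgumentException("payload too long: " + payload.length);
        }
        ByteBuffer out = ByteBuffer.allocate(2 + payload.length); // big-endian by default
        out.putShort((short) payload.length); // the cast keeps the low 16 bits
        out.put(payload);
        return out.array();
    }

    // Reads the prefix back as an unsigned value in 0..65535.
    // Java bytes are signed, so each one is masked before being combined.
    static int decodeLength(byte high, byte low) {
        return ((high & 0xFF) << 8) | (low & 0xFF);
    }
}
```

The masking in `decodeLength` matters for any length above 127, because a negative byte would otherwise sign-extend and corrupt the result.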

  • Server is in an infinite loop, accepting connections and adding them to a list of Connections.
  • PacketGatherer (another thread) uses a Selector to figure out which Connection.SocketChannels are ready for reading.
  • It loops over the results and tells each Connection to read().
  • Each Connection has a partial IncomingPacket and a list of Packets which have been fully read and are waiting to be processed.
  • On read():
    • Tell the partial IncomingPacket to read more data. (IncomingPacket.readData below)
    • If it's done reading (IncomingPacket.complete()), make a Packet from it and stick the Packet into the list waiting to be processed and then replace it with a new IncomingPacket.

There are a couple of problems with this. First, only one packet is read per pass: once the current IncomingPacket is complete, any further bytes already waiting on the socket stay unread until the next pass, even if the IncomingPacket needed only one more byte. This can of course be fixed with a loop but it starts to get sorta complicated and I wonder if there is a better overall way.

Second, the logic in IncomingPacket is a little bit crazy, to be able to read the two bytes for the length and then read the actual data. Here is the code, boiled down for quick & easy reading:

int readBytes;         // number of total bytes read so far (length prefix included)
byte length1, length2; // high and low byte of an unsigned short int (see getLength())

public int getLength() { // will be inaccurate if readBytes < 2
    // Java bytes are signed, so mask each one before combining them
    return ((length1 & 0xFF) << 8) | (length2 & 0xFF);
}

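public void readData(SocketChannel c) throws IOException {
    if (readBytes < 2) { // we don't yet know the length of the actual data
        ByteBuffer lengthBuffer = ByteBuffer.allocate(2 - readBytes);
        int numBytesRead = c.read(lengthBuffer);
        if (numBytesRead < 0) // read() returns -1 once the peer has closed the connection
            throw new EOFException("connection closed mid-packet");
        lengthBuffer.flip(); // switch from filling to draining before calling get()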

        if(readBytes == 0) {
            if(numBytesRead >= 1)
                length1 = lengthBuffer.get();

            if(numBytesRead == 2)
                length2 = lengthBuffer.get();
        } else if(readBytes == 1) {
            if(numBytesRead == 1)
                length2 = lengthBuffer.get();
        }
        readBytes += numBytesRead;
    }

    if(readBytes >= 2) { // then we know we have the entire length variable
        // lazily-instantiate data buffers based on getLength()
        // read into data buffers, increment readBytes

        // (does not read more than the amount of this packet, so it does not
        // need to handle overflow into the next packet's data)
    }
}

public boolean complete() {
    // >= 2 so that a zero-length packet counts as complete once its length has arrived
    return (readBytes >= 2 && readBytes == getLength()+2);
}

Basically I need feedback on my code and overall process. Please suggest any improvements. Even overhauling my entire system would be okay, if you have suggestions for how better to implement the whole thing. Book recommendations are welcome too; I love books. I just get the feeling that something isn't quite right.


Here's the general solution I came up with thanks to Juliano's answer: (feel free to comment if you have any questions)

public void fillWriteBuffer() {
    while(!writePackets.isEmpty() && writeBuf.remaining() >= writePackets.peek().size()) {
        Packet p = writePackets.poll();
        assert p != null;
        p.writeTo(writeBuf);
    }
}

public void fillReadPackets() {
    do {
        if(readBuf.position() < 1+2) {
            // haven't yet received the length
            break;
        }

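        // absolute read: byte 0 is the packet type, bytes 1-2 are the length
        int packetLength = readBuf.getShort(1) & 0xFFFF; // treat the length as unsigned

        // readBuf is still in write mode here, so position() (not limit(),
        // which equals the capacity) is the number of bytes received so far
        if(readBuf.position() >= 1+2 + packetLength) {
            // we have a complete packet!

            readBuf.flip();

            byte packetType = readBuf.get();

            readBuf.getShort(); // skip the length field, already decoded above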

            byte[] packetData = new byte[packetLength];
            readBuf.get(packetData);

            Packet p = new Packet(packetType, packetData);
            readPackets.add(p);
            readBuf.compact();
        } else {
            // not a complete packet
            break;
        }

    } while(true);
}
+1  A: 

Can't you just read however many bytes are ready to be read, and feed all incoming bytes into a packet parsing state machine? That would mean treating the incoming (TCP) data stream like any other incoming data stream (via serial line, or USB, a pipe, or whatever...)

So you would have some Selector determining from which connection(s) there are incoming bytes to be read, and how many. Then you would (for each connection) read the available bytes, and then feed those bytes into a (connection specific) state machine instance (the reading and feeding could be done from the same class, though). This packet parsing state machine class would then spit out finished packets from time to time, and hand those over to whoever will handle those complete and parsed packets.

For an example packet format like

    2 magic header bytes to mark the start
    2 bytes of payload size (n)
    n bytes of payload data
    2 bytes of checksum

the state machine would have states like (try an enum, Java has those now, I gather)

wait_for_magic_byte_0,
wait_for_magic_byte_1,
wait_for_length_byte_0,
wait_for_length_byte_1,
wait_for_payload_byte (with a payload_offset variable counting),
wait_for_chksum_byte_0,
wait_for_chksum_byte_1

and on each incoming byte you can switch the state accordingly. If the incoming byte does not properly advance the state machine, discard the byte by resetting the state machine to wait_for_magic_byte_0.
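A minimal sketch of such a state machine follows. The magic bytes (`0xCA 0xFE`), the big-endian byte order, and the checksum (a 16-bit sum of the payload bytes) are assumptions made only so the example is complete; the answer does not specify them. `PacketParser`, `feed`, and `drainPackets` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical byte-at-a-time parser for: 2 magic bytes, 2 length bytes,
// n payload bytes, 2 checksum bytes. One instance per connection.
final class PacketParser {
    private enum State {
        WAIT_FOR_MAGIC_BYTE_0, WAIT_FOR_MAGIC_BYTE_1,
        WAIT_FOR_LENGTH_BYTE_0, WAIT_FOR_LENGTH_BYTE_1,
        WAIT_FOR_PAYLOAD_BYTE,
        WAIT_FOR_CHKSUM_BYTE_0, WAIT_FOR_CHKSUM_BYTE_1
    }

    private static final int MAGIC_0 = 0xCA, MAGIC_1 = 0xFE; // assumed values

    private State state = State.WAIT_FOR_MAGIC_BYTE_0;
    private int length, payloadOffset, checksum;
    private byte[] payload;
    private final List<byte[]> packets = new ArrayList<>();

    // Feed the first `count` bytes of `data`, e.g. the result of one socket read.
    void feed(byte[] data, int count) {
        for (int i = 0; i < count; i++) step(data[i] & 0xFF);
    }

    private void step(int b) {
        switch (state) {
            case WAIT_FOR_MAGIC_BYTE_0:
                if (b == MAGIC_0) state = State.WAIT_FOR_MAGIC_BYTE_1;
                break; // anything else is discarded
            case WAIT_FOR_MAGIC_BYTE_1:
                if (b == MAGIC_1) state = State.WAIT_FOR_LENGTH_BYTE_0;
                else if (b != MAGIC_0) state = State.WAIT_FOR_MAGIC_BYTE_0;
                break; // a repeated MAGIC_0 may itself start a header, so stay here
            case WAIT_FOR_LENGTH_BYTE_0:
                length = b << 8;
                state = State.WAIT_FOR_LENGTH_BYTE_1;
                break;
            case WAIT_FOR_LENGTH_BYTE_1:
                length |= b;
                payload = new byte[length];
                payloadOffset = 0;
                state = (length == 0) ? State.WAIT_FOR_CHKSUM_BYTE_0
                                      : State.WAIT_FOR_PAYLOAD_BYTE;
                break;
            case WAIT_FOR_PAYLOAD_BYTE:
                payload[payloadOffset++] = (byte) b;
                if (payloadOffset == length) state = State.WAIT_FOR_CHKSUM_BYTE_0;
                break;
            case WAIT_FOR_CHKSUM_BYTE_0:
                checksum = b << 8;
                state = State.WAIT_FOR_CHKSUM_BYTE_1;
                break;
            case WAIT_FOR_CHKSUM_BYTE_1:
                checksum |= b;
                if (checksum == checksumOf(payload)) packets.add(payload);
                state = State.WAIT_FOR_MAGIC_BYTE_0; // a bad checksum drops the packet
                break;
        }
    }

    // Assumed checksum: sum of the unsigned payload bytes, truncated to 16 bits.
    static int checksumOf(byte[] payload) {
        int sum = 0;
        for (byte x : payload) sum += x & 0xFF;
        return sum & 0xFFFF;
    }

    // Hands over all packets completed so far and forgets them.
    List<byte[]> drainPackets() {
        List<byte[]> done = new ArrayList<>(packets);
        packets.clear();
        return done;
    }
}
```

Because the parser keeps its state between calls, it does not matter where the network splits the stream: a header cut in half by one read is simply finished by the next.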

ndim
I'm sure I 'can', given the design of such a thing. Might you elaborate on this, possibly give the setup of such a "packet parsing state machine"? I admit that I do find it odd that an IncomingPacket is reading into itself; typically this is where a factory pattern or something comes in, so maybe that's what you are proposing? In any case I would love for this answer to be more elaborate!
Ricket
Thanks for the elaboration. I probably need to read more on "state machines" in the context of programming. My existing code seems to pretty much follow what you've said, in a series of if statements, but I'm not sure how to transform that into a state machine without basically becoming a set of more complicated if statements.
Ricket
Hmm, and I just realized you are proposing parsing the stream one byte at a time. I wonder what the performance hit of that would be; trying not to venture into premature optimization, of course.
Ricket
+6  A: 

Probably this is not the answer you are looking for, but someone would have to say it: You are probably overengineering the solution for a very simple problem.

You do not have packets before they arrive completely, not even IncomingPackets. You have just a stream of bytes without defined meaning. The usual, simple solution is to keep the incoming data in a buffer (it can be a simple byte[] array, but a proper elastic and circular buffer is recommended if performance is an issue). After each read, you check the contents of the buffer to see if you can extract an entire packet from there. If you can, you construct your Packet, discard the correct number of bytes from the beginning of the buffer and repeat. If or when you cannot extract an entire packet, you keep those incoming bytes there until the next time you read something from the socket successfully.

While you are at it, if you are doing datagram-based communication over a stream channel, I would recommend that you include a magic number at the beginning of each "packet" so that you can test that both ends of the connection are still synchronized. They may get out of sync if for some reason (a bug) one of them reads or writes the wrong number of bytes to/from the stream.
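The buffer-and-extract loop, with a magic number check, can be sketched as follows. This is only an illustration under assumptions: a one-byte magic value of `0x7E`, a two-byte big-endian unsigned length, and a `ByteBuffer` as the accumulation buffer. `FrameExtractor` and `extract` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical extractor for frames laid out as
// [1 magic byte][2-byte unsigned length][payload].
final class FrameExtractor {
    static final byte MAGIC = 0x7E;  // assumed value
    static final int HEADER = 1 + 2; // magic byte + length field

    // `buf` accumulates socket reads and is in write mode on entry and on
    // normal return. Returns every complete payload currently in the buffer.
    static List<byte[]> extract(ByteBuffer buf) {
        List<byte[]> packets = new ArrayList<>();
        buf.flip(); // read mode: position = 0, limit = bytes received
        while (buf.remaining() >= HEADER) {
            int start = buf.position();
            if (buf.get(start) != MAGIC) {
                // The two ends disagree about framing; the caller should
                // drop the connection rather than try to continue.
                throw new IllegalStateException("stream out of sync at offset " + start);
            }
            int length = buf.getShort(start + 1) & 0xFFFF; // unsigned
            if (buf.remaining() < HEADER + length) {
                break; // incomplete packet, wait for more data
            }
            buf.position(start + HEADER); // skip the header
            byte[] payload = new byte[length];
            buf.get(payload);
            packets.add(payload);
        }
        buf.compact(); // keep leftover bytes at the front, back to write mode
        return packets;
    }
}
```

A caller would invoke `extract` after each successful socket read into the same buffer; leftover bytes of a partial packet stay in place until the rest arrives.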

Juliano
D'oh! I think you might be right, this may be a better approach. Definitely something to consider. It also gains the benefit of not needing to parse the stream byte-by-byte like ndim is suggesting, which seems overkill for this.
Ricket
Whether parsing byte by byte as the data comes in or examining the buffer content byte by byte... you will still need to take a look at each and every byte.
ndim
@ndim That's not true. My current implementation uses a temporary ByteBuffer of 2 bytes to get the length, and after the length is received, a ByteBuffer is created (backed by a byte[]) of the exact length. Then each time data is available, SocketChannel.read() is called with the ByteBuffer and the integer return value (number of bytes read) is used to keep track of how many total bytes have been read. This is in contrast to your approach of a state engine that accepts only one byte at a time, so I'd need to loop through the data one byte at a time for no particular reason.
Ricket
@ndim Well, I guess it's partially true, in the sense that at some point yes I will be looking at the data, when higher level code extracts meaningful data from the packets and processes it. But this will happen regardless of the stream-to-packet system I use. And I'd like to stick with my current approach which allows SocketChannel.read() to fill the ByteBuffer, instead of repeatedly calling something like Socket.getInputStream().read() [which gets one byte].
Ricket
I've implemented something similar to what Juliano is suggesting, and it works. The great thing about TCP, of course, is that it's reliable. If you lose the connection, you can throw away the buffer, but if you get a whole "packet", you know it's good.
Marcus Adams
It worked amazingly and I feel like my code is very clean now. Thanks a lot for clearing my mind when I was overcomplicating the solution! :) (see my edit at the bottom of my question for the methods I wrote given your advice)
Ricket
You are welcome!
Juliano
A: 

I think you're approaching the issue from a slightly wrong direction. Instead of thinking of packets, think of a data structure. That's what you're sending. Effectively, yes, it's an application layer packet, but just think of it as a data object. Then, at the lowest level, write a routine which will read off the wire, and output data objects. That will give you the abstraction layer I think you're looking for.

Andrew B
OP wants to read *packets* from a *stream*, which can be off by an arbitrary number of bytes. You are completely ignoring that part.
ndim
Actually, I thought what he was doing looks exactly like a standard use of tcp. It's guaranteeing the delivery, but you have to reassemble your data on the far side. Same type of thing I've done many times when reading raw tcp data.
Andrew B
+1  A: 

Ignoring client disconnects and server shutdown for now, here's the more or less traditional structure of a socket server:

  • Selector, handles sockets:
    • polls open sockets
    • if it's the server socket, create new Connection object
    • for each active client socket find the Connection, call it with event (read or write)
  • Connection (one per socket), handles I/O on one socket:
    • Communicates to Protocol via two queues, input and output
    • keeps two buffers, one for reading, one for writing, and respective offsets
    • on read event: read all available input bytes, look for message boundaries, put whole messages onto Protocol input queue, call Protocol
    • on write event: write the buffer, or if it's empty, take message from output queue into buffer, start writing it
  • Protocol (one per connection), handles application protocol exchange on one connection:
    • take message from input queue, parse application portion of the message
    • do the server work (here's where the state machine is - some messages are appropriate in one state, while not in the other), generate response message, put it onto output queue

That's it. Everything could be in a single thread. The key here is separation of responsibilities. Hope this helps.
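The Connection/Protocol split can be sketched as below; the Selector loop and the write side are left out to keep it short. This is an illustration under assumptions, not a reference design: messages are framed with a two-byte big-endian unsigned length, and the names `Protocol`, `onMessage`, `Connection`, and `onReadable` are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

// Application layer: sees only whole messages, never raw socket bytes.
interface Protocol {
    void onMessage(byte[] message, Queue<byte[]> output);
}

// I/O layer for one socket: turns the byte stream into whole messages.
final class Connection {
    // Large enough for one maximum-size message plus its length prefix.
    private final ByteBuffer readBuf = ByteBuffer.allocate(2 + 0xFFFF);
    private final Queue<byte[]> input = new ArrayDeque<>();
    final Queue<byte[]> output = new ArrayDeque<>(); // drained on write events
    private final Protocol protocol;

    Connection(Protocol protocol) {
        this.protocol = protocol;
    }

    // Called by the selector loop on a read event.
    // Returns false once the peer has closed the connection.
    boolean onReadable(ReadableByteChannel channel) {
        int n;
        try {
            n = channel.read(readBuf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (n < 0) {
            return false;
        }
        readBuf.flip(); // read mode
        while (readBuf.remaining() >= 2) {
            int length = readBuf.getShort(readBuf.position()) & 0xFFFF; // peek
            if (readBuf.remaining() < 2 + length) {
                break; // message boundary not reached yet
            }
            readBuf.getShort(); // consume the length prefix
            byte[] message = new byte[length];
            readBuf.get(message);
            input.add(message);
        }
        readBuf.compact(); // keep the partial message, back to write mode

        byte[] message;
        while ((message = input.poll()) != null) {
            protocol.onMessage(message, output); // Protocol may queue responses
        }
        return true;
    }
}
```

Taking a `ReadableByteChannel` rather than a `SocketChannel` is a deliberate choice here: `SocketChannel` implements that interface, and the narrower type lets the framing logic be exercised without opening a real socket.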

Nikolai N Fetissov