When reading data in chunks of, say, 1024 bytes, how do I continue to read from a socket that receives a message bigger than 1024 bytes until there is no data left? Should I just use BeginReceive to read a packet's length prefix only, and then once that is retrieved, use Receive() (in the async thread) to read the rest of the packet? Or is there another way?

edit:

I thought Jon Skeet's link had the solution, but there is a bit of a speedbump with that code. The code I used is:

public class StateObject
{
    public Socket workSocket = null;
    public const int BUFFER_SIZE = 1024;
    public byte[] buffer = new byte[BUFFER_SIZE];
    public StringBuilder sb = new StringBuilder();
}

public static void Read_Callback(IAsyncResult ar)
{
    StateObject so = (StateObject) ar.AsyncState;
    Socket s = so.workSocket;

    int read = s.EndReceive(ar);

    if (read > 0) 
    {
     so.sb.Append(Encoding.ASCII.GetString(so.buffer, 0, read));

     if (read == StateObject.BUFFER_SIZE)
     {
      s.BeginReceive(so.buffer, 0, StateObject.BUFFER_SIZE, 0,
        new AsyncCallback(Async_Send_Receive.Read_Callback), so);
      return;
     }
    }

    if (so.sb.Length > 0)
    {
     //All of the data has been read, so displays it to the console
     string strContent;
     strContent = so.sb.ToString();
     Console.WriteLine(String.Format("Read {0} bytes from socket, " +
     "data = {1}", strContent.Length, strContent));
    }
    s.Close();
}

Now this corrected version works fine most of the time, but it fails when the packet's size is an exact multiple of the buffer size. The reason is that if the buffer gets filled on a read, the code assumes there is more data, so the same problem happens as before. A 2-byte buffer, for example, gets filled twice on a 4-byte packet, and the code assumes there is more data. It then blocks because there is nothing left to read. The problem is that the receive function doesn't know where the end of the packet is.


This got me thinking of two possible solutions: I could either have an end-of-packet delimiter, or I could read the packet header to find the length and then receive exactly that amount (as I originally suggested).

There are problems with each of these, though. I don't like the idea of using a delimiter, as a user could somehow work that into a packet through an input string from the app and screw it up. It also just seems kinda sloppy to me.

The length header sounds OK, but I'm planning on using protocol buffers, and I don't know the format of the data. Is there a length header? How many bytes is it? Would this be something I implement myself? Etc.

What should I do?

+6  A: 

No - call BeginReceive again from the callback handler, until EndReceive returns 0. Basically, you should keep on receiving asynchronously, assuming you want the fullest benefit of asynchronous IO.

If you look at the MSDN page for Socket.BeginReceive you'll see an example of this. (Admittedly it's not as easy to follow as it might be.)
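
As a rough sketch of that pattern (not the MSDN sample itself), the callback below re-posts BeginReceive after every non-empty read and stops only when EndReceive returns 0. The names ReceiveState, ReceiveLoopDemo and RunLoopback are made up for this illustration, and the loopback connection exists only to make the example runnable. Note that a 0-byte read signals that the remote side closed the connection, not the end of an individual message; error handling is left out for brevity.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

// Hypothetical state object for this sketch (not the MSDN StateObject).
public class ReceiveState
{
    public Socket Socket;
    public byte[] Buffer = new byte[1024];
    public MemoryStream Received = new MemoryStream();
    public ManualResetEvent Done = new ManualResetEvent(false);
}

public static class ReceiveLoopDemo
{
    // Posts one asynchronous receive; the callback decides whether to post the next.
    public static void Start(ReceiveState state)
    {
        state.Socket.BeginReceive(state.Buffer, 0, state.Buffer.Length,
            SocketFlags.None, OnReceive, state);
    }

    private static void OnReceive(IAsyncResult ar)
    {
        ReceiveState state = (ReceiveState)ar.AsyncState;
        int read = state.Socket.EndReceive(ar); // may throw SocketException in real code

        if (read > 0)
        {
            // Keep whatever arrived and ask for more, regardless of how full the buffer was.
            state.Received.Write(state.Buffer, 0, read);
            Start(state);
        }
        else
        {
            // 0 bytes: the remote side shut down its sending half.
            state.Socket.Close();
            state.Done.Set();
        }
    }

    // Sends byteCount bytes over a loopback connection and returns how many were received.
    public static long RunLoopback(int byteCount)
    {
        TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        int port = ((IPEndPoint)listener.LocalEndpoint).Port;

        Socket sender = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp);
        sender.Connect(IPAddress.Loopback, port);
        Socket receiver = listener.AcceptSocket();

        ReceiveState state = new ReceiveState { Socket = receiver };
        Start(state);

        sender.Send(new byte[byteCount]);
        sender.Shutdown(SocketShutdown.Send); // triggers the 0-byte read on the receiver
        state.Done.WaitOne();

        sender.Close();
        listener.Stop();
        return state.Received.Length;
    }

    public static void Main()
    {
        // 2048 is an exact multiple of the 1024-byte buffer, the case that broke the original code.
        Console.WriteLine("Received {0} bytes", RunLoopback(2048));
    }
}
```

Because the loop never compares the read size with the buffer size, a payload that is an exact multiple of the buffer is handled the same way as any other. It does, however, rely on the sender closing (or half-closing) the connection to end the stream.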

Jon Skeet
+2  A: 

You would read the length prefix first. Once you have that, you would just keep reading the bytes in blocks (and you can do this async, as you surmised) until you have exhausted the number of bytes you know are coming in off the wire.

Note that at some point, when reading the last block you won't want to read the full 1024 bytes, depending on what the length-prefix says the total is, and how many bytes you have read.
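
A minimal sketch of that idea, under some assumptions that are not from the thread: the prefix is a 4-byte big-endian integer, the reads are synchronous for brevity, and the code works on any Stream (a NetworkStream wrapping the connected socket would be the obvious candidate). LengthPrefixedReader, ReadFully, ReadMessage and WriteMessage are hypothetical names.

```csharp
using System;
using System.IO;
using System.Net;
using System.Text;

public static class LengthPrefixedReader
{
    // Reads exactly count bytes. Each call asks only for the bytes still
    // missing, so the last block never reads past the end of the message.
    public static byte[] ReadFully(Stream stream, int count)
    {
        byte[] result = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            int read = stream.Read(result, offset, count - offset);
            if (read == 0)
                throw new EndOfStreamException("Stream ended before the message was complete.");
            offset += read;
        }
        return result;
    }

    // Reads the assumed 4-byte big-endian length prefix, then the payload.
    public static byte[] ReadMessage(Stream stream)
    {
        byte[] header = ReadFully(stream, 4);
        int length = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(header, 0));
        if (length < 0)
            throw new InvalidDataException("Negative length prefix.");
        return ReadFully(stream, length);
    }

    // Writes the matching frame: length prefix first, payload second.
    public static void WriteMessage(Stream stream, byte[] payload)
    {
        byte[] header = BitConverter.GetBytes(IPAddress.HostToNetworkOrder(payload.Length));
        stream.Write(header, 0, header.Length);
        stream.Write(payload, 0, payload.Length);
    }

    public static void Main()
    {
        // A MemoryStream stands in for the network connection.
        MemoryStream wire = new MemoryStream();
        WriteMessage(wire, new byte[2048]);
        WriteMessage(wire, Encoding.ASCII.GetBytes("hi"));
        wire.Position = 0;

        Console.WriteLine(ReadMessage(wire).Length);
        Console.WriteLine(Encoding.ASCII.GetString(ReadMessage(wire)));
    }
}
```

The same bookkeeping (total expected minus bytes read so far) applies if the reads are issued with BeginReceive instead: pass the remaining count as the size argument rather than the full buffer size.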

casperOne
Are you really sure about the last sentence? Look at this example - http://msdn.microsoft.com/en-us/library/dxkwh6zw.aspx there is no protection for overreading. The reads just get joined together when the read returns 0 bytes. Buffer size is a const=1024.
ryeguy
@ryeguy: I didn't word that right at all. Edited to be more clear.
casperOne
A: 

For info (general Begin/End usage), you might want to see this blog post; this approach is working OK for me, and saving much pain...

Marc Gravell
+3  A: 

Dang. I'm hesitant to even reply to this given the dignitaries that have already weighed in, but here goes. Be gentle, O Great Ones!

Without having the benefit of reading Marc's blog (it's blocked here due to the corporate internet policy), I'm going to offer "another way."

The trick, in my mind, is to separate the receipt of the data from the processing of that data.

I use a StateObject class defined like this. It differs from the MSDN StateObject implementation in that it does not include the StringBuilder object, the BUFFER_SIZE constant is private, and it includes a constructor for convenience.

public class StateObject
{
    private const int BUFFER_SIZE = 65535;
    public byte[] Buffer = new byte[BUFFER_SIZE];
    public readonly Socket WorkSocket = null;

    public StateObject(Socket workSocket)
    {
        WorkSocket = workSocket;
    }
}

I also have a Packet class that is simply a wrapper around a buffer and a timestamp.

public class Packet
{
    public readonly byte[] Buffer;
    public readonly DateTime Timestamp;

    public Packet(DateTime timestamp, byte[] buffer, int size)
    {
        Timestamp = timestamp;
        Buffer = new byte[size];
        System.Buffer.BlockCopy(buffer, 0, Buffer, 0, size);
    }
}

My ReceiveCallback() function looks like this.

public static ManualResetEvent PacketReceived = new ManualResetEvent(false);
public static List<Packet> PacketList = new List<Packet>();
public static object SyncRoot = new object();
public static void ReceiveCallback(IAsyncResult ar)
{
    try {
        StateObject so = (StateObject)ar.AsyncState;
        int read = so.WorkSocket.EndReceive(ar);

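        if (read > 0) {
            Packet packet = new Packet(DateTime.Now, so.Buffer, read);
            lock (SyncRoot) {
                PacketList.Add(packet);
            }
            PacketReceived.Set();
        } else {
            // On a TCP socket, 0 bytes means the remote side has closed the
            // connection, so stop posting receives instead of spinning.
            return;
        }

        so.WorkSocket.BeginReceive(so.Buffer, 0, so.Buffer.Length, 0, ReceiveCallback, so);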
    } catch (ObjectDisposedException) {
        // Handle the socket being closed with an async receive pending
    } catch (Exception e) {
        // Handle all other exceptions
    }
}

Notice that this implementation does absolutely no processing of the received data, nor does it have any expectations as to how many bytes are supposed to have been received. It simply receives whatever data happens to be on the socket (up to 65535 bytes) and stores that data in the packet list, and then it immediately queues up another asynchronous receive.

Since processing no longer occurs in the thread that handles each asynchronous receive, the data will obviously be processed by a different thread, which is why the Add() operation is synchronized via the lock statement. In addition, the processing thread (whether it is the main thread or some other dedicated thread) needs to know when there is data to process. To do this, I usually use a ManualResetEvent, which is what I've shown above.

Here is how the processing works.

static void Main(string[] args)
{
    Thread t = new Thread(
        delegate() {
            List<Packet> packets;
            while (true) {
                PacketReceived.WaitOne();
                PacketReceived.Reset();
                lock (SyncRoot) {
                    packets = PacketList;
                    PacketList = new List<Packet>();
                }

                foreach (Packet packet in packets) {
                    // Process the packet
                }
            }
        }
    );
    t.IsBackground = true;
    t.Name = "Data Processing Thread";
    t.Start();
}

That's the basic infrastructure I use for all of my socket communication. It provides a nice separation between the receipt of the data and the processing of that data.

As to the other question you had, it is important to remember with this approach that each Packet instance does not necessarily represent a complete message within the context of your application. A Packet instance might contain a partial message, a single message, or multiple messages, and your messages might span multiple Packet instances. I've addressed how to know when you've received a full message in the related question you posted here.
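
To illustrate the point about partial and multiple messages, here is a hedged sketch of how a processing thread might stitch received chunks back into complete messages. It is not the approach from the linked answer; it assumes a 4-byte big-endian length prefix in front of every message, and MessageAssembler is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: accumulates raw chunks and emits complete messages.
public class MessageAssembler
{
    private readonly List<byte> pending = new List<byte>();

    // Appends the first count bytes of chunk and returns every message
    // that is now complete (possibly none, possibly several).
    public List<byte[]> Append(byte[] chunk, int count)
    {
        for (int i = 0; i < count; i++)
            pending.Add(chunk[i]);

        List<byte[]> messages = new List<byte[]>();
        while (pending.Count >= 4)
        {
            // Assumed framing: 4-byte big-endian payload length.
            int length = (pending[0] << 24) | (pending[1] << 16)
                       | (pending[2] << 8) | pending[3];
            if (length < 0)
                throw new InvalidOperationException("Negative length prefix.");
            if (pending.Count - 4 < length)
                break; // payload not fully received yet

            messages.Add(pending.GetRange(4, length).ToArray());
            pending.RemoveRange(0, 4 + length);
        }
        return messages;
    }

    public static void Main()
    {
        // Two framed messages ("abc" and "z") split awkwardly across two chunks.
        byte[] chunk1 = { 0, 0, 0, 3, (byte)'a' };
        byte[] chunk2 = { (byte)'b', (byte)'c', 0, 0, 0, 1, (byte)'z' };

        MessageAssembler assembler = new MessageAssembler();
        Console.WriteLine(assembler.Append(chunk1, chunk1.Length).Count);
        foreach (byte[] message in assembler.Append(chunk2, chunk2.Length))
            Console.WriteLine(Encoding.ASCII.GetString(message));
    }
}
```

In the processing loop above, each Packet's Buffer would be passed to Append in arrival order, and only the returned messages would be handed to application logic. A List<byte> keeps the sketch short; a production version would likely use a more efficient buffer.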

Matt Davis
You are on the right track, but you also have to consider that your producer puts an *entire* buffer into the list for each completed receive. There are two possible scenarios that will ruin you: 1) the slow WAN trickle that fills 10 bytes in each buffer, so you exhaust your memory *before the first frame is complete*; and 2) the fast sender that overwhelms a slow receiver, because the receiver accepts everything before it's processed. For 1) you need to coalesce low-filled buffers. For 2) you need TCP flow control: stop posting buffers when PacketList grows too big.
Remus Rusanu
Missed that your packet copies out only the filled buffer, so my objection 1) no longer applies (although now I would object about the copy...), but that still leaves 2)
Remus Rusanu
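
One possible way to get the back-pressure described in point 2), offered only as a sketch: replace the unbounded list with a bounded BlockingCollection, so the producer blocks (and therefore stops posting new receives) while the queue is full. BoundedQueueDemo and its parameters are hypothetical, and the producer loop here merely simulates receive completions. Blocking inside a real I/O completion callback has its own costs, so treat this as an illustration of the idea rather than a recommendation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BoundedQueueDemo
{
    // Pushes chunkCount chunks through a queue that holds at most capacity
    // items and returns how many chunks the consumer processed.
    public static int Run(int chunkCount, int capacity)
    {
        using (BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>(capacity))
        {
            int processed = 0;

            Thread consumer = new Thread(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding.
                foreach (byte[] chunk in queue.GetConsumingEnumerable())
                    processed++;
            });
            consumer.IsBackground = true;
            consumer.Start();

            for (int i = 0; i < chunkCount; i++)
            {
                // Add blocks while the queue is full. In a receive callback this
                // would delay the next BeginReceive, letting TCP flow control
                // slow the sender down.
                queue.Add(new byte[10]);
            }

            queue.CompleteAdding();
            consumer.Join();
            return processed;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run(100, 4));
    }
}
```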
A: 

Hi, I've used the above logic in our TCP connection class. In production, the packet processing thread goes into the WaitSleepJoin state on PacketReceived.WaitOne() and never recovers from it, even when the event is signaled multiple times from the socket's ReceiveCallback. We are using a background thread to process packets. The exact same code works perfectly on most of our client machines, but on some machines this happens after 30 to 40 minutes of processing. I interrupted that thread, and it gets a ThreadInterruptedException.

rajesh mr