views:

237

answers:

3

I have a Stream object that occasionally gets some data on it, but at unpredictable intervals. Messages that appear on the Stream are well-defined and declare the size of their payload in advance (the size is a 16-bit integer contained in the first two bytes of each message).

I'd like to have a StreamWatcher class which detects when the Stream has some data on it. Once it does, I'd like an event to be raised so that a subscribed StreamProcessor instance can process the new message.

Can this be done with C# events without using Threads directly? It seems like it should be straightforward, but I can't get quite get my head around the right way to design this.

A: 

Yes, this can be done. Use the non-blocking Stream.BeginRead method with an AsyncCallback. The callback is called asynchronously when data becomes available. In the callback, call Stream.EndRead to get the data, and call Stream.BeginRead again to get the next chunk of data. Buffer incoming data in a byte array that is large enough to hold the message. Once the byte array is full (multiple callback calls may be needed), raise the event. Then read the next message size, create a new buffer, repeat, done.

dtb
He said without using threads!!! =)
Nayan
@Nayan: Asynchrony *is* multithreading. This problem is inherently one of multithreading. I suspect what the OP means is that he doesn't want to explicitly create the threads himself.
Will Vousden
@Nayan: I assume the OP is looking for a solution that does not explicitly create a new Thread and uses the blocking Read method, but for a non-blocking, asynchronous solution. You can't have asynchronicity with no threads.
dtb
I understand what you are saying but OP did not mention whether to use `Thread` or *threading*! =) Let him understand and choose.
Nayan
@Nayan: He did say "without using Threads directly", with a capital "T" to boot!
Will Vousden
A: 

The normal approach is to use the .NET asynchronous programming pattern exposed by Stream. Essentially, you start reading asynchronously by calling Stream.BeginRead, passing it a byte[] buffer and a callback method that will be invoked when data have been read from the stream. In the callback method, you call Stream.EndRead, passing it the IAsncResult argument that was given to your callback. The return value of EndRead tells you how many bytes were read into the buffer.

Once you've received the first few bytes in this way, you can then wait for the rest of the message (if you didn't get enough data the first time round) by calling BeginRead again. Once you've got the whole message, you can raise the event.

Will Vousden
+4  A: 

When you say not use threads directly, I assume you still want to use them indirectly via async calls, otherwise this wouldn't be very useful.

All you need to do is wrap the async methods of the Stream and store the result in a buffer. First, let's define the event part of the spec:

public delegate void MessageAvailableEventHandler(object sender,
    MessageAvailableEventArgs e);

public class MessageAvailableEventArgs : EventArgs
{
    public MessageAvailableEventArgs(int messageSize) : base()
    {
        this.MessageSize = messageSize;
    }

    public int MessageSize { get; private set; }
}

Now, read one 16-bit integer from the stream asynchronously and report back when it's ready:

public class StreamWatcher
{
    private readonly Stream stream;

    private byte[] sizeBuffer = new byte[2];

    public StreamWatcher(Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");
        this.stream = stream;
        WatchNext();
    }

    protected void OnMessageAvailable(MessageAvailableEventArgs e)
    {
        var handler = MessageAvailable;
        if (handler != null)
            handler(this, e);
    }

    protected void WatchNext()
    {
        stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback),
            null);
    }

    private void ReadCallback(IAsyncResult ar)
    {
        int bytesRead = stream.EndRead(ar);
        if (bytesRead != 2)
            throw new InvalidOperationException("Invalid message header.");
        int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0];
        OnMessageAvailable(new MessageAvailableEventArgs(messageSize));
        WatchNext();
    }

    public event MessageAvailableEventHandler MessageAvailable;
}

I think that's about it. This assumes that whichever class is handling the message also has access to the Stream and is prepared to read it, synchronously or asynchronously, based on the message size in the event. If you want the watcher class to actually read the entire message then you'll have to add some more code to do that.

Aaronaught
+1 for implementation which proves my theory of BeginRead is not right solution. Your implementation cleared that doubt.
Nayan
Glorious! This looks like it will do the trick, and I'll try it out tomorrow.
Damien Wildfire
+1. Nitpicking: AFAIK a Stream.Read is allowed to return less then count bytes, as long as it returns at least one byte (if the end is not reached). So if `(bytesRead != 2)` you shouldn't throw an exception but BeginRead again until two bytes have been read.
dtb
@dtb: That's a good point - it kind of depends on the stream. Some will block until the data is available, others will return early, and others still will throw an exception. To make this work with *any* stream, I think you'd need to choose the behaviour in the constructor.
Aaronaught
Don't you want `OnMessageAvailable` to be raised before `WatchNext` in `ReadCallback`?
John Feminella
@John: I think you're right - that was how I originally had it, then I reversed the order because I imagined it would give better throughput, but I realize now that it creates a potential race condition if the consumer needs to actually read more data from the stream when it's available.
Aaronaught
@Aaronaught: Yes, I think that's the smarter call (though I'd have to think a little bit harder about whether that would actually cause a race condition). Also: as-is, I don't think this will work, because nothing actually starts the watching of the stream. Should that be done in that the constructor?
John Feminella
@John: Yes, that's exactly where it was supposed to be. Clerical error.
Aaronaught
@Aaronnaught: Looks right to me now. +1!
John Feminella