views:

543

answers:

2

Note: Let me appologize for the length of this question, i had to put a lot of information into it. I hope that doesn't cause too many people to simply skim it and make assumptions. Please read in its entirety. Thanks.

I have a stream of data coming in over a socket. This data is line oriented.

I am using the APM (Async Programming Method) of .NET (BeginRead, etc..). This precludes using stream based I/O because Async I/O is buffer based. It is possible to repackage the data and send it to a stream, such as a Memory stream, but there are issues there as well.

The problem is that my input stream (which i have no control over) doesn't give me any information on how long the stream is. It simply is a stream of newline lines looking like this:

COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....

So, using APM, and since i don't know how long any given data set will be, it is likely that blocks of data will cross buffer boundaries requiring multiple reads, but those multiple reads will also span multiple blocks of data.

Example:

Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

My first thought was to use a StringBuilder and simply append the buffer lines to the SB. This works to some extent, but i found it difficult to extract blocks of data. I tried using a StringReader to read newlined data but there was no way to know whether you were getting a complete line or not, as StringReader returns a partial line at the end of the last block added, followed by returning null aftewards. There isn't a way to know if what was returned was a full newlined line of data.

Example:

// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."

What's worse, is that if I just keep appending to the data, the buffers get bigger and bigger, and since this could run for weeks or months at a time that's not a good solution.

My next thought was to remove blocks of data from the SB as I read them. This required writing my own ReadLine function, but then I'm stuck locking the data during reads and writes. Also, the larger blocks of data (which can consist of hundreds of reads and megabytes of data) require scanning the entire buffer looking for newlines. It's not efficient and pretty ugly.

I'm looking for something that has the simplicity of a StreamReader/Writer with the convenience of async I/O.

My next thought was to use a MemoryStream, and write the blocks of data to a memory stream then attach a StreamReader to the stream and use ReadLine, but again I have issues with knowing if a the last read in the buffer is a complete line or not, plus it's even harder to remove the "stale" data from the stream.

I also thought about using a thread with synchronous reads. This has the advantage that using a StreamReader, it will always return a full line from a ReadLine(), except in broken connection situations. However this has issues with canceling the connection, and certain kinds of network problems can result in hung blocking sockets for an extended period of time. I'm using async IO because i don't want to tie up a thread for the life of the program blocking on data receive.

The connection is long lasting. And data will continue to flow over time. During the intial connection, there is a large flow of data, and once that flow is done the socket remains open waiting for real-time updates. I don't know precisely when the initial flow has "finished", since the only way to know is that no more data is sent right away. This means i can't wait for the initial data load to finish before processing, I'm pretty much stuck processing "in real time" as it comes in.

So, can anyone suggest a good method to handle this situation in a way that isn't overly complicated? I really want this to be as simple and elegant as possible, but I keep coming up with more and more complicated solutions due to all the edge cases. I guess what I want is some kind of FIFO in which i can easily keep appending more data while at the same time poping data out of it that matches certain criteria (ie, newline terminated strings).

A: 

What you're explaining in you're question, reminds me very much of ASCIZ strings. (link text). That may be a helpfull start.

I had to write something similar to this in college for a project I was working on. Unfortunatly, I had control over the sending socket, so I inserted a length of message field as part of the protocol. However, I think that a similar approach may benefit you.

How I approached my solution was I would send something like 5HELLO, so first I'd see 5, and know I had message length 5, and therefor the message I needed was 5 characters. However, if on my async read, i only got 5HE, i would see that I have message length 5, but I was only able to read 3 bytes off the wire (Let's assume ASCII characters). Because of this, I knew I was missing some bytes, and stored what I had in fragment buffer. I had one fragment buffer per socket, therefor avoiding any synchronization problems. The rough process is.

  1. Read from socket into a byte array, record how many bytes was read
  2. Scan through byte by byte, until you find a newline character (this becomes very complex if you're not receiving ascii characters, but characters that could be multiple bytes, you're on you're own for that)
  3. Turn you're frag buffer into a string, and append you're read buffer up until the new line to it. Drop this string as a completed message onto a queue or it's own delegate to be processed. (you can optimize these buffers by actually having you're read socket writing to the same byte array as you're fragment, but that's harder to explain)
  4. Continue looping through, every time we find a new line, create a string from the byte arrange from a recorded start / end position and drop on queue / delegate for processing.
  5. Once we hit the end of our read buffer, copy anything that's left into the frag buffer.
  6. Call the BeginRead on the socket, which will jump to step 1. when data is available in the socket.

Then you use another Thread to read you're queue of incommign messages, or just let the Threadpool handle it using delegates. And do whatever data processing you have to do. Someone will correct me if I'm wrong, but there is very little thread synchronization issues with this, since you can only be reading or waiting to read from the socket at any one time, so no worry about locks (except if you're populating a queue, I used delegates in my implementation). There are a few details you will need to work out on you're own, like how big of a frag buffer to leave, if you receive 0 newlines when you do a read, the entire message must be appended to the fragment buffer without overwriting anything. I think it ran me about 700 - 800 lines of code in the end, but that included the connection setup stuff, negotiation for encryption, and a few other things.

This setup performed very well for me; I was able to perform up to 80Mbps on 100Mbps ethernet lan using this implementation a 1.8Ghz opteron including encryption processing. And since you're tied to the socket, the server will scale since multiple sockets can be worked on at the same time. If you need items processed in order, you'll need to use a queue, but if order doesn't matter, then delegates will give you very scalable performance out of the threadpool.

Hope this helps, not meant to be a complete solution, but a direction in which to start looking.

*Just a note, my implementation was down purely at the byte level and supported encryption, I used characters for my example to make it easier to visualize.

Kevin Nisbet
Yes, I've implemented an approach similar to this already, but I don't like it. It's too messy and complex for my tastes, that's why I'm asking for suggestions here. I like Noldorin's approach, it has the elgance and reuse of existing framework code i desire.
Mystere Man
+3  A: 

That's quite an interesting question. The solution for me in the past has been to use a separate thread with synchronous operations, as you propose. (I managed to get around most of the problems with blocking sockets using locks and lots of exception handlers.) Still, using the in-built asynchronous operations is typically advisable as it allows for true OS-level async I/O, so I understand your point.

Well I've gone and written a class for accomplishing what I believe you need (in a relatively clean manner I would say). Let me know what you think.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable<string> Process(byte[] newData)
    {
        // Note: replace the following encoding method with whatever you are reading.
        // The trick here is to add an extra line break to the new data so that the algorithm recognises
        // a single line break at the end of the new data.
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
                // so that the buffer does not have to be expanded in most/all situations. 
                // Change it to whatever seems appropiate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
                _buffer = null;
                GC.Collect();
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

An instance of this class should be created for each NetworkStream and the Process function should be called whenever new data is received (in the callback method for BeginRead, before you call the next BeginRead I would imagine).

Note: I have only verified this code with test data, not actual data transmitted over the network. However, I wouldn't anticipate any differences...

Also, a warning that the class is of course not thread-safe, but as long as BeginRead isn't executed again until after the current data has been processed (as I presume you are doing), there shouldn't be any problems.

Hope this works for you. Let me know if there are remaining issues and I will try to modify the solution to deal with them. (There could well be some subtlety of the question I missed, despite reading it carefully!)

Noldorin
This is an interesting solution. I too have found Iterators to be useful, but this was not a solution my mind would have come up with. I like it.
Mystere Man
Can you explain why you need to implement IDispose? I've been told that calling GC.Collect() is bad practice and can result in poor performance. Are you concerned about rapid allocations within a short time exhausting the heap?
Mystere Man
Yeah, iterators are handy things. In this case you could just as well do it with a generic List, though it may not look so nice of course. If you want to deal with the result as a List/Array, it's trivial to convert to those types anyway, and the implementation is still simpler.
Noldorin
Regarding the use of IDisposable, it is possibly not *necessary*. The null allocation followed by the GC.Collect is used to insure that the memory for its buffer is freed up immediately. Depending on how long lines can get, this may or may not be much of an issue.
Noldorin
(contd) The Dispose method (and thus GC.Collect) should only be called when the associated connection/NetworkStream is closed, which shouldn't be too often, so I wouldn't worry about performance. (There seems to be an alternative however: http://dotnettipoftheday.org/tips/dispose_stringbuilder.aspx)
Noldorin
Also, your comments indicate that the last line is guaranteed to be incomplete. This is not true, since it is possible for it to be complete (including final newline). For instance, after a batch of data is sent, the last line will be complete. It's also possible it just might accidentally be.
Mystere Man
The code *seems* to work even if the last line is newline terminated, but maybe i'm missing something. Is there a potential problem if the last line is newline terminated?
Mystere Man
That's the "trick". Because a new line is appended to all new data before processing, it would treat new data ending in a line break by reading the last complete line, then a zero-length (string.Empty) incomplete line, which is stored into the buffer, all finally followed by the null line.
Noldorin
In fact, to see truly see what's going on, I recommend you create some test data (ending with a line break/an incomplete line), and step through the code of the method in the two cases. Anyway this approach has got me thinking - I'll have to test it against my thread-with-sync-reads at some point.
Noldorin
Thanks for such a great solution. Answer is yours.
Mystere Man
You're welcome. Glad it did the job for you.
Noldorin
I am interested in using this technique, but am not familiar with iterators/IEnumerable. Can you please provide an example usage? Thanks.
strager
I point you to the MSDN docs on C# iterators: http://msdn.microsoft.com/en-us/library/65zzykke(VS.80).aspx. Hopefully that should make some sense.
Noldorin
@Noldorin, I read up on iterators, and understand how they work now. How would I connect my Stream instance to your ASyncStreamProcessor class, though?
strager
You call BeginRead/EndRead in a loop, calling the Process method on each callback. Read up on using asynchronous methods of TcpClient - it's not too complicated.
Noldorin
Ah, thanks Noldorin. Works. =]
strager
Noldorin, nice idea and good work on the class but you should really remove the IDisposable (unnecessary since it only manages a StringBuilder) and otherwise definitely remove the GC.Collect.
Henk Holterman
Yeah, so you may have a point there. I just thought it would be advisable to deallocate such a potentially large chunk of memory. Perhaps it would be better to use the solution in http://dotnettipoftheday.org/tips/dispose_stringbuilder.aspx or simply not bother even. Thanks for the comment.
Noldorin
Yes, instead of _buffer = null; you could set _buffer.Length = 0; this would recycle your (1) StringBuilder and be more efficient in all cases.
Henk Holterman