Hi,
I'm experiencing an issue in a multithreaded application. I have been debugging it for the last 3 days but for the life of me cannot figure it out. I'm writing this hoping that I either have a DUH moment while typing or that somebody sees something obvious in the code snippets I provide. Here's what's going on:
I've been working on a new UDP networking library and have a data producer that multicasts UDP datagrams to several receiver applications. The sender sends on two different sockets that are bound to separate UDP multicast addresses and separate ports. The receiver application also creates two sockets and binds each one to one of the sender's multicast address/port pairs.
When the receiver receives a datagram, it copies it from the receive buffer into a MemoryStream, which is then put onto a thread-safe queue. Another thread reads from that queue and decodes the data out of the MemoryStream.
Both sockets have their own queues.
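To make the hand-off concrete, here is a minimal sketch of one socket's receive-to-decode pipeline. It is a simplified stand-in, not the library code: `Chunk`, `ChunkPump` and the `decode` callback are hypothetical names, and `BlockingCollection<T>` is assumed in place of the real thread-safe queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical stand-in for DataChunk: owns a private copy of one datagram.
public sealed class Chunk
{
    public readonly byte[] Buffer;
    public readonly BinaryReader Reader;

    public Chunk(byte[] source, int count)
    {
        // Copy out of the receive buffer so later receives cannot overwrite these bytes.
        Buffer = new byte[count];
        System.Buffer.BlockCopy(source, 0, Buffer, 0, count);
        Reader = new BinaryReader(new MemoryStream(Buffer));
    }
}

// Hypothetical queue plus decoder thread; one instance per socket.
public sealed class ChunkPump : IDisposable
{
    private readonly BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>();
    private readonly Thread consumer;

    public ChunkPump(Action<Chunk> decode)
    {
        consumer = new Thread(() =>
        {
            // Blocks until a chunk is available; ends once CompleteAdding is called
            // and the queue has been drained.
            foreach (Chunk chunk in queue.GetConsumingEnumerable())
                decode(chunk);
        });
        consumer.IsBackground = true;
        consumer.Start();
    }

    // Called from the receive callback with the socket's buffer and byte count.
    public void Produce(byte[] receiveBuffer, int bytesTransferred)
    {
        queue.Add(new Chunk(receiveBuffer, bytesTransferred));
    }

    public void Dispose()
    {
        queue.CompleteAdding();
        consumer.Join();
        queue.Dispose();
    }
}
```

In this sketch each chunk is touched by exactly one thread at a time: the receive thread until `Produce` returns, and the decoder thread afterwards. Nothing is shared between the two pumps.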
What happens now is really weird. The problem occurs randomly and I cannot reproduce it on demand. When I run multiple receiver applications, it only shows up on one of them every now and then.
Basically, the thread that takes the MemoryStream out of the queue reads it via a BinaryReader (ReadInt32(), etc.) and thereby decodes the data. Every now and then, however, the data it reads is incorrect, e.g. a negative integer that the sender would never encode.
However, as mentioned before, the decoding only fails in one of the receiver applications; in the others the same datagram decodes fine. You might suspect byte corruption in the UDP datagram, but I've logged every single incoming datagram and compared the logs across all receivers, and the datagrams every application receives are absolutely identical.
Now it gets even weirder. When I dump the datagram that failed to decode to disk and write a unit test that reads it and runs it through the decoder, it decodes just fine. Also, when I wrap a try/catch around the decoder, reset the MemoryStream position in the catch and run it through the decoder again, it decodes just fine.
To make it weirder still, this only happens when I bind both sockets to read data from the sender. If I only bind one, it doesn't happen, or at least I wasn't able to reproduce it.
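For anyone who wants to replay a logged datagram the same way, here is a rough sketch of such an offline check. It assumes the dump is in the hyphen-separated hex format that BitConverter.ToString produces and that the payload has the stub layout used in this post (UInt32 sequence ID, one count byte, then that many Int32 values). `DumpReplay`, `ParseHex` and `Decode` are hypothetical names, not part of the library.

```csharp
using System;
using System.IO;

public static class DumpReplay
{
    // Turns "2A-00-00-00-..." (BitConverter.ToString output) back into bytes.
    public static byte[] ParseHex(string dump)
    {
        string[] parts = dump.Trim().Split('-');
        byte[] bytes = new byte[parts.Length];
        for (int i = 0; i < parts.Length; i++)
            bytes[i] = Convert.ToByte(parts[i], 16);
        return bytes;
    }

    // Decodes the assumed layout on a single thread with a fresh stream and reader,
    // so nothing is shared with the receive path.
    public static int[] Decode(byte[] datagram, out uint sequenceId)
    {
        using (var reader = new BinaryReader(new MemoryStream(datagram)))
        {
            sequenceId = reader.ReadUInt32();
            int count = reader.ReadByte();
            int[] values = new int[count];
            for (int i = 0; i < count; i++)
            {
                values[i] = reader.ReadInt32();
                if (values[i] < 0)
                    throw new InvalidDataException("Negative value at index " + i);
            }
            return values;
        }
    }
}
```

For example, the dump "01-00-00-00-02-07-00-00-00-2A-00-00-00" decodes to sequence ID 1 with the two values 7 and 42. If the bytes from the failing frame decode cleanly here, the bytes themselves are fine and the problem lies in how the live stream or reader is being used.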
Here is the relevant code.
This is the receive callback for the socket:
    private void ReceiveCompleted(object sender, SocketAsyncEventArgs args)
    {
        if (args.SocketError != SocketError.Success)
        {
            InternalShutdown(args.SocketError);
            return;
        }

        if (args.BytesTransferred > SequencedUnitHeader.UNIT_HEADER_SIZE)
        {
            // Copy the datagram out of the receive buffer so the chunk owns its bytes.
            DataChunk chunk = new DataChunk(args.BytesTransferred);
            Buffer.BlockCopy(args.Buffer, 0, chunk.Buffer, 0, args.BytesTransferred);
            chunk.MemoryStream = new MemoryStream(chunk.Buffer);
            chunk.BinaryReader = new BinaryReader(chunk.MemoryStream);

            // Read the header here; the decoder thread continues from this stream position.
            chunk.SequencedUnitHeader.SequenceID = chunk.BinaryReader.ReadUInt32();
            chunk.SequencedUnitHeader.Count = chunk.BinaryReader.ReadByte();

            // Test for out-of-sequence first. With the gap test first, this branch
            // could never be reached, because an older ID also fails the gap test.
            if (chunk.SequencedUnitHeader.SequenceID < prevSequenceID)
            {
                log.Error("UdpOutOfSequence\tName:{0}\tExpected:{1}\tReceived:{2}", unitName, prevSequenceID + 1, chunk.SequencedUnitHeader.SequenceID);
            }
            else if (prevSequenceID + 1 != chunk.SequencedUnitHeader.SequenceID)
            {
                log.Error("UdpDatagramGap\tName:{0}\tExpected:{1}\tReceived:{2}", unitName, prevSequenceID + 1, chunk.SequencedUnitHeader.SequenceID);
            }

            prevSequenceID = chunk.SequencedUnitHeader.SequenceID;
            messagePump.Produce(chunk);
        }
        else
        {
            UdpStatistics.FramesRxDiscarded++;
        }

        // Re-arm the receive for the next datagram.
        Socket.InvokeAsyncMethod(Socket.ReceiveAsync, ReceiveCompleted, asyncReceiveArgs);
    }
Here's some stub code that decodes the data:
    public static void OnDataChunk(DataChunk dataChunk)
    {
        try
        {
            // The header was already consumed in ReceiveCompleted, so the reader
            // is positioned at the first value.
            for (int i = 0; i < dataChunk.SequencedUnitHeader.Count; i++)
            {
                int val = dataChunk.BinaryReader.ReadInt32();
                if (val < 0)
                    throw new Exception("EncodingException");
                // do something with that value
            }
        }
        catch (Exception ex)
        {
            // Dump the raw frame, then log the failure.
            writer.WriteLine("ID:" + dataChunk.SequencedUnitHeader.SequenceID + " Count:" + dataChunk.SequencedUnitHeader.Count + " " + BitConverter.ToString(dataChunk.Buffer, 0, dataChunk.Size));
            writer.Flush();
            log.ErrorException("OnDataChunk", ex);
            log.Info("RETRY FRAME:{0} Data:{1}", dataChunk.SequencedUnitHeader.SequenceID, BitConverter.ToString(dataChunk.Buffer, 0, dataChunk.Size));

            // Rewind, re-read the header and decode the same chunk again.
            // Diagnostic only: there is no retry limit, so a frame that is
            // genuinely bad would recurse until the stack overflows.
            dataChunk.MemoryStream.Position = 0;
            dataChunk.SequencedUnitHeader.SequenceID = dataChunk.BinaryReader.ReadUInt32();
            dataChunk.SequencedUnitHeader.Count = dataChunk.BinaryReader.ReadByte();
            OnDataChunk(dataChunk);
        }
    }
As you can see, in the catch block I simply reset MemoryStream.Position to 0, re-read the header and call the same method again, and the second time it works just fine. I'm really out of ideas at this point and unfortunately had no DUH moment writing this. Does anybody have any idea what might be going on, or what else I could do to troubleshoot this?
Thanks,
Tom