Hi,

Is my use of ConcurrentQueue here between 2 threads OK? I wanted to check that I don't need to "lock" anywhere explicitly. In particular, look at the two lines marked with comments in capitals: could I drop a packet there?

public class PacketCapturer
{

private static ConcurrentQueue<Packet> _packetQueue = new ConcurrentQueue<Packet>();

public PacketCapturer(IPHostEntry proxyDns, ref BackgroundWorker bw)
{
    // start the background thread
    var backgroundThread = new System.Threading.Thread(BackgroundThread);
    backgroundThread.Start();

    // Start Packet Capture with callback via PacketCapturerCallback
}

private void PacketCapturerCallback(Packet packet)
{
    _packetQueue.Enqueue(packet);
}

private static void BackgroundThread()
{
    while (!BackgroundThreadStop)
    {
        if (_packetQueue.Count == 0) Thread.Sleep(250);
        else 
        {
            ConcurrentQueue<Packet> ourQueue;  
            ourQueue = _packetQueue;   // COULD I DROP A PACKET BETWEEN HERE
            _packetQueue = new ConcurrentQueue<Packet>();      // AND HERE???

            Console.WriteLine("BackgroundThread: ourQueue.Count is {0}", ourQueue.Count);
        }
    }
}
}
+1  A: 

Edit: I think Mark/Gabe are right that you won't lose any Packets. I'll leave the rest in just for reference in case anyone else can weigh in on this.


Simply, yes. You could lose one or more Packets there. You might want to look into what methods ConcurrentQueue offers to get/remove a portion of it as that looks like what you want to do.

Why not TryDequeue until it returns false:

    else 
    {
        Queue<Packet> ourQueue = new Queue<Packet>();  //this doesn't need to be concurrent unless you want it to be for some other reason.
        Packet p;
        while(_packetQueue.TryDequeue(out p))
        {
            ourQueue.Enqueue(p);
        }

        Console.WriteLine("BackgroundThread: ourQueue.Count is {0}", ourQueue.Count);
    }
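
For anyone who wants to try the drain-until-empty idea in isolation, here is a minimal sketch. It assumes `int` items in place of `Packet`, and the names `DrainDemo` and `Drain` are made up for illustration; they are not part of the question's code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class DrainDemo
{
    // Moves everything currently in the shared queue into a private batch.
    // The batch is only touched by the calling thread, so a plain List is enough.
    public static List<int> Drain(ConcurrentQueue<int> shared)
    {
        var batch = new List<int>();
        int item;
        while (shared.TryDequeue(out item))
        {
            batch.Add(item);
        }
        return batch;
    }

    public static void Main()
    {
        var shared = new ConcurrentQueue<int>();
        const int total = 1000;

        // Producer: stands in for the packet capture callback (an assumption).
        var producer = new Thread(() =>
        {
            for (int i = 0; i < total; i++) shared.Enqueue(i);
        });
        producer.Start();

        // Consumer: drain in batches until every item has been seen.
        int seen = 0;
        while (seen < total)
        {
            List<int> batch = Drain(shared);
            if (batch.Count == 0) Thread.Sleep(1);
            seen += batch.Count;
        }
        producer.Join();

        Console.WriteLine("Received {0} of {1} items", seen, total);
    }
}
```

The shared queue instance is never replaced here, so the question of which queue a late Enqueue lands in does not come up.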
Chad
@Chad - care to explain *how* "yes"?
Marc Gravell
Marc... I mean yes, he can lose one or more Packets... here's how: Thread 1: `ourQueue = _packetQueue;` Thread 2: `_packetQueue.Enqueue(packet);` (this packet gets lost... more Enqueues could happen here) Thread 1: `_packetQueue = new ConcurrentQueue<Packet>();`
Chad
The concept of a queue that I can add to, and remove from, from two (2) threads concurrently seems good to me - So does Queue support this concurrent usage here? Should it be ConcurrentQueue?
Greg
Greg, Queue does not support this out of the box. I used a Queue in my example because the Queue I add to is being added to by just 1 thread. _packetQueue should still be ConcurrentQueue<Packet> but ourQueue can be Queue<Packet>. That is, unless, you need ourQueue to also work across multiple threads. I don't know what the rest of your code does with ourQueue, so I cannot say either way.
Chad
I'm with Marc -- I don't see where a packet would be dropped. In your example, Chad, the packets wouldn't get lost; they would just end up in `ourQueue`.
Gabe
Marc's explanation seems accurate. However, I guess I still feel uneasy about saying the code is correct.
Chad
+1  A: 

No, it is not OK. First of all, if you change the reference like this in a concurrent thread, _packetQueue must be marked volatile to prevent compiler and codegen optimizations from hiding the change from the other thread. The changing of _packetQueue should normally occur as an Interlocked.CompareExchange, but this is less critical for your usage.

But far more worrying is the pattern of changing the _packetQueue instance like this in the background thread. What is the purpose of this? It has a horrible code smell...
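
If the swap is kept anyway, one way to make the reference change explicit is sketched below. It uses `Interlocked.Exchange` (a plain atomic swap, swapped in here for the CompareExchange mentioned above) and `Volatile.Read` (.NET 4.5 or later) instead of a volatile field. The class and method names are invented for the example, and `string` stands in for `Packet`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class SwapDemo
{
    private static ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();

    // Producer side: read the current queue reference, then enqueue into it.
    public static void Produce(string item)
    {
        ConcurrentQueue<string> current = Volatile.Read(ref _queue);
        current.Enqueue(item);
    }

    // Consumer side: atomically install a fresh queue and return the old one.
    public static ConcurrentQueue<string> TakeAll()
    {
        return Interlocked.Exchange(ref _queue, new ConcurrentQueue<string>());
    }

    public static void Main()
    {
        Produce("a");
        Produce("b");
        ConcurrentQueue<string> taken = TakeAll();
        Produce("c");

        Console.WriteLine("taken: {0}", taken.Count);
        Console.WriteLine("left in new queue: {0}", TakeAll().Count);
    }
}
```

Even with an atomic swap, a producer that read the old reference just before the exchange can still enqueue into the queue the consumer has already taken, so the consumer should not treat its count as final.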

updated

What I usually do is this:

Producer thread(s):

Producer () {
...
lock(_sharedQueue) {
  _sharedQueue.Enqueue(something);
}
...
}

consumer thread:

consumer (...) {
...
Something[] toProcess = null;
lock(_sharedQueue)
{
  toProcess = _sharedQueue.ToArray();
  _sharedQueue.Clear();
}
// Process here the toProcess array
...
}

This was good enough for every single use I ever had. Processing does not occur under the lock, so locking is minimal. There is no need for the fancy ConcurrentQueue; a plain old .NET 2.0 collection is enough. Usually I use a dedicated locking object, for good practice, instead of locking the actual queue instance.
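
A compilable sketch of the same pattern, using the dedicated lock object mentioned above, could look like this. `LockedQueue<T>`, `TakeAll`, and `LockedQueueDemo` are hypothetical names chosen for the example.

```csharp
using System;
using System.Collections.Generic;

public class LockedQueue<T>
{
    private readonly object _gate = new object();   // dedicated lock object
    private readonly Queue<T> _items = new Queue<T>();

    // Called by the producer thread(s).
    public void Enqueue(T item)
    {
        lock (_gate)
        {
            _items.Enqueue(item);
        }
    }

    // Called by the consumer: copy everything out and clear under one lock,
    // so no item can slip in between the copy and the clear.
    public T[] TakeAll()
    {
        lock (_gate)
        {
            T[] snapshot = _items.ToArray();
            _items.Clear();
            return snapshot;
        }
    }
}

public static class LockedQueueDemo
{
    public static void Main()
    {
        var queue = new LockedQueue<string>();
        queue.Enqueue("packet-1");
        queue.Enqueue("packet-2");

        // Processing happens outside the lock, on the returned array.
        foreach (string p in queue.TakeAll())
        {
            Console.WriteLine(p);
        }
    }
}
```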

Remus Rusanu
thanks - I like Chad's suggestion if that works ok
Greg
What is your goal? Are you aiming for a lock-free queue? Copying out the elements will not be lock free; the ConcurrentQueue *will* internally use locking. Achieving a lock-free queue is *not* trivial by any means, and I highly doubt you really need it. Lock-free structures are needed in systems with 64+ cores doing extremely intensive and time-critical work, like an OS scheduler.
Remus Rusanu
I was just after a way to decouple the packet capture callback really, so to keep the callback itself very short, and have the thing that uses the data on another thread
Greg
PS. I've just tried using ConcurrentQueue and then using the Enqueue method on one thread, and the TryDequeue method on the other. It seems to work ok at first glance, but I guess I'd have to do some sort of load testing to prove it's ok.
Greg
Then why not simply TryDequeue and process, one element at a time? Nothing else is needed; ConcurrentQueue has internal locking protection.
Remus Rusanu
+1  A: 

I don't think you could drop a packet, as any thread is going to dereference either the old queue or the new one. The edge case is that more data can arrive in ourQueue after you think you've swapped them, or you could fail to notice the reference change in the Sleep loop (but with 2 threads, the only thread that changes the reference is this thread, so that is not necessarily a problem).

TBH, if you have 2 threads here, why not just lock or use ReaderWriterLock? It will be a lot simpler to figure out what happens when...
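
To make the edge case concrete, here is a single-threaded replay of one possible interleaving. It is not a real race, just the steps written out in order. `LateEnqueueDemo`, `Run`, and the local `producerView` are illustrative names, and `string` stands in for `Packet`.

```csharp
using System;
using System.Collections.Concurrent;

public static class LateEnqueueDemo
{
    // Returns { count seen at swap time, final count in ourQueue, count in the new queue }.
    public static int[] Run()
    {
        var shared = new ConcurrentQueue<string>();
        shared.Enqueue("packet-1");

        // Producer, step 1: the callback has read the queue reference
        // but has not called Enqueue yet.
        ConcurrentQueue<string> producerView = shared;

        // Consumer: take the old queue and install a new one.
        ConcurrentQueue<string> ourQueue = shared;
        shared = new ConcurrentQueue<string>();
        int countAtSwap = ourQueue.Count;

        // Producer, step 2: the Enqueue lands in the old queue.
        producerView.Enqueue("packet-2");

        return new[] { countAtSwap, ourQueue.Count, shared.Count };
    }

    public static void Main()
    {
        int[] r = Run();
        Console.WriteLine("count at swap: {0}", r[0]);       // 1
        Console.WriteLine("ourQueue afterwards: {0}", r[1]); // 2
        Console.WriteLine("new shared queue: {0}", r[2]);    // 0
    }
}
```

The late packet is not dropped; it shows up in ourQueue after the swap. It is only missed if the consumer processes ourQueue once based on the earlier count and then discards the queue.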

Marc Gravell
thanks - I like Chad's suggestion below if that works ok
Greg
Marc - what's your comment re using lock or ReaderWriterLock, or going with the example Chad gave?
Greg
@Greg - if you don't *have* to swap the queue (for some reason) then that is ideal. With Chad's update, that looks sound.
Marc Gravell