I'm really not sure how to approach this, but I am subscribing to events fired within a custom class, and ideally I wish to queue them and handle them first-in, first-out as they come in. I am aware of Queue<T> and I think I should use it, but my question is: in the event handler, when my message is received, would I simply Enqueue() it there? And if so, how can the queue be crunched through as new items are added?

I was considering calling a method in the constructor which performs something like (hold on to your hats):

while (true)
{
  foreach (var item in myQueue)
  {
    // process
    myQueue.Dequeue();
  }
}

Surely there must be a more elegant way to do this? It should effectively hit myQueue, iterate while it contains elements, and do what I want, but what would performance be like? I can spawn this on a separate thread to avoid blocking, I just really have a hard time accepting while (true)!

+4  A: 

This is a classic producer/consumer problem. A quick web search reveals http://msdn.microsoft.com/en-us/library/yy12yx1f(VS.80,loband).aspx, which covers this exactly.

You don't want to do a while(true) loop, since your thread will burn 100% of the CPU even when there is no work for it to do, potentially starving other threads in the system.
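
Roughly, the pattern looks like this (a minimal sketch in the spirit of the linked MSDN example, not the sample itself; the type and member names are made up):

using System;
using System.Collections.Generic;
using System.Threading;

// Illustrative only - a queue shared between a producer and one consumer thread.
class WorkQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    // Producer side: call this from your event handler.
    public void Enqueue(T item)
    {
        lock (_lock)
        {
            _queue.Enqueue(item);
            Monitor.Pulse(_lock);   // wake the consumer if it is waiting
        }
    }

    // Consumer side: run this on a dedicated thread.
    public void ConsumeForever(Action<T> process)
    {
        while (true)
        {
            T item;
            lock (_lock)
            {
                while (_queue.Count == 0)
                    Monitor.Wait(_lock);   // sleeps without burning CPU
                item = _queue.Dequeue();
            }
            process(item);   // do the real work outside the lock
        }
    }
}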

Michael
A: 

What you are doing there doesn't look right. If you really are using a queue, then you should really be pulling items out of the queue, not iterating over it:

while (!queue.empty()) // or whatever
{
  process the first item in the queue
}
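
In concrete C# terms that would be something like this (a sketch only, assuming a plain Queue<T> that a single thread drains at a time):

while (queue.Count > 0)
{
    var item = queue.Dequeue();
    // process item
}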
1800 INFORMATION
@1800, I have comments for you in Reed's post.
GONeale
@1800, there is no queue.Empty() and queue.Count() not matching would terminate the while.
GONeale
+1  A: 

If you're running this from multiple threads, you'll need to introduce some form of locking to keep the queue consistent. Make sure the queue is locked both when you enqueue elements and when you dequeue them.

If the thread processing your queue is going to do nothing but this, your basic code is probably fine, with the exception of handling the case where the queue is currently empty. You probably should add something like:

while (myQueue.Count == 0)
    Thread.Sleep(sleepTime);

This would give you a way to "wait" until your events fill your queue.

Also, when you dequeue from your queue, you won't be able to use a foreach. You'll want to do something like:

while (myQueue.Count == 0)
    Thread.Sleep(sleepTime);
while (myQueue.Count > 0)
{
    QueueItem myObject;          // whatever type your queue holds
    lock (myLock)
        myObject = myQueue.Dequeue();
    // do your work with myObject...
}

This will prevent collection-modification problems if somebody adds to your queue, and keeps you from needing to hold the lock the entire time you're processing your elements.


Edit: I do agree with some of the comments that this is not always the most effective strategy.

If the queue is going to be largely empty, and only have elements on occasion, I'd make a custom class that wraps it.

You could have events triggered when the queue is made empty/non-empty. As soon as the queue receives items, the non-empty event could trigger a thread to process it and empty it out. As soon as it reaches 0 items, the empty event could stop the processing.

This would be much more effective than waiting constantly if the queue is empty most of the time. If, on the other hand, the queue nearly always has items and the processing thread rarely catches up, I'd use the approach above for simplicity.

Reed Copsey
Nice suggestions, thanks Reed. I will give it some thought. Yes, my thread could do nothing but this; I'm happy with that.
GONeale
Please do not do this. Busy waiting is bad for performance and battery life. If you need to wait for the queue to have some items in it, you should use an event of some kind
1800 INFORMATION
That's what I would like to do, 1800, and it's the first thing I checked for on the Queue class - an event - but there is none. I guess I could make my own by inheriting from Queue<T> and hiding the Enqueue method? What do you think?
GONeale
@1800 INFORMATION: Depending on how frequently things are added to the queue, I sometimes agree. If the queue will typically have items, this will rarely wait (and handle the wait case) effectively. If the queue is mostly empty, and rarely has items, I'd try to implement this differently.
Reed Copsey
Edited the post to include the suggestion of wrapping it in a custom queue class with an event system in place - it's a good idea if the queue isn't normally full.
Reed Copsey
Grr, hold on - if I were to throw an event when Enqueue() occurred, I'd be no better off, as we would never iterate a 'list' of queue items; only one would exist in the list every time I processed it.
GONeale
Read what I wrote: have two events triggered, "QueueEmpty" and "QueueHasElements". These would start your loop and stop it (so instead of while(true), it'd be while(notEmpty), and the events would take care of that).
Reed Copsey
You'd process as soon as an element was added, but the event wouldn't keep firing if 5 elements were added in that case... maybe even just a single event triggered when the queue goes from empty to non-empty would be enough, since that would let you start your loop and run until it's empty.
Reed Copsey
Yes I understand what you're saying, I might have a shot at the custom class with events.
GONeale
You'll need to synchronize whenever you add to the queue as well, so that its internal state isn't corrupted if the processing thread and the event handler mutate it at the same time.
Arnshea
OK, I have my solution up, based on Reed's responses.
GONeale
@GoNeale: Glad we could help.
Reed Copsey
+4  A: 

You can't do this. The enumerator returned for the foreach will throw an exception if the underlying collection is changed while the enumerator is still being used.

Essentially what you need to do is set up another thread to handle the events. Ideally, you'd signal this other thread (via an event or other synchronization mechanism) that there is work available. It would dequeue the work item, process it, then sleep until the next signal comes in. Alternatively you could use polling (wake periodically and check for another item), but that would be less efficient. In any case you'll need to use locking to make sure that you don't attempt to modify the queue in both threads at the same time.
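
One way to wire that up (a rough sketch, assuming an AutoResetEvent and a dedicated worker thread; the type and member names here are illustrative, not a prescribed API):

using System;
using System.Collections.Generic;
using System.Threading;

// Illustrative only - signals a worker thread whenever an item arrives.
class EventWorker<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);

    // Called by the event handler on the producing thread.
    public void Post(T item)
    {
        lock (_lock)
            _queue.Enqueue(item);
        _signal.Set();   // wake the worker thread
    }

    // Runs on the worker thread; sleeps until signalled, then drains the queue.
    public void Run(Action<T> process)
    {
        while (true)
        {
            _signal.WaitOne();
            while (true)
            {
                T item;
                lock (_lock)
                {
                    if (_queue.Count == 0)
                        break;           // queue drained - go back to waiting
                    item = _queue.Dequeue();
                }
                process(item);           // work happens outside the lock
            }
        }
    }
}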

tvanfosson
+1  A: 

I usually use a ManualResetEvent to signal that an element has been added to the collection.

The event handler does something like this:

lock (myLock)
{
   myq.Enqueue(...);
   myqAddedSignal.Set();
}

The processing thread waits on the signal - once signaled it empties the queue, resets the signal then processes the items:

while (true)
{
   myqAddedSignal.WaitOne();

   List<MyItem> copyOfMyQ;      // MyItem = whatever type the queue holds
   lock (myLock)
   {
      // pull everything off myQ into a list, then clear myQ
      copyOfMyQ = new List<MyItem>(myq);
      myq.Clear();
      myqAddedSignal.Reset();
   }

   foreach (var obj in copyOfMyQ)
   {
      // process obj outside the lock
   }
}

This will process the items in the queue in a thread-safe manner. The only shared state is the queue and myqAddedSignal, and all mutation of them happens under myLock (which I usually just make a plain object).

Arnshea
A: 

You could see this existing answer, which has such a queue.

Marc Gravell
A: 

Following the advice given by Reed, I have created a custom class that raises events when the queue is made non-empty and when it is made empty.

Custom EventQueue<T> Class:

public class EventQueue<T> : Queue<T>
{
    public delegate void OnQueueMadeEmptyDelegate();
    public event OnQueueMadeEmptyDelegate OnQueueMadeEmpty;
    public delegate void OnQueueMadeNonEmptyDelegate();
    public event OnQueueMadeNonEmptyDelegate OnQueueMadeNonEmpty;

    public new void Enqueue(T item)
    {
        var oldCount = Count;
        base.Enqueue(item);
        if (OnQueueMadeNonEmpty != null &&
            oldCount == 0 && Count > 0)
            // FIRE EVENT
            OnQueueMadeNonEmpty();
    }
    public new T Dequeue()
    {
        var oldCount = Count;
        var item = base.Dequeue();
        if (OnQueueMadeEmpty != null &&
            oldCount > 0 && Count == 0)
        {
            // FIRE EVENT
            OnQueueMadeEmpty();
        }
        return item;
    }
    public new void Clear()
    {
        base.Clear();
        if (OnQueueMadeEmpty != null)
        {
            // FIRE EVENT
            OnQueueMadeEmpty();
        }
    }
}

(I have removed the <summary> comments to keep the code sample smaller. I am using the "new" modifier as a way to append additional logic to the base implementation.)

Declarations in the main class:

public delegate void InitQueueDelegate();
private InitQueueDelegate initQueueDelegate;

private EventQueue<QueueRequest> translationQueue;
private Object queueLock = new Object();

In main class constructor:

initQueueDelegate = this.InitQueue;
initQueueDelegate.BeginInvoke(null, null);

In main class body:

private void InitQueue()
{
    this.translationQueue = new EventQueue<QueueRequest>();
    this.translationQueue.OnQueueMadeEmpty += new EventQueue<QueueRequest>.OnQueueMadeEmptyDelegate(translationQueue_OnQueueMadeEmpty);
    this.translationQueue.OnQueueMadeNonEmpty += new EventQueue<QueueRequest>.OnQueueMadeNonEmptyDelegate(translationQueue_OnQueueMadeNonEmpty);
}

void translationQueue_OnQueueMadeNonEmpty()
{
    while (translationQueue.Count > 0)
    {
        lock (queueLock)
        {
            QueueRequest request = translationQueue.Dequeue();
#if DEBUG
            System.Diagnostics.Debug.WriteLine("Item taken from queue...");
#endif
            // hard work
            ....
            ....
            ....
        }
    }
}

void translationQueue_OnQueueMadeEmpty()
{
    // empty queue
    // don't actually need to do anything here?
}

private void onMessageReceived(....)
{
  ....
  ....
  ....
  // QUEUE REQUEST
  lock (queueLock)
  {
    QueueRequest queueRequest = new QueueRequest
                                    {
                                        Request = request,
                                        Sender = sender,
                                        Recipient = tcpClientService
                                    };
    translationQueue.Enqueue(queueRequest);
#if DEBUG
    System.Diagnostics.Debug.WriteLine("Item added to queue...");
#endif
  }
}

And finally the QueueRequest struct:

public struct QueueRequest
{
    public MessageTranslateRequest Request { get; set; }
    public TCPClientService Sender { get; set; }
    public TCPClientService Recipient { get; set; }
}

I know there's a lot there, but I wanted you guys to check over the complete implementation. What do you think? Is the locking I have performed correct?

I will award credit to Reed if this is OK, as my solution was created from his ideas.

GONeale
A: 

The structure you're looking for is a semaphore.

You add an item to the queue, then add a count to the semaphore.

You wait on the semaphore in the other thread, and process an item from the queue.

Depending on the queue implementation (like the BCL one) you'll have to lock/unlock while retrieving.
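
Something along these lines (a sketch only, assuming System.Threading.Semaphore and a plain Queue<T> guarded by a lock; the names are made up):

using System.Collections.Generic;
using System.Threading;

// Illustrative only - each semaphore count corresponds to one queued item.
class SemaphoreQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();
    // Starts at 0; Release() adds a count for every item enqueued.
    private readonly Semaphore _itemsAvailable = new Semaphore(0, int.MaxValue);

    // Producer: add the item, then add a count to the semaphore.
    public void Add(T item)
    {
        lock (_lock)
            _queue.Enqueue(item);
        _itemsAvailable.Release();
    }

    // Consumer: blocks until at least one item is available.
    public T Take()
    {
        _itemsAvailable.WaitOne();
        lock (_lock)
            return _queue.Dequeue();
    }
}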

Darren Clark