Sorry, this question is not very clear; if I knew the correct words to describe the problem, Google would likely come up with the answer.

I am looking for a queue class that:

  • Lets any number of threads put an item on the queue
  • Items are processed in the order they are added to the queue
  • I don't mind which thread processes an item
  • Only one item is processed at a time.
  • I would rather not have a thread blocked waiting for an item to be added when the queue is empty.
  • The normal case is for the queue to be empty most of the time.

e.g. just like what happens with BeginInvoke on a WinForms window... (or PostMessage, if you have ever done raw Win32 programming). We are using .NET 3.5.

I am looking for something ready-made in the .NET framework, or an open source project with good unit tests, as I don't wish to write all the unit tests for a home-made solution.


For background, see "Why are my message be processed out of order over a single WCF TCP channel (with ConcurrencyMode.Reentrant)?". By using this dispatcher I was able to switch to ConcurrencyMode.Single and still avoid deadlocks.

+3  A: 

Here's a sketch of a class that can do that:

public class WorkerQueue<T> {
    private Queue<T> workerQueue   = new Queue<T>();
    private object   padlock       = new object();
    private bool     isProcessing  = false;
    private Thread   workerThread;

    public void QueueWorkItem(T item) {
        lock(padlock) {
            workerQueue.Enqueue(item);
            if (!isProcessing) {
                isProcessing = true;
                workerThread = new Thread(this.ProcessWork);
                workerThread.Start();
            }
        }
    }

    private void ProcessWork() {
        // 1) Thread-safe dequeue operation
        // 2) Keep processing while work is on the queue. External callers can
        //    add items to the queue while this is ongoing.
        // 3) When the queue is empty, set isProcessing to false (thread-safely)
    }

}

Applications would use it like this:

public class Application {
    private WorkerQueue<object> workerQueue = new WorkerQueue<object>();

    // This can run on multiple threads if need be
    public void SomeMethodThatCreatesWork() {
        object workItem = ExternalCall();
        this.workerQueue.QueueWorkItem(workItem);
    }
}

It would probably be useful to let applications stop the processing as well, probably by adding a flag that ProcessWork could check after each item is dequeued, but it's not clear what to do with unprocessed items (perhaps it would be enough to allow access to them and let the caller decide).
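For illustration, here is one rough way ProcessWork could be filled in, assuming the class is given an Action&lt;T&gt; delegate (call it processItem - not part of the sketch above) to do the actual work, plus a volatile stopRequested flag along the lines just described. Treat it as an untested sketch:

    private Action<T> processItem;        // hypothetical handler supplied by the caller
    private volatile bool stopRequested;  // set by a hypothetical Stop() method

    private void ProcessWork() {
        while (true) {
            T item;
            lock (padlock) {
                // Stop early, or finish when the queue is empty; either way clear
                // isProcessing so the next QueueWorkItem starts a fresh thread.
                if (stopRequested || workerQueue.Count == 0) {
                    isProcessing = false;
                    return;
                }
                item = workerQueue.Dequeue();
            }
            // Process outside the lock so other threads can keep enqueuing.
            processItem(item);
        }
    }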

Jeff Sternal
thanks, I was hoping for something ready-made in the framework, or an open source project
Ian Ringrose
This seems about right, except that QueueWorkItem should start a new thread after joining on the existing one (if it exists). And ProcessWork should not sleep if there's nothing on the queue, but rather return.
Eric Mickelsen
tehMick - agreed: I'm editing my answer to reflect that.
Jeff Sternal
@Ian - by all means yes. If anyone can find one that meets your requirements and has been tested, I'd use that instead too!
Jeff Sternal
A: 

Use a ThreadPool, but use SetMaxThreads to constrain the number of running threads to one. Use QueueUserWorkItem to add more tasks to the queue. http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx
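A minimal illustration of that approach (untested; ProcessItem and workItem are placeholder names, and note that SetMaxThreads may refuse values below the number of processors):

// Constrain the process-wide pool to a single worker thread.
// Caution: this affects everything else in the process that uses ThreadPool.
ThreadPool.SetMaxThreads(1, 1);

// Producers on any thread queue work; the pool runs the items one at a time.
ThreadPool.QueueUserWorkItem(state => ProcessItem(state), workItem);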

Eric Mickelsen
but calling SetMaxThreads will affect everything else that uses the thread pool, so this is not a valid solution.
Ian Ringrose
Actually, this may present issues on multi-processor machines or adversely affect the performance of some libraries. On second thought, this may not necessarily be the way to go.
Eric Mickelsen
+1  A: 

Use the CCR. It provides Port primitives, which are efficient FIFO queues, plus Dispatcher, DispatcherQueue and Arbiter primitives that let you control task scheduling.
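A rough sketch of what that might look like (the CCR ships with Microsoft Robotics Studio; signatures here are from memory, and Process/someWorkItem are placeholders):

using Microsoft.Ccr.Core;

// A dispatcher with a single thread gives one-at-a-time, in-order processing.
var dispatcher = new Dispatcher(1, "WorkerPool");
var taskQueue  = new DispatcherQueue("WorkerQueue", dispatcher);
var port       = new Port<object>();

// Persistent receiver: the handler runs once per posted item; no thread sits
// blocked while the port is empty.
Arbiter.Activate(taskQueue, Arbiter.Receive(true, port, item => Process(item)));

// Producers on any thread simply post work items.
port.Post(someWorkItem);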

SpaceghostAli
+1 - mighty, I would never have thought to look inside the Microsoft Robotics toolkits.
Jeff Sternal