Hi,

I need to design a robust worker thread method. The method must do the following:

  • 1) extract something from a queue (let's say a queue of strings) and do something with it
  • 2) stop and return when the class is disposed
  • 3) wait for some event (that the queue is not empty) without consuming CPU
  • 4) run in a separate thread

The main thread will add a string to the queue and signal the thread method to continue and do the job.

I would like you to provide me with a template that includes the required synchronization objects.

class MyClass : IDisposable
{
  // Thread-safe queue from a third party
  private ThreadSafeQueue<string> _workerQueue;
  private Thread _workerThread;

  public bool Initialize()
  {
    // Thread.Start() returns void, so create and start in two steps
    _workerThread = new Thread(WorkerThread);
    _workerThread.Start();
    return true;
  }

  public void AddTask(string item)
  {
    _workerQueue.Enqueue(item);
    // now we must signal the worker thread
  }

  // this is the worker thread
  private void WorkerThread()
  {
    // This is what the worker thread must do
    // (method name assumed; it depends on the third-party queue's API)
    List<string> objectList = _workerQueue.DequeueAll();
    // Do something
  }

  // Yep, this is Dispose
  public void Dispose()
  {
  }
}
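
The four requirements above map fairly directly onto two wait handles: one signaled when an item is added, one signaled on dispose. The following is only a minimal sketch under some assumptions: it swaps the third-party `ThreadSafeQueue<string>` for .NET 4's `ConcurrentQueue<string>`, and the names `EventDrivenWorker` and `handler` are hypothetical. Items still queued when `Dispose` is called are dropped, and calling `AddTask` after `Dispose` is not supported.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class EventDrivenWorker : IDisposable
{
    // Stand-in for the third-party thread-safe queue (assumption)
    private readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
    private readonly AutoResetEvent _itemAdded = new AutoResetEvent(false);
    private readonly ManualResetEvent _stop = new ManualResetEvent(false);
    private readonly Action<string> _handler;
    private readonly Thread _thread;

    public EventDrivenWorker(Action<string> handler)
    {
        _handler = handler;
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    public void AddTask(string item)
    {
        _queue.Enqueue(item);
        _itemAdded.Set(); // wake the worker
    }

    private void Run()
    {
        var handles = new WaitHandle[] { _stop, _itemAdded };

        // WaitAny blocks without spinning and returns the index of a
        // signaled handle; index 0 is the stop event.
        while (WaitHandle.WaitAny(handles) != 0)
        {
            // Drain everything that is queued, since one signal may
            // cover several Enqueue calls.
            string item;
            while (_queue.TryDequeue(out item))
            {
                _handler(item);
            }
        }
    }

    public void Dispose()
    {
        _stop.Set();    // ask the worker to exit
        _thread.Join(); // wait until it has returned
        _itemAdded.Dispose();
        _stop.Dispose();
    }
}
```

Putting the stop event first in the handle array means a pending stop request wins over a pending item signal when both are set.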
+1  A: 

I think you should consider using the BackgroundWorker class, which may fit your needs well.

A.
Warning: **Only** use `BackgroundWorker` to prevent long-running processes from blocking a UI. If no UI is involved, `BackgroundWorker` is not the correct solution. You can make it work, but it's not the intended use.
Neil Barnwell
+1  A: 

Sounds like BlockingQueue is what you need.

Neil Barnwell
+1  A: 

You should take a look at the new .NET 4 System.Collections.Concurrent namespace. Also, this little example should help you get a better understanding of how to use it.

Oliver
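
As a rough idea of what the System.Collections.Concurrent suggestion can look like in practice, here is a minimal sketch built on `BlockingCollection<string>`, `GetConsumingEnumerable` and `CompleteAdding`. The class name `StringWorker` and the `handler` delegate are hypothetical. In this variant, `Dispose` lets the worker finish whatever is already queued before it exits.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class StringWorker : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly Action<string> _handler;
    private readonly Thread _thread;

    public StringWorker(Action<string> handler)
    {
        _handler = handler;
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    public void AddTask(string item)
    {
        // Throws InvalidOperationException once CompleteAdding has been called
        _queue.Add(item);
    }

    private void Run()
    {
        // Blocks while the collection is empty; the loop ends once
        // CompleteAdding has been called and the collection is drained.
        foreach (string item in _queue.GetConsumingEnumerable())
        {
            _handler(item);
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // no more items; the worker drains and exits
        _thread.Join();
        _queue.Dispose();
    }
}
```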
+2  A: 

Try something like this. Instantiate it with type string and give it a delegate to process each string:

using System;
using System.Collections.Generic;
using System.Threading;

public class SuperQueue<T> : IDisposable where T : class
{
    readonly object _locker = new object();
    readonly List<Thread> _workers;
    readonly Queue<T> _taskQueue = new Queue<T>();
    readonly Action<T> _dequeueAction;

    /// <summary>
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class.
    /// </summary>
    /// <param name="workerCount">The worker count.</param>
    /// <param name="dequeueAction">The dequeue action.</param>
    public SuperQueue(int workerCount, Action<T> dequeueAction)
    {
        _dequeueAction = dequeueAction;
        _workers = new List<Thread>(workerCount);

        // Create and start a separate thread for each worker
        for (int i = 0; i < workerCount; i++)
        {
            Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("SuperQueue worker {0}",i )};
            _workers.Add(t);
            t.Start();

        }

    }


    /// <summary>
    /// Enqueues the task.
    /// </summary>
    /// <param name="task">The task.</param>
    public void EnqueueTask(T task)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(task);
            Monitor.PulseAll(_locker);
        }
    }

    /// <summary>
    /// Consumes this instance.
    /// </summary>
    void Consume()
    {
        while (true)
        {
            T item;
            lock (_locker)
            {
                while (_taskQueue.Count == 0) Monitor.Wait(_locker);
                item = _taskQueue.Dequeue();
            }
            if (item == null) return;

            // run actual method
            _dequeueAction(item);
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        // Enqueue one null task per worker to make each exit.
        _workers.ForEach(thread => EnqueueTask(null));

        _workers.ForEach(thread => thread.Join());

    }
}
Arthur Raffles
+1 for a correct implementation.
Brian Gideon
+1  A: 

What you are describing is best accomplished with the producer-consumer pattern. This pattern is most easily implemented with a blocking queue. If you are using .NET 4.0 then you can take advantage of the BlockingCollection class. Here is how I see your code working. In the following example I am using a null value as a sentinel for gracefully ending the consumer, but you could also take advantage of the CancellationToken parameter on the Take method.

public class MyClass : IDisposable
{
  private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

  public MyClass()
  {
    var thread = new Thread(Process);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Dispose()
  {
    m_Queue.Add(null);
  }

  public void AddTask(string item)
  {
    if (item == null)
    {
      throw new ArgumentNullException();
    }
    m_Queue.Add(item);
  }

  private void Process()
  {
    while (true)
    {
      string item = m_Queue.Take();
      if (item == null)
      {
        break; // Gracefully end the consumer thread.
      }
      else
      {
        // Process the item here.
      }
    }
  }
}
Brian Gideon
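
For the CancellationToken alternative mentioned in the answer above, a minimal sketch might look like the following. The class name `CancellableWorker` and the `handler` delegate are hypothetical. Unlike the null-sentinel version, cancellation takes effect at the next `Take` call, so items still in the queue at that point are not processed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class CancellableWorker : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Action<string> _handler;
    private readonly Thread _thread;

    public CancellableWorker(Action<string> handler)
    {
        _handler = handler;
        _thread = new Thread(Process) { IsBackground = true };
        _thread.Start();
    }

    public void AddTask(string item)
    {
        if (item == null)
        {
            throw new ArgumentNullException("item");
        }
        _queue.Add(item);
    }

    private void Process()
    {
        try
        {
            while (true)
            {
                // Blocks until an item arrives or the token is canceled
                string item = _queue.Take(_cts.Token);
                _handler(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancel() was called from Dispose: end the consumer thread.
        }
    }

    public void Dispose()
    {
        _cts.Cancel();
        _thread.Join(); // wait for the consumer to return
        _cts.Dispose();
        _queue.Dispose();
    }
}
```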