92 views · 5 answers
I have a class that I've created to allow asynchronous sequential execution of tasks, using the ThreadPool as the means of execution. The idea is that I'll have multiple instances running serial tasks in the background, but I don't want to have a separate dedicated Thread for each instance. What I'd like to check is whether this class is actually thread safe. It's fairly brief, so I thought I'd run it by the experts here, in case I'm missing something obvious. I've omitted a few of the convenience overloads for different Action types.

/// <summary>
/// This class wraps ThreadPool.QueueUserWorkItem, providing guaranteed ordering of queued tasks for this instance.
/// Only one task in the queue will execute at a time, with the order of execution matching the order of addition.
/// This is designed as a lighter-weight alternative to using a dedicated Thread for processing of sequential tasks.
/// </summary>
public sealed class SerialAsyncTasker
{
    private readonly Queue<Action> mTasks = new Queue<Action>();
    private bool mTaskExecuting;

    /// <summary>
    /// Queue a new task for asynchronous execution on the thread pool.
    /// </summary>
    /// <param name="task">Task to execute</param>
    public void QueueTask(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");

        lock (mTasks)
        {
            bool isFirstTask = (mTasks.Count == 0);
            mTasks.Enqueue(task);

            //Only start executing the task if this is the first task
            //Additional tasks will be executed normally as part of sequencing
            if (isFirstTask && !mTaskExecuting)
                RunNextTask();
        }
    }

    /// <summary>
    /// Clear all queued tasks.  Any task currently executing will continue to execute.
    /// </summary>
    public void Clear()
    {
        lock (mTasks)
        {
            mTasks.Clear();
        }
    }

    /// <summary>
    /// Wait until all currently queued tasks have completed executing.
    /// If no tasks are queued, this method will return immediately.
    /// This method does not prevent the race condition of a second thread 
    /// queueing a task while one thread is entering the wait;
    /// if this is required, it must be synchronized externally.
    /// </summary>
    public void WaitUntilAllComplete()
    {
        lock (mTasks)
        {
            while (mTasks.Count > 0 || mTaskExecuting)
                Monitor.Wait(mTasks);
        }
    }

    private void RunTask(Object state)
    {
        var task = (Action)state;
        task();
        mTaskExecuting = false;
        RunNextTask();
    }

    private void RunNextTask()
    {
        lock (mTasks)
        {
            if (mTasks.Count > 0)
            {
                mTaskExecuting = true;
                var task = mTasks.Dequeue();
                ThreadPool.QueueUserWorkItem(RunTask, task);
            }
            else
            {
                //If anybody is waiting for tasks to be complete, let them know
                Monitor.PulseAll(mTasks);
            }
        }
    }
}

UPDATE: I've revised the code to fix the main bugs kindly pointed out by Simon. This passes unit tests now, but I still welcome observations.

A: 

"asynchronous sequential"

You are missing something obvious here: asynchronous means multiple things at the same time; sequential means only one thing at a time.

There's no point in wanting both...

"Only one task in the queue will execute at a time". Then don't use the ThreadPool; simply run all your tasks in order (this could be done through a queue).

It seems your code would work, but the only benefit is that the caller can keep responding to user input while the tasks execute in order; they won't execute any faster.

MrFox
This is hardly novel, and it can even be useful. Java has a similar "thread pool" built right into its standard library. See: http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor%28%29 You use this when you want to offload some processing onto a different thread, but the tasks you're executing still have an ordering dependency on one another.
Mike Daniels
@MrFox, there will be multiple instances of this class, so you have parallel (asynchronous) operations occurring, each of which is a sequential execution of tasks. This is designed to replace an existing class in a medium-sized codebase that currently uses a dedicated thread and homegrown producer-consumer collection for queuing tasks. There are at least a dozen instances at any given time, which means a lot of Threads sitting idle most of the time.
Dan Bryant
+2  A: 

Don't do it. (Or at least avoid building your own stuff.)

Use the System.Threading.Tasks stuff (new in .NET 4.0). Create a Task[] (size depends on the number of parallel tasks you want) and let them read work items from a BlockingCollection while waiting for a CancellationToken. Your WaitForAll implementation would trigger your token and call Task.WaitAll(Task[]), which will block until all your tasks are done.
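For what it's worth, a minimal sketch of the design described here, assuming .NET 4.0 is available (the TaskQueue name and the worker-count parameter are illustrative, not part of the answer; with a single worker you get the same serial ordering the question asks for):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Sketch: worker Tasks consume a BlockingCollection until a
// CancellationToken fires. With workerCount = 1 the items run serially.
public sealed class TaskQueue
{
    private readonly BlockingCollection<Action> mWork = new BlockingCollection<Action>();
    private readonly CancellationTokenSource mCts = new CancellationTokenSource();
    private readonly Task[] mWorkers;

    public TaskQueue(int workerCount)
    {
        mWorkers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            mWorkers[i] = Task.Factory.StartNew(() =>
            {
                try
                {
                    // Take blocks until an item arrives or the token is cancelled.
                    while (true)
                        mWork.Take(mCts.Token)();
                }
                catch (OperationCanceledException)
                {
                    // Shutting down; note that items still sitting in the
                    // collection when Cancel is called are abandoned.
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    public void QueueTask(Action task)
    {
        mWork.Add(task);
    }

    public void WaitForAll()
    {
        mCts.Cancel();          // signal the workers to stop...
        Task.WaitAll(mWorkers); // ...and block until they have finished
    }
}
```

With workerCount = 1 this is behaviorally close to SerialAsyncTasker, at the cost of one long-running worker per instance.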

Simon Svensson
If this were a project for .NET 4.0, I would definitely use TPL rather than ThreadPool directly. I'm limited to 3.5, however, and I don't have the option of using the back-ported TPL (from Rx) in this project (the client would prefer to stick to the core 3.5 classes as much as possible).
Dan Bryant
+1  A: 

Here's my second answer, assuming that you can't use .NET 4.0 (and want comments on your existing code).

QueueTask enqueues the first task, getting isFirstTask = true, and starts a new thread. However, another thread may enqueue something while the first task is still executing; since the queue is empty again at that point, Count == 0 makes isFirstTask true once more, and yet another worker is spawned.

Also, WaitUntilAllComplete will hang indefinitely if the task execution throws an exception (which may not necessarily crash everything, depending on exception handling), causing it to skip the call to RunNextTask().

And your WaitUntilAllComplete just waits until there are no more enqueued tasks, not until the tasks have actually run to completion (they could still be sitting in the ThreadPool's queue).

Simon Svensson
Thanks for the feedback; definitely a few serious kinks to work out. I should really not be coding just after lunch: after I posted this question I found it was breaking my unit tests, which is a pretty good clue that something is broken (it was only working in integration testing because the timing is much slower).
Dan Bryant
I don't have a good solution for the exception case. A timeout is probably the best option (I certainly don't want to let it continue executing the following tasks), but I'm trying to maintain interface compatibility with the old class. The prior class was swallowing exceptions; not sure if a possible hang is much better...
Dan Bryant
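The timeout Dan mentions might look something like this (a sketch only: the timeout overload is a hypothetical addition, the field names match the question's class, and its other members are elided here):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class SerialAsyncTasker
{
    private readonly Queue<Action> mTasks = new Queue<Action>();
    private bool mTaskExecuting;

    // QueueTask, Clear, RunTask, RunNextTask as in the question...

    /// <summary>
    /// Wait until all queued tasks complete, or until the timeout elapses
    /// (e.g. because a task threw and the chain stalled). Returns false on timeout.
    /// </summary>
    public bool WaitUntilAllComplete(TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        lock (mTasks)
        {
            while (mTasks.Count > 0 || mTaskExecuting)
            {
                TimeSpan remaining = deadline - DateTime.UtcNow;
                // Monitor.Wait returns false if the wait timed out without a pulse;
                // recompute the remaining time on every wakeup.
                if (remaining <= TimeSpan.Zero || !Monitor.Wait(mTasks, remaining))
                    return false;
            }
        }
        return true;
    }
}
```

This preserves the existing parameterless signature for interface compatibility, while callers who care about the hang can opt into the timeout.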
+1  A: 

I have tried to make as few changes as possible while still making it work:

/// <summary>
/// This class wraps ThreadPool.QueueUserWorkItem, providing guaranteed ordering of queued tasks for this instance.
/// Only one task in the queue will execute at a time, with the order of execution matching the order of addition.
/// This is designed as a lighter-weight alternative to using a dedicated Thread for processing of sequential tasks.
/// </summary>
public sealed class SerialAsyncTasker
{
    private readonly Queue<Action> mTasks = new Queue<Action>();
    private bool taskRunning = false;
    private Exception exception = null;

    /// <summary>
    /// Queue a new task for asynchronous execution on the thread pool.
    /// </summary>
    /// <param name="task">Task to execute</param>
    public void QueueTask(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");

        lock (mTasks)
        {
            if (mTasks.Count == 0 && !taskRunning)
            {
                ThreadPool.QueueUserWorkItem(RunTask, task);
            }
            else
            {
                mTasks.Enqueue(task);
            }
            taskRunning = true;
        }

    }

    /// <summary>
    /// Clear all queued tasks.  Any task currently executing will continue to execute.
    /// </summary>
    public void Clear()
    {
        lock (mTasks)
        {
            mTasks.Clear();
        }
    }

    /// <summary>
    /// Wait until all currently queued tasks have completed executing.
    /// If no tasks are queued, this method will return immediately.
    /// This method does not prevent the race condition of a second thread 
    /// queueing a task while one thread is entering the wait;
    /// if this is required, it must be synchronized externally.
    /// </summary>
    public void WaitUntilAllComplete()
    {
        lock (mTasks)
        {
            while (exception == null && (mTasks.Count > 0 || taskRunning))
                Monitor.Wait(mTasks);
            if (exception != null)
            {
                Console.WriteLine(exception);
            }
        }
    }

    private void RunTask(Object state)
    {
        var task = (Action)state;
        try
        {
            task();
            RunNextTask();
        }
        catch (Exception e)
        {
            lock (mTasks)
            {
                exception = e;
                Monitor.PulseAll(mTasks);
            }
        }
    }

    private void RunNextTask()
    {
        lock (mTasks)
        {
            if (mTasks.Count > 0)
            {
                var task = mTasks.Dequeue();
                ThreadPool.QueueUserWorkItem(RunTask, task);
                taskRunning = true;
            }
            else
            {
                taskRunning = false;
                Monitor.PulseAll(mTasks);
            }
        }
    }
}

Minimal test:

public static void Main(string[] args)
{
    var tq = new SerialAsyncTasker();
    var a = 0;
    for (int i = 0; i < 10; i++)
    {
        tq.QueueTask(delegate
        {
            var myId = Interlocked.Increment(ref a);
            Console.WriteLine("{0} started", myId);
            Thread.Sleep(5000);
            if (myId == 3) throw new Exception("blah");
            Console.WriteLine("{0} finished", myId);
        });
    }
    tq.WaitUntilAllComplete();
}
andras
Thanks for the help; I can see we came up with similar solutions (using a taskRunning flag) for the bugs that Simon identified.
Dan Bryant
@Dan Bryant: you're welcome. Now, seeing Simon's post (and your comments as well), I've added some minimal exception handling.
andras
+1  A: 

I see a few issues with your SerialAsyncTasker class, but it sounds like you may have a good grasp of those already, so I will not go into detail on that topic (I may edit my answer with more details later). You indicated in the comments that you cannot use .NET 4.0 features, nor can you use the Reactive Extensions backport. I propose that you use the producer-consumer pattern with a single consumer on a dedicated thread. This would perfectly fit your requirement of asynchronously executing tasks in sequence.

Note: You will have to harden the code to support gracefully shutting down, handling exceptions, etc.

public class SerialAsyncTasker
{
  private BlockingCollection<Action> m_Queue = new BlockingCollection<Action>();

  public SerialAsyncTasker()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          Action task = m_Queue.Take();
          task();
        }
      });
    thread.IsBackground = true;
    thread.Start();
  }

  public void QueueTask(Action task)
  {
    m_Queue.Add(task);
  }
}

Too bad you cannot use the BlockingCollection from the .NET 4.0 BCL or the Reactive Extensions download, but no worries. It is actually not too hard to implement one yourself. You can use Stephen Toub's blocking queue as a starting point and just rename a few things.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take()
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0) Monitor.Wait(m_Queue);
            return m_Queue.Dequeue();
        }
    }

    public void Add(T value)
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(value);
            Monitor.Pulse(m_Queue);
        }
    }
}
Brian Gideon
A producer-consumer task thread is actually what this implementation is designed to replace. The problem we've had is that we have a large number of these sequential task threads and the number of dedicated Threads is increasing, raising concern over the application's scalability. The intention of this class is to allow a large number of (mostly idle) asynchronous sequential task processors, which would share threading resources via the thread pool only as needed when they're active.
Dan Bryant
@Dan: I totally see where you are coming from. What about creating your own thread pool in which queued items really are run in FIFO order. I am visualizing a `Dictionary<string, BlockingCollection<Action>>` being the fundamental data structure in which each task is associated with a specific "fiber" by name. All items in a specific fiber are run in order, but all of the fibers share the same fixed number of threads. An example call would be `SerialThreadPool.QueueUserWorkItem("fiber1", action)`.
Brian Gideon
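A rough sketch of the fiber idea Brian describes (the SerialThreadPool shape, the fixed worker count, and the scheduling scheme are guesses at the design, not code from his answer). The key invariant is that a fiber appears in the runnable queue at most once, so at most one of its tasks runs at a time, while all fibers share the same fixed set of threads:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Named "fibers" whose tasks run in FIFO order, sharing a fixed number of
// threads. A fiber is re-scheduled only after its current item completes.
public static class SerialThreadPool
{
    private static readonly Dictionary<string, Queue<Action>> sFibers =
        new Dictionary<string, Queue<Action>>();
    private static readonly Queue<string> sRunnable = new Queue<string>();
    private static readonly object sLock = new object();

    static SerialThreadPool()
    {
        for (int i = 0; i < 2; i++) // fixed worker count; pick to taste
        {
            var worker = new Thread(Run) { IsBackground = true };
            worker.Start();
        }
    }

    public static void QueueUserWorkItem(string fiber, Action action)
    {
        lock (sLock)
        {
            Queue<Action> queue;
            if (!sFibers.TryGetValue(fiber, out queue))
                sFibers[fiber] = queue = new Queue<Action>();
            queue.Enqueue(action);
            // Schedule the fiber only if this is its sole pending item;
            // otherwise it is already runnable or currently running.
            if (queue.Count == 1)
            {
                sRunnable.Enqueue(fiber);
                Monitor.Pulse(sLock);
            }
        }
    }

    private static void Run()
    {
        while (true)
        {
            string fiber;
            Action action;
            lock (sLock)
            {
                while (sRunnable.Count == 0) Monitor.Wait(sLock);
                fiber = sRunnable.Dequeue();
                // Peek rather than Dequeue, so the fiber's queue stays
                // non-empty (and thus unschedulable) while the item runs.
                action = sFibers[fiber].Peek();
            }
            try { action(); }
            catch { /* swallowed, as the old class did */ }
            lock (sLock)
            {
                var queue = sFibers[fiber];
                queue.Dequeue();
                if (queue.Count > 0)
                {
                    sRunnable.Enqueue(fiber);
                    Monitor.Pulse(sLock);
                }
                else
                {
                    sFibers.Remove(fiber);
                }
            }
        }
    }
}
```

Whether this beats the ThreadPool-based version depends on workload; it trades ThreadPool scheduling for a small fixed set of dedicated threads shared across all fibers.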
@Brian, that's an interesting idea and I'll have to think more about how that would work.
Dan Bryant