views: 2417

answers: 10
I apologize for the redundant question; I've found many solutions to this problem, but none of them are well explained. I'm hoping it will be made clear here.

My C# application's main thread spawns 1..n background workers using the ThreadPool. I want the original thread to block until all of the workers have completed. I have researched ManualResetEvent in particular, but I'm not clear on its use.

In pseudo:

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

If necessary, I will know the number of workers that are about to be queued beforehand.

A: 

Why don't you create a static/shared integer that all the threads can see... i.e.

static int numThreads = 0;

When you spawn your threads, the first thing they should do is:

numThreads += 1;

The last thing they should do is:

numThreads -= 1;

And while this is happening, your main thread can simply wait for numThreads to go back to 0. This will not only give you your logic but will provide you an easy number of currently active threads.
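A minimal sketch of that idea, with Interlocked substituted for the bare +=/-= (which would not be atomic), the increment done in the calling code rather than inside each worker, and a sleep instead of a hot spin; the worker body is a placeholder:

```csharp
using System;
using System.Threading;

class CounterWait
{
    static int numThreads = 0;

    static void Main()
    {
        for (int i = 0; i < 5; i++)
        {
            // count up-front, in the calling code, so the main thread
            // cannot observe 0 before any worker has even started
            Interlocked.Increment(ref numThreads);
            ThreadPool.QueueUserWorkItem(_ =>
            {
                Thread.Sleep(100); // simulate work
                Interlocked.Decrement(ref numThreads);
            });
        }

        // polling wait; cheap to write, but see the comments below
        // for why a signaling primitive is preferable
        while (Interlocked.CompareExchange(ref numThreads, 0, 0) != 0)
            Thread.Sleep(10);

        Console.WriteLine("all workers finished");
    }
}
```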

routeNpingme
First, that is completely *non* thread-safe (+=/-= are not atomic). Static is a bad idea as it limits you to one copy. How will you "wait for numThreads to go back to 0"? in a loop? Massive CPU cost, and may never even be seen unless the field is volatile...
Marc Gravell
See also: http://stackoverflow.com/questions/458173/can-a-c-thread-really-cache-a-value-and-ignore-changes-to-that-value-on-other-th/458193#458193
Marc Gravell
Final point - you would need to increment in the calling code (not inside the thread) - otherwise a valid exit condition is "none of the threads have started yet" (possible due to thread-pool saturation).
Marc Gravell
Didn't realize this question was getting that deep into implementation. Good points.
routeNpingme
+9  A: 

First, how long do the workers execute? Pool threads should generally be used for short-lived tasks - if they are going to run for a while, consider manual threads.

Re the problem; do you actually need to block the main thread? Can you use a callback instead? If so, something like:

int running = 1; // start at 1 to prevent multiple callbacks if
                 // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

This is a fairly lightweight (no OS primitives) way of tracking the workers.

If you need to block, you can do the same using a Monitor (again, avoiding an OS object):

    object syncLock = new object();
    int running = 1;
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
    lock (syncLock) {
        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        endOfThread();
        Monitor.Wait(syncLock);
    }
    Console.WriteLine("all done");
Marc Gravell
Your code will wait infinitely if one of the delegates throws an exception.
JaredPar
If one of those delegates throws an exception, I'm going to lose the whole process, so that is fairly arbitrary... I'm assuming it won't throw, but I'll make it explicit ;-p
Marc Gravell
The workers will be processing expensive operations including reading and writing files and performing SQL selects and inserts involving Binary/Image columns. It's unlikely they'll live long enough to require explicit threads, but performance could be gained by letting them execute in parallel.
Kivin
+1, To handle exception in worker process, you could do try { DoSomeWork(tmp); } finally { endOfThread(); }
chaowman
@Marc, whether or not a ThreadPool exception kills the process is not a certainty. It changed between version 1.0 and 2.0 of the CLR (I believe it's also configurable). Truthfully I can't remember which version does which anymore. I just assume the worst with threads :)
JaredPar
Found the documentation for the exception change: http://msdn.microsoft.com/en-us/library/ms228965.aspx
JaredPar
+8  A: 

Try this. The function takes in a list of Action delegates. It will add a ThreadPool worker entry for each item in the list. It will wait for every action to complete before returning.

public static void SpawnAndWait(IEnumerable<Action> actions)
{
    var list = actions.ToList(); // requires System.Linq
    var handles = new ManualResetEvent[list.Count];
    for (var i = 0; i < list.Count; i++)
    {
        handles[i] = new ManualResetEvent(false);
        var currentAction = list[i];
        var currentHandle = handles[i];
        Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
        ThreadPool.QueueUserWorkItem(x => wrappedAction());
    }

    WaitHandle.WaitAll(handles);
}
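A hypothetical usage sketch; collection and DoSomeWork stand in for the asker's own code:

```csharp
// assumes the SpawnAndWait method above, plus using System.Linq
var actions = collection.Select(o => (Action)(() => DoSomeWork(o)));
SpawnAndWait(actions); // returns only once every queued action has completed
```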
JaredPar
That's pretty darned close to what I came up with on my own after re-reading some material. Only difference, really, is the lack of lambda since my web server doesn't like .NET 3.0. Cheers.
Kivin
WaitHandle.WaitAll fails if the number of handles is larger than the system permits. On my Win2k3 server that number is 64 so I get an exception when I try to spawn more than 64 items...
Eran Kampf
@Eran, try writing a SpawnAndWaitHelper which essentially has the code above. Use SpawnAndWait to divide up the enumerable into 64-size chunks and call the helper for each chunk.
JaredPar
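That chunking workaround might be sketched as follows; SpawnAndWaitHelper is a hypothetical rename of the method above, and 64 is the WaitHandle.WaitAll limit mentioned in the comment:

```csharp
// requires System.Linq and System.Collections.Generic
public static void SpawnAndWait(IEnumerable<Action> actions)
{
    // WaitHandle.WaitAll rejects more than 64 handles, so wait in chunks
    var list = actions.ToList();
    for (int offset = 0; offset < list.Count; offset += 64)
    {
        // SpawnAndWaitHelper contains the original SpawnAndWait body
        SpawnAndWaitHelper(list.Skip(offset).Take(64));
    }
}
```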
ah... http://stackoverflow.com/questions/1045980/is-there-a-better-way-to-wait-for-queued-threads/1074770#1074770
modosansreves
+1  A: 

I think you were on the right track with the ManualResetEvent. This link has a code sample that closely matches what you're trying to do. The key is to use WaitHandle.WaitAll and pass an array of wait events. Each thread needs to set one of these wait events.

    // autoEvents and manualEvent are declared in the linked sample
    // Simultaneously calculate the terms.
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateBase));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateFirstTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateSecondTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateThirdTerm));

    // Wait for all of the terms to be calculated.
    WaitHandle.WaitAll(autoEvents);

    // Reset the wait handle for the next calculation.
    manualEvent.Reset();

Edit:

Make sure that in your worker thread code path you set the event (i.e. autoEvents[1].Set();). Once they are all signaled, WaitHandle.WaitAll will return.

void CalculateSecondTerm(object stateInfo)
{
    double preCalc = randomGenerator.NextDouble();
    manualEvent.WaitOne();
    secondTerm = preCalc * baseNumber * 
        randomGenerator.NextDouble();
    autoEvents[1].Set();
}
James
A: 

Based on material available in several of the above posts, I'm toying with the following solution. I'm still not sure I'm using the synchronization objects properly.

ManualResetEvent[] locks = new ManualResetEvent[Tasks.Count];
for(int i = 0; i < locks.Length; i++)
  locks[i] = new ManualResetEvent(false);

int lockIdx = 0;
foreach( var task in Tasks )
{
  ThreadPool.QueueUserWorkItem(delegate(object state) { 
    // do stuff
    (state as ManualResetEvent).Set();
  }, locks[lockIdx++]);
}

WaitHandle.WaitAll(locks);

I'd be interested to know if I've got the reset events configured properly and if there are any serious flaws in this solution.

Kivin
It isn't entirely clear how this works (since it is incomplete) - the biggest thing to watch might be if you use "task" in the "do stuff" (that would be incorrect - you need a second variable inside the foreach - see the "tmp" in my first reply for an example).
Marc Gravell
Good point. In the actual implementation I chose to use a for instead of foreach, so I never actually ran into that problem.
Kivin
+7  A: 

Here's a different approach - encapsulation; so your code could be as simple as:

    Forker p = new Forker();
    foreach (var obj in collection)
    {
        var tmp = obj;
        p.Fork(delegate { DoSomeWork(tmp); });
    }
    p.Join();

Where the Forker class is given below (I got bored on the train ;-p)... again, this avoids OS objects, but wraps things up quite neatly (IMO):

using System;
using System.Threading;

/// <summary>Event arguments representing the completion of a parallel action.</summary>
public class ParallelEventArgs : EventArgs
{
    private readonly object state;
    private readonly Exception exception;
    internal ParallelEventArgs(object state, Exception exception)
    {
        this.state = state;
        this.exception = exception;
    }

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
    public object State { get { return state; } }

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
    public Exception Exception { get { return exception; } }
}

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
public sealed class Forker
{
    int running;
    private readonly object joinLock = new object(), eventLock = new object();

    /// <summary>Raised when all operations have completed.</summary>
    public event EventHandler AllComplete
    {
        add { lock (eventLock) { allComplete += value; } }
        remove { lock (eventLock) { allComplete -= value; } }
    }
    private EventHandler allComplete;
    /// <summary>Raised when each operation completes.</summary>
    public event EventHandler<ParallelEventArgs> ItemComplete
    {
        add { lock (eventLock) { itemComplete += value; } }
        remove { lock (eventLock) { itemComplete -= value; } }
    }
    private EventHandler<ParallelEventArgs> itemComplete;

    private void OnItemComplete(object state, Exception exception)
    {
        EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
        if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
        if (Interlocked.Decrement(ref running) == 0)
        {
            EventHandler allHandler = allComplete; // don't need to lock
            if (allHandler != null) allHandler(this, EventArgs.Empty);
            lock (joinLock)
            {
                Monitor.PulseAll(joinLock);
            }
        }
    }

    /// <summary>Adds a callback to invoke when each operation completes.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        ItemComplete += handler;
        return this;
    }

    /// <summary>Adds a callback to invoke when all operations are complete.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnAllComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        AllComplete += handler;
        return this;
    }

    /// <summary>Waits for all operations to complete.</summary>
    public void Join()
    {
        Join(-1);
    }

    /// <summary>Waits (with timeout) for all operations to complete.</summary>
    /// <returns>Whether all operations had completed before the timeout.</returns>
    public bool Join(int millisecondsTimeout)
    {
        lock (joinLock)
        {
            if (CountRunning() == 0) return true;
            Thread.SpinWait(1); // try our luck...
            return (CountRunning() == 0) ||
                Monitor.Wait(joinLock, millisecondsTimeout);
        }
    }

    /// <summary>Indicates the number of incomplete operations.</summary>
    /// <returns>The number of incomplete operations.</returns>
    public int CountRunning()
    {
        return Interlocked.CompareExchange(ref running, 0, 0);
    }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action) { return Fork(action, null); }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action, object state)
    {
        if (action == null) throw new ArgumentNullException("action");
        Interlocked.Increment(ref running);
        ThreadPool.QueueUserWorkItem(delegate
        {
            Exception exception = null;
            try { action(); }
            catch (Exception ex) { exception = ex;}
            OnItemComplete(state, exception);
        });
        return this;
    }
}
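For example, item-level failures can be observed without extra plumbing via OnItemComplete; collection and DoSomeWork are the asker's placeholders:

```csharp
Forker p = new Forker();
p.OnItemComplete((sender, args) =>
{
    // args.Exception is null when the item completed cleanly
    if (args.Exception != null)
        Console.WriteLine("item " + args.State + " failed: " + args.Exception.Message);
});
foreach (var obj in collection)
{
    var tmp = obj; // avoid "capture" issue
    p.Fork(delegate { DoSomeWork(tmp); }, tmp); // state identifies the item
}
p.Join();
```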
Marc Gravell
+1  A: 

I've found a good solution here :

http://msdn.microsoft.com/en-us/magazine/cc163914.aspx

May come in handy for others with the same issue

Gordon Carpenter-Thompson
A: 

Using .NET 4.0 Barrier class:

    Barrier sync = new Barrier(1); // main thread is the initial participant

    foreach(var o in collection)
    {
        WaitCallback worker = (state) =>
        {
            // do work
            sync.SignalAndWait(); // blocks until all participants have signaled
        };

        sync.AddParticipant();
        ThreadPool.QueueUserWorkItem(worker, o);
    }

    sync.SignalAndWait();
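Note that each worker blocks in SignalAndWait until the whole phase completes, tying up pool threads while it waits. A sketch of the same join using a different .NET 4.0 primitive, CountdownEvent, lets each worker exit as soon as it signals; DoSomeWork is hypothetical:

```csharp
using (var done = new CountdownEvent(1)) // start at 1 so the count can't hit 0 early
{
    foreach (var o in collection)
    {
        done.AddCount();
        ThreadPool.QueueUserWorkItem(state =>
        {
            try { DoSomeWork(state); }   // placeholder worker
            finally { done.Signal(); }   // signal even if the worker throws
        }, o);
    }
    done.Signal(); // release the initial count
    done.Wait();   // blocks until every worker has signaled
}
```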
Joseph Kingry
+1  A: 

Check out my blog post for comparison of various techniques:

http://zvolkov.com/blog/post/2009/07/10/Better-ways-to-wait-for-queued-threads-to-complete.aspx

zvolkov
+1  A: 

I have been using the Parallel task library from the Parallel Extensions CTP:

Parallel.ForEach(collection, o =>
{
    DoSomeWork(o);
});
Brad Culberson