views:

66

answers:

3

Having some issue with Threadpooling here that I need some help with please. I am trying to write a Generator, and I need to allow users generate up to 10,000 lines with the code below. Problem with this is the line

WaitHandle.WaitAll(doneEvents);

Can only handle 64 WaitAll at a time, How can I best apply thread pooling to my code in this case?

 public void GenerateInsertStatements(int iRequiredRows)
        {
            // One event is used for each row object
            ManualResetEvent[] doneEvents = new ManualResetEvent[iRequiredRows];

            Row[] rows = new Row[iRequiredRows];
            for (int i = 0; i < iRequiredRows; i++)
            {
                doneEvents[i] = new ManualResetEvent(false);
                Row row = new Row(this.Name, this.TableColumns, doneEvents[i]);
                rows[i] = row;
                ThreadPool.QueueUserWorkItem(row.ThreadPoolCallback, i);
            }

            WaitHandle.WaitAll(doneEvents);


            using (sr = new StreamWriter(this.Name + ".sql"))
            {
                for(int i=0; i<rows.Length; i++)
                {
                    WriteStatementToFile(i, rows[i].GeneratedInsertStatement);
                }
            }
        }

Thanks in advance

A: 

Probably not the most efficient solution, but it should work regardless of the 64 wait handles limit :

for(int i = 0; i < iRequiredRows; i++)
    doneEvents[i].WaitOne();
Thomas Levesque
DO you have a better idea of how I could do the above please? I am just starting out figuring how best to use threadpooling for the solution to allow for up to 10,000 lines to be generated.
Kobojunkie
Well, if I had a better idea I would have posted it in the first place ;). Gonzalo's solution seems good too, I honestly don't know which one is the best...
Thomas Levesque
+1  A: 

I would use just one WaitHandle and one int. Like:

int done_when_zero; // This is a field of the class
ManualResetEvent evt = new ManualResetEvent (false); // Field
...
done_when_zero = iRequiredRows; // This goes before the loop
...
evt.WaitOne (); // this goes after the loop
evt.Reset (); // Prepare for next execution if needed

And then, at the end of ThreadPoolCallback:

if (Interlocked.Decrement (ref done_when_zero)) <= 0)
    evt.Set ();
Gonzalo
A: 

As it was already suggested using a counter and a single ManualResetEvent should work fine for you. Below is ThreadPoolWait class taken from .NET Matters: ThreadPoolWait and HandleLeakTracer (see Figure 3 Better Implementation of ThreadPoolWait for more info)

public class ThreadPoolWait : IDisposable
{
    private int _remainingWorkItems = 1;
    private ManualResetEvent _done = new ManualResetEvent(false);

    public void QueueUserWorkItem(WaitCallback callback)
    {
        QueueUserWorkItem(callback, null);
    }

    public void QueueUserWorkItem(WaitCallback callback, object state)
    {
        ThrowIfDisposed();
        QueuedCallback qc = new QueuedCallback();
        qc.Callback = callback;
        qc.State = state;
        lock (_done) _remainingWorkItems++;
        ThreadPool.QueueUserWorkItem(new WaitCallback(HandleWorkItem), qc);
    }

    public bool WaitOne() { return WaitOne(-1, false); }

    public bool WaitOne(TimeSpan timeout, bool exitContext)
    {
        return WaitOne((int)timeout.TotalMilliseconds, exitContext);
    }

    public bool WaitOne(int millisecondsTimeout, bool exitContext)
    {
        ThrowIfDisposed();
        DoneWorkItem();
        bool rv = _done.WaitOne(millisecondsTimeout, exitContext);
        lock (_done)
        {
            if (rv)
            {
                _remainingWorkItems = 1;
                _done.Reset();
            }
            else _remainingWorkItems++;
        }
        return rv;
    }

    private void HandleWorkItem(object state)
    {
        QueuedCallback qc = (QueuedCallback)state;
        try { qc.Callback(qc.State); }
        finally { DoneWorkItem(); }
    }

    private void DoneWorkItem()
    {
        lock (_done)
        {
            --_remainingWorkItems;
            if (_remainingWorkItems == 0) _done.Set();
        }
    }

    private class QueuedCallback
    {
        public WaitCallback Callback;
        public object State;
    }

    private void ThrowIfDisposed()
    {
        if (_done == null) throw new ObjectDisposedException(GetType().Name);
    }

    public void Dispose()
    {
        if (_done != null)
        {
            ((IDisposable)_done).Dispose();
            _done = null;
        }
    }
}
serge_gubenko