I have the following code:

var items = new List<string> {"1", "2", "3"}; // 200 items
foreach(var item in items) {
  ThreadPool.QueueUserWorkItem((DoWork), item);
}

private void DoWork(object obj)
{
  lock(this)
  {
    using(var sw = File.AppendText(@"C:\somepath.txt"))
    {
      sw.WriteLine(obj);
    }
  }
}

With the ThreadPool version, I get a random number of the 200 items written out to the file: 60, or 127, or sometimes only 3. If I remove the ThreadPool and just write inside the original foreach loop, all 200 items are written successfully.

I'm not sure why this occurs. Any ideas?

Thanks for your help.

+8  A: 

The following note from the MSDN documentation on ThreadPool says it all:

The threads in the managed thread pool are background threads. That is, their IsBackground properties are true. This means that a ThreadPool thread will not keep an application running after all foreground threads have exited.

Your application simply exits (by reaching end of Main) before your threads finish running.

Pavel Minaev
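
To see the quoted behavior directly, here is a minimal sketch (an illustration, not part of the original answer) that prints the IsBackground flag from inside a pool thread; the Sleep is just a crude way to keep the process alive long enough for the work item to run:

    using System;
    using System.Threading;

    class BackgroundDemo
    {
        static void Main()
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                // Pool threads are background threads, so they cannot
                // keep the process alive on their own.
                Console.WriteLine(Thread.CurrentThread.IsBackground); // True
            });

            Thread.Sleep(1000); // crude wait so the work item gets a chance to run
        }
    }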
One way to fix this is to use Interlocked.Increment to keep track of how many times DoWork is called, setting an auto-reset event when it hits the magic number. The main thread can wait on that event instead of shutting down.
Steven Sudit
Isn't there a way to tell the main thread to wait till all the ThreadPool threads are done with the work?
shahkalpesh
Steven's comment explains how to do that.
Pavel Minaev
+2  A: 

This is a simple version of what I was alluding to. It uses a single event, never polls or spins, and is written to be reusable, allowing multiple work sets at the same time. The lambda expressions could be factored out if that's more convenient for debugging.

using System;
using System.IO;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var items = new string[] { "1", "2", "3", "300" };
        using (var outfile = File.AppendText("file.txt"))
        {
            using (var ws = new WorkSet<string>(x =>
                    { lock (outfile) outfile.WriteLine(x); }))
                foreach (var item in items)
                    ws.Process(item);
        }
    }

    public class WorkSet<T> : IDisposable
    {
        #region Interface

        public WorkSet(Action<T> action)
        { _action = action; }

        public void Process(T item)
        {
            // One count per queued item; the matching Done() runs in the finally.
            Interlocked.Increment(ref _workItems);
            ThreadPool.QueueUserWorkItem(o =>
                    { try { _action((T)o); } finally { Done(); } }, item);
        }

        #endregion
        #region Advanced

        // Returns true only for the call that releases the last outstanding count.
        public bool Done()
        {
            if (Interlocked.Decrement(ref _workItems) != 0)
                return false;

            _finished.Set();
            return true;
        }

        public ManualResetEvent Finished
        { get { return _finished; } }

        #endregion
        #region IDisposable

        public void Dispose()
        {
            Done();              // release the sentinel count taken at construction
            _finished.WaitOne(); // block until every queued item has finished
        }

        #endregion
        #region Fields

        readonly Action<T> _action;
        readonly ManualResetEvent _finished = new ManualResetEvent(false);
        int _workItems = 1; // starts at 1 (sentinel) so the event can't fire early

        #endregion
    }
}
Steven Sudit
A: 

How about short and sweet?

    static int wrkThreads = 0;
    static readonly EventWaitHandle exit = new ManualResetEvent(false);
    static readonly object syncLock = new object();

    static void Main( string[] items )
    {
        // Assumes at least one item; otherwise the WaitOne below never returns.
        wrkThreads = items.Length;

        foreach ( var item in items )
            ThreadPool.QueueUserWorkItem(( DoWork ), item);

        exit.WaitOne(); // block Main until the last worker signals
    }

    static void DoWork( object obj )
    {
        lock ( syncLock ) {
            /* Do your file work here */
        }
        // The worker that decrements the count to zero wakes up Main.
        if ( Interlocked.Decrement(ref wrkThreads) == 0 )
            exit.Set();
    }
Right, this is the algorithm I posted, only with all the reusability stripped out. It also requires that you know in advance how many work items there will be, whereas the solution I offered does not. However, it does refute Spencer's suggestion that this algorithm is itself long.
Steven Sudit
That it is. The reason I posted it is that, judging by the question, the OP is not yet big on multi-threading, and this short version solves his problem while clearly demonstrating a rudimentary thread synchronization method.
I understand your motivation, but I'm not wild about the fact that you use a generic object to lock on instead of the FileStream itself.
Steven Sudit
I'm not wild about your locking on a locally scoped object either. It matters little as long as it leads to a predictable outcome.
It's not local; the FileStream is scoped into the anonymous delegate through a closure. Since the inner using block does not end until all threads do, the FileStream in the outer using block is guaranteed to be alive the whole time.
Steven Sudit
If you try to access this file elsewhere (or even from the same method) before completion and disposal, you're likely to get an exception instead of, say, waiting for the writer thread to finish. This lock is only valid for access within the delegate, and it will be globally effective only if you can guarantee synchronous access to the Main(args) method (which you could, of course, but that's not the point). So yes, it is local.
I'm not sure I understand. It's not local in that it's shared across all instances of the anonymous method. If someone else tries to open the file for write or append (whether from inside my process or not), I would expect it to fail because my HFILE has exclusive access. The purpose of the lock is to prevent simultaneous access within my application, among these worker threads, not to act as a cross-process synchronization method. I suppose a worker thread could pass the instance to a method that caches it and spawns a new thread that does not honor the lock, but so what?
Steven Sudit
So nothing really, I was merely responding to your comment and pointing out that the implementation will depend on the application.
Ok, I can't argue with that.
Steven Sudit
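
For what it's worth, here is a minimal sketch contrasting the two locking styles debated above: a dedicated static lock object versus locking on the shared writer reached through a closure (the names here are placeholders, not taken from either answer). Both serialize the writes; they differ only in which object identity the threads agree to lock on:

    using System;
    using System.IO;
    using System.Threading;

    class LockStyles
    {
        // Style 1: a dedicated, statically scoped lock object,
        // as in the "short and sweet" answer.
        static readonly object syncLock = new object();

        static void WriteWithDedicatedLock(StreamWriter writer, string line)
        {
            lock (syncLock)      // every thread locks the same static object
                writer.WriteLine(line);
        }

        // Style 2: lock on the shared writer itself, captured by a
        // closure, as in the WorkSet answer.
        static void Main()
        {
            using (var writer = File.AppendText("file.txt"))
            {
                WaitCallback work = o =>
                {
                    lock (writer) // same instance for all threads via the closure
                        writer.WriteLine((string)o);
                };
                work("example"); // invoked inline here just to keep the sketch short
            }
        }
    }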