views: 391
answers: 8
I'm writing a simple app (for my wife no less :-P ) that does some image manipulation (resizing, timestamping, etc.) on a potentially large batch of images, so I'm writing a library that can do this both synchronously and asynchronously. I decided to use the Event-based Asynchronous Pattern. When using this pattern, you need to raise an event when the work has completed, and that's where I'm stuck: knowing when it's all done. In my DownsizeAsync method (the async method for downsizing images) I'm doing something like this:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
            });
        }
    }

The tricky part now is knowing when they are all complete.

Here's what I've considered: using ManualResetEvents as described here: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx. But the problem I came across is that WaitHandle.WaitAll accepts at most 64 handles, and I may have many, many more images than that.

Second option: Have a counter that counts the images that have been done, and raise the event when the count reaches the total:

public void DownsizeAsync(string[] files, string destination)
{
    foreach (var name in files)
    {
        string temp = name; //countering the closure issue
        ThreadPool.QueueUserWorkItem(f =>
        {
            string newFileName = this.DownsizeImage(temp, destination);
            this.OnImageResized(newFileName);
            total++;
            if (total == files.Length)
            {
                this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
            }
        });
    }


}

private volatile int total = 0;

Now this feels "hacky" and I'm not entirely sure if that's thread safe.

So, my question is, what's the best way of doing this? Is there another way to synchronize all threads? Should I not be using a ThreadPool? Thanks!!

UPDATE Based on feedback in the comments and from a few answers I've decided to take this approach:

First, I created an extension method that batches an enumerable into "batches":

    public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
    {
        for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
        {
            yield return s.Take(batchCount);
        }
    }

Basically, if you do something like this:

        foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
        {
            foreach (int i in batch)
            {
                Console.Write("{0} ", i);
            }
            Console.WriteLine();
        }

You get this output:

1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95

The idea being that (as someone in the comments pointed out) there's no need to create a separate thread for each image. Therefore, I'll split the images into [machine cores * 2] batches. Then I'll use my second approach, which is simply to keep a counter going; when the counter reaches the total I'm expecting, I'll know I'm done.

The reason I'm convinced now that it is in fact thread safe, is because I've marked the total variable as volatile which according to MSDN:

The volatile modifier is usually used for a field that is accessed by multiple threads without using the lock statement to serialize access. Using the volatile modifier ensures that one thread retrieves the most up-to-date value written by another thread.

means I should be in the clear (if not, please let me know!!)

So here's the code I'm going with:

    public void DownsizeAsync(string[] files, string destination)
    {
        int cores = Environment.ProcessorCount * 2;
        int batchAmount = Math.Max(1, files.Length / cores); //avoid zero-sized batches when there are fewer files than cores

        foreach (var batch in files.GetBatches(batchAmount))
        {
            var temp = batch.ToList(); //counter closure issue
            ThreadPool.QueueUserWorkItem(b =>
            {
                foreach (var item in temp)
                {
                    string newFileName = this.DownsizeImage(item, destination);
                    this.OnImageResized(newFileName);
                    total++;
                    if (total == files.Length)
                    {
                        this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                    }
                }
            });
        }
    }

I'm open to feedback as I'm in no way an expert on multithreading, so if anyone sees any issue with this, or has a better idea, please let me know. (Yes, this is just a home made app, but I have some ideas on how I can use the knowledge I gain here to improve our Search / Index service we use at work.) For now I'll keep this question open till I feel like I'm using the right approach. Thanks everyone for your help.

+11  A: 

The simplest thing would be to create new threads, and then call Thread.Join on each of them. You could use a semaphore or something like it - but it's probably easier to just create new threads.

In .NET 4.0 you could use Parallel Extensions to do this quite easily with tasks.

As another alternative which would use the threadpool, you could create a delegate and call BeginInvoke on it, to return an IAsyncResult - you can then get the WaitHandle for each result via the AsyncWaitHandle property, and call WaitHandle.WaitAll.

EDIT: As pointed out in comments, you can only call WaitAll with up to 64 handles at a time on some implementations. Alternatives could be calling WaitOne on each of them in turn, or calling WaitAll with batches. It won't really matter, so long as you're doing it from a thread which isn't going to block the threadpool. Also note that you can't call WaitAll from an STA thread.
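A minimal sketch of the create-threads-and-Join approach (the `DownsizeAll` wrapper and file names here are illustrative, not from the answer):

```csharp
using System;
using System.Threading;

public class JoinDemo
{
    // Spin up one thread per file, then Join each: Join blocks until
    // that particular thread finishes, so when the loop ends, all work is done.
    public static int DownsizeAll(string[] files)
    {
        int processed = 0;
        var threads = new Thread[files.Length];

        for (int i = 0; i < files.Length; i++)
        {
            string file = files[i]; // copy to avoid the closure-over-loop-variable issue
            threads[i] = new Thread(() =>
            {
                // DownsizeImage(file, destination) would go here
                Interlocked.Increment(ref processed);
            });
            threads[i].Start();
        }

        foreach (var t in threads)
            t.Join();

        return processed;
    }

    static void Main()
    {
        int done = DownsizeAll(new[] { "a.jpg", "b.jpg", "c.jpg" });
        Console.WriteLine("All {0} images done.", done);
    }
}
```

Note that this creates one thread per image, which is fine for a modest batch but gets expensive for thousands of files; that's where the task/threadpool alternatives above come in.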

Jon Skeet
Jon- you are forgetting about the 64 limit on WaitAll.
RichardOD
@RichardOD: Thanks, have added a note about that.
Jon Skeet
+2  A: 

.Net 4.0 makes multi-threading even easier (although you can still shoot yourself in the foot with side effects).

Hamish Grubijan
Even easier? Multithreading isn't easy!
RichardOD
Ok ... multi-threading in C is harder. Theoretically Turing-equivalent or what have you, conceptually similar if not the same, but the extra lines of code do get in the way of understanding the core.
Hamish Grubijan
+2  A: 

I've used SmartThreadPool with much success to cope with this problem. There is also a CodePlex site about the assembly.

SmartThreadPool can help with other problems as well, such as when some threads cannot run at the same time while others can.

Jochen
+1  A: 

Another option would be to use a Pipe.

You post all the work to be done to the pipe and then have each thread read data from it. When the pipe is empty, you're done: the threads end themselves and everybody is happy (of course, make sure you first produce all the work, then consume it).
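A sketch of that pipe idea using a plain Queue<string> guarded by a lock (all names here are assumptions, not from the answer):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class WorkQueueDemo
{
    // Drain a shared queue from several worker threads. When the queue is
    // empty each worker simply returns, so Join-ing every worker tells us
    // all the work is finished.
    public static int Process(IEnumerable<string> files, int workerCount)
    {
        var queue = new Queue<string>(files); // produce all work up front
        object sync = new object();
        int processed = 0;

        ThreadStart worker = () =>
        {
            while (true)
            {
                string file;
                lock (sync)
                {
                    if (queue.Count == 0) return; // pipe is empty: this worker ends
                    file = queue.Dequeue();
                }
                // DownsizeImage(file, destination) would go here
                Interlocked.Increment(ref processed);
            }
        };

        var threads = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            threads[i] = new Thread(worker);
            threads[i].Start();
        }
        foreach (var t in threads) t.Join();

        return processed;
    }

    static void Main()
    {
        int n = Process(new[] { "a.jpg", "b.jpg", "c.jpg", "d.jpg" }, 2);
        Console.WriteLine("Processed {0} files.", n);
    }
}
```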

Jorge Córdoba
+9  A: 

You still want to use the ThreadPool because it will manage the number of threads it runs simultaneously. I ran into a similar issue recently and solved it like this:

var dispatcher = new ThreadPoolDispatcher();
dispatcher = new ChunkingDispatcher(dispatcher, 10);

foreach (var image in images)
{
    dispatcher.Dispatch(new ResizeJob(image));
}

dispatcher.WaitForJobsToFinish();

The IDispatcher and IJob look like this:

public interface IJob
{
    void Execute();
}

public interface IDispatcher
{
    void Dispatch(IJob job);
    void WaitForJobsToFinish();
}

public class ThreadPoolDispatcher : IDispatcher
{
    private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>();

    public void Dispatch(IJob job)
    {
        var resetEvent = CreateAndTrackResetEvent();
        var worker = new ThreadPoolWorker(job, resetEvent);
        ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback));
    }

    private ManualResetEvent CreateAndTrackResetEvent()
    {
        var resetEvent = new ManualResetEvent(false);
        resetEvents.Add(resetEvent);
        return resetEvent;
    }

    public void WaitForJobsToFinish()
    {
        WaitHandle.WaitAll(resetEvents.ToArray());
        resetEvents.Clear();
    }
}

And then used a decorator to chunk the use of ThreadPool:

public class ChunkingDispatcher : IDispatcher
{
    private IDispatcher dispatcher;
    private int numberOfJobsDispatched;
    private int chunkSize;

    public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize)
    {
        this.dispatcher = dispatcher;
        this.chunkSize = chunkSize;
    }

    public void Dispatch(IJob job)
    {
        dispatcher.Dispatch(job);

        if (++numberOfJobsDispatched % chunkSize == 0)
            WaitForJobsToFinish();
    }

    public void WaitForJobsToFinish()
    {
        dispatcher.WaitForJobsToFinish();
    }
}

The IDispatcher abstraction works pretty well for swapping out your threading technique. I have another implementation that is a SingleThreadedDispatcher and you could make a ThreadStart version like Jon Skeet suggested. Then it's easy to run each one and see what kind of performance you get. The SingleThreadedDispatcher is good when debugging your code or when you don't want to kill the processor on your box.
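A SingleThreadedDispatcher along those lines might look like this (a sketch; only the IDispatcher/IJob shapes come from the answer, the CountingJob is illustrative):

```csharp
using System;

public interface IJob { void Execute(); }

public interface IDispatcher
{
    void Dispatch(IJob job);
    void WaitForJobsToFinish();
}

// Runs every job inline on the calling thread. Handy for debugging:
// stack traces stay readable and nothing runs concurrently.
public class SingleThreadedDispatcher : IDispatcher
{
    public void Dispatch(IJob job)
    {
        job.Execute();
    }

    public void WaitForJobsToFinish()
    {
        // Nothing to do: every job already finished inside Dispatch.
    }
}

public class CountingJob : IJob
{
    public int Runs;
    public void Execute() { Runs++; }
}

public class SingleThreadedDemo
{
    static void Main()
    {
        IDispatcher dispatcher = new SingleThreadedDispatcher();
        var job = new CountingJob();
        dispatcher.Dispatch(job);
        dispatcher.Dispatch(job);
        dispatcher.WaitForJobsToFinish();
        Console.WriteLine(job.Runs); // each Dispatch ran the job inline
    }
}
```

Because it implements the same interface, you can swap it in for ThreadPoolDispatcher without touching the calling code.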

Edit: I forgot to add the code for ThreadPoolWorker:

public class ThreadPoolWorker
{
    private IJob job;
    private ManualResetEvent doneEvent;

    public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent)
    {
        this.job = job;
        this.doneEvent = doneEvent;
    }

    public void ThreadPoolCallback(object state)
    {
        try
        {
            job.Execute();
        }
        finally
        {
            doneEvent.Set();
        }
    }
}
Michael Valenty
+2  A: 

I use a static utility method to examine all the individual wait handles..

    public static void WaitAll(WaitHandle[] handles)
    {
        if (handles == null)
            throw new ArgumentNullException("handles",
                "WaitHandle[] handles was null");
        foreach (WaitHandle wh in handles) wh.WaitOne();
    }

Then in my main thread, I create a List of these wait handles, and for each delegate I put in my ThreadPool Queue, I add the wait handle to the List...

 List<WaitHandle> waitHndls = new List<WaitHandle>();
 object locker = new object(); // shared lock object (assumed declared somewhere in the original)
 foreach (...) // iterator logic over your tasks
 {
      ManualResetEvent txEvnt = new ManualResetEvent(false);

      ThreadPool.QueueUserWorkItem(
           delegate
               {
                   try
                   {
                       // Code to process each task...
                   }
                   // Finally, set each wait handle when done
                   finally { lock (locker) txEvnt.Set(); }
               });
      waitHndls.Add(txEvnt);  // Add wait handle to List
 }
 util.WaitAll(waitHndls.ToArray());   // Check all wait Handles in List
Charles Bretana
+5  A: 

The simplest and most efficient solution would be to use a counter and make it thread safe. This consumes less memory and scales to a higher number of threads.

Here is a sample:

int itemCount = 0;
for (int i = 0; i < 5000; i++)
{
    Interlocked.Increment(ref itemCount);

    ThreadPool.QueueUserWorkItem(x=>{
        try
        {
            //code logic here.. sleep is just for demo
            Thread.Sleep(100);
        }
        finally
        {
            Interlocked.Decrement(ref itemCount);
        }
    });
}

while (itemCount > 0)
{
    Console.WriteLine("Waiting for " + itemCount + " threads...");
    Thread.Sleep(100);
}
Console.WriteLine("All Done!");
Pratap .R
+1. I use this pattern, though I check the result of Interlocked.Decrement() to see if we've hit zero and set an event if so to indicate that all the items are completed. That way you don't need to poll on itemCount.
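That no-polling variant might look like this (a sketch; the RunAll wrapper and timeout value are assumptions):

```csharp
using System;
using System.Threading;

public class CountdownDemo
{
    // Each work item decrements the counter when it finishes; the one that
    // hits zero sets the event, so the waiting thread never has to poll.
    public static bool RunAll(int itemCount)
    {
        int remaining = itemCount;
        var done = new ManualResetEvent(false);

        for (int i = 0; i < itemCount; i++)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                // work would go here
                if (Interlocked.Decrement(ref remaining) == 0)
                    done.Set(); // last item signals completion
            });
        }

        return done.WaitOne(10000); // true if everything finished in time
    }

    static void Main()
    {
        Console.WriteLine(RunAll(50) ? "All Done!" : "Timed out");
    }
}
```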
Curt Nichols
I like this approach, but I wonder if this is needed if the variable we're incrementing is marked as volatile.
BFree
volatile makes sure the variable is not cached and that each individual read or write is seen by other threads, but it does not guarantee that the full read+change+write of ++ is atomic
Pratap .R
@BFree. With your original approach you did total++, which equates to total = total + 1. Now think about multiple threads doing that. Pratap is spot on- volatile doesn't make things thread safe, it's all about cache coherency.
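A small demonstration of the difference (sketch; the method name and counts are illustrative): with Interlocked the total always comes out exact, whereas a plain or volatile total++ under the same contention can silently lose increments.

```csharp
using System;
using System.Threading;

public class IncrementDemo
{
    // total++ is really "read, add one, write back": two threads can read
    // the same value and both write back value+1, losing an increment.
    // Interlocked.Increment performs the whole read-modify-write atomically.
    public static int CountWithInterlocked(int threadCount, int perThread)
    {
        int total = 0;
        var threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            threads[i] = new Thread(() =>
            {
                for (int j = 0; j < perThread; j++)
                    Interlocked.Increment(ref total);
            });
            threads[i].Start();
        }
        foreach (var t in threads) t.Join();
        return total; // always threadCount * perThread
    }

    static void Main()
    {
        // Replacing Interlocked.Increment with total++ (even on a volatile
        // field) could print less than 400000 here.
        Console.WriteLine(IncrementDemo.CountWithInterlocked(4, 100000));
    }
}
```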
RichardOD
+1  A: 

I suggest putting the untouched images in a queue and as you read from the queue launch a thread and insert its System.Threading.Thread.ManagedThreadId property into a dictionary along with the file name. This way your UI can list both pending and active files.

When each thread completes it invokes a callback routine, passing back its ManagedThreadId. This callback (passed as a delegate to the thread) removes the thread's id from the dictionary, launches another thread from the queue, and updates the UI.

When both the queue and the dictionary are empty, you're done.

Slightly more complicated but this way you get a responsive UI, you can easily control the number of active threads, and you can see what's in flight. Collect statistics. Get fancy with WPF and put up progress bars for each file. She can't help but be impressed.
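A rough sketch of this queue-plus-dictionary scheme (every name here is an assumption, not from the answer; a real version would update the UI where the comments indicate):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class PendingActiveDemo
{
    static readonly Queue<string> pending = new Queue<string>();           // untouched images
    static readonly Dictionary<int, string> active = new Dictionary<int, string>(); // thread id -> file
    static readonly object sync = new object();
    static int processed = 0;
    const int MaxActive = 2; // easy control over the number of active threads

    // Callback each worker invokes when done: drop its id from the
    // dictionary, launch the next queued file, and signal if everything is empty.
    static void OnThreadDone(int threadId)
    {
        lock (sync)
        {
            active.Remove(threadId);
            processed++;
            LaunchNext();
            if (pending.Count == 0 && active.Count == 0)
                Monitor.PulseAll(sync); // queue and dictionary both empty: we're done
        }
    }

    // Caller must hold the lock. Starts threads until MaxActive are in flight.
    static void LaunchNext()
    {
        while (active.Count < MaxActive && pending.Count > 0)
        {
            string file = pending.Dequeue();
            var t = new Thread(() =>
            {
                // process 'file' here, then report back via the callback
                OnThreadDone(Thread.CurrentThread.ManagedThreadId);
            });
            active[t.ManagedThreadId] = file; // a UI could list these as "active"
            t.Start();
        }
    }

    public static int Run(int fileCount)
    {
        lock (sync)
        {
            for (int i = 0; i < fileCount; i++) pending.Enqueue("img" + i + ".jpg");
            LaunchNext();
            while (pending.Count > 0 || active.Count > 0)
                Monitor.Wait(sync); // releases the lock while waiting
        }
        return processed;
    }

    static void Main()
    {
        Console.WriteLine("Resized {0} images.", Run(10));
    }
}
```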

ebpower