I've just begun to explore the TPL (Task Parallel Library) and have a design question.

My Scenario: I have a list of URLs that each refer to an image. I want each image to be downloaded in parallel. As soon as at least one image is downloaded, I want to execute a method that does something with the downloaded image. That method should NOT be parallelized -- it should be serial.

I think the following will work but I'm not sure if this is the right way to do it. Because I have separate classes for collecting the images and for doing "something" with the collected images, I end up passing around an array of Tasks which seems wrong since it exposes the inner workings of how images are retrieved. But I don't know a way around it. In reality there is more to both of these methods but that's not important for this. Just know that they really shouldn't be lumped into one large method that both retrieves and does something with the image.

//From the Director class
Task<Image>[] downloadTasks = collector.RetrieveImages(listOfURLs);

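//Track the downloads that have not been handled yet; calling WaitAny on the
//full array would keep returning tasks that have already completed
List<Task<Image>> remaining = new List<Task<Image>>(downloadTasks);

while (remaining.Count > 0)
{
    //Wait for any of the remaining downloads to complete
    int completedIndex = Task.WaitAny(remaining.ToArray());
    Image completedImage = remaining[completedIndex].Result;
    remaining.RemoveAt(completedIndex);

    //Now do something with the image (this "something" must happen serially)
    //Uses the "Formatter" class to accomplish this let's say
}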

///////////////////////////////////////////////////

//From the Collector class
public Task<Image>[] RetrieveImages(List<string> urls)
{
    Task<Image>[] tasks = new Task<Image>[urls.Count];

    int index = 0;
    foreach (string url in urls)
    {
        string lambdaVar = url;  //Required... Bleh
        int lambdaIndex = index; //Same reason: capture a per-iteration copy
        tasks[index] = Task<Image>.Factory.StartNew(() =>
            {
                //TODO: Replace with live image locations
                string filePath = Path.Combine(Application.StartupPath, String.Format("{0}.png", lambdaIndex));

                using (WebClient client = new WebClient())
                {
                    client.DownloadFile(lambdaVar, filePath);
                }

                return Image.FromFile(filePath);
            },
            TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent);

        index++;
    }

    return tasks;
}
A: 

The best way to do this would probably be to implement the Observer pattern: have your RetrieveImages function return an IObservable<Image>, put your "completed image action" into an IObserver<Image> object's OnNext method, and subscribe that observer to the observable.

I haven't tried this myself yet (still have to play more with the task library) but I think this is the "right" way to do it.
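
A minimal sketch of that idea, assuming no Rx library is available. ParallelSource and CollectingObserver are hypothetical names, and the download delegate is a stand-in for the real WebClient call. To keep it short, Subscribe blocks until every item has been delivered, which a real observable would usually not do; the lock is what keeps OnNext serial while the downloads run in parallel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source: pushes each item to the observer as soon as it is ready.
public sealed class ParallelSource<T> : IObservable<T>
{
    private readonly IList<string> _urls;
    private readonly Func<string, T> _download; // stand-in for the real download

    public ParallelSource(IList<string> urls, Func<string, T> download)
    {
        _urls = urls;
        _download = download;
    }

    // Simplification: blocks until all items are delivered, then completes.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        object gate = new object();
        try
        {
            Parallel.ForEach(_urls, url =>
            {
                T item = _download(url);               // runs in parallel
                lock (gate) { observer.OnNext(item); } // one OnNext at a time
            });
            observer.OnCompleted();
        }
        catch (AggregateException ex)
        {
            observer.OnError(ex);
        }
        return NoopDisposable.Instance;
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer: the serial "something" goes in OnNext.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public bool Completed;

    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { throw error; }
    public void OnCompleted() { Completed = true; }
}
```

With this shape the class that collects images only exposes an IObservable<T>, and the class that does the work only implements IObserver<T>, so no Task objects cross the boundary between them.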

tzaman
It might be because I'm trying to learn the TPL and the Observer pattern at the same time, but I can't seem to get it right. If I have the Collector class (shown), the Director class (top code snippet), and the Formatter class (the thing that does "something"), how would I implement this?
colithium
+1  A: 

TPL already provides the ContinueWith function to execute one task when another finishes. Task chaining is one of the main patterns used in TPL for asynchronous operations.

The following method downloads a set of images in parallel and continues each download by renaming the downloaded file:

static void DownloadInParallel(string[] urls)
{
    var tempFolder = Path.GetTempPath();

    var downloads = from url in urls
                    select Task.Factory.StartNew<string>(() =>
                    {
                        using (var client = new WebClient())
                        {
                            var uri = new Uri(url);
                            string file = Path.Combine(tempFolder, uri.Segments.Last());
                            client.DownloadFile(uri, file);
                            return file;
                        }
                    }, TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent)
                    .ContinueWith(t =>
                    {
                        var filePath = t.Result;
                        File.Move(filePath, filePath + ".test");
                    }, TaskContinuationOptions.ExecuteSynchronously);

    var results = downloads.ToArray();
    Task.WaitAll(results);
}

You should also check the WebClient Async Tasks from the ParallelExtensionsExtras samples. The DownloadXXXTask extension methods handle both the creation of tasks and the asynchronous downloading of files.

The following method uses the DownloadDataTask extension to get each image's data, then flips the image before saving it to disk:

static void DownloadInParallel2(string[] urls)
{
    var tempFolder = Path.GetTempPath();

    var downloads = from url in urls
                    let uri = new Uri(url)
                    let filePath = Path.Combine(tempFolder, uri.Segments.Last())
                    select new WebClient().DownloadDataTask(uri)
                        .ContinueWith(t =>
                        {
                            var img = Image.FromStream(new MemoryStream(t.Result));
                            img.RotateFlip(RotateFlipType.RotateNoneFlipY);
                            img.Save(filePath);
                        }, TaskContinuationOptions.ExecuteSynchronously);

    var results = downloads.ToArray();
    Task.WaitAll(results);
}
Panagiotis Kanavos
Two things. First, I don't think TaskContinuationOptions.ExecuteSynchronously does what I need. That "something" (moving files in your example) can't happen on more than one thread at the same time. Let's pretend that instead of moving a file it is communicating with a device through a serial cable. Second, like I said, there's more to it than the simplified version I posted. I don't think it's proper to have the two tasks combined into the same method. But that forces me to pass around Tasks, which seems like a bad pattern.
colithium
What you call a bad pattern is the actual design philosophy of the TPL. It is actually pretty close to F# in this matter. Second, ExecuteSynchronously means that the continuation will run using the same thread as the task that precedes it. Finally, you are NOT combining tasks into the same method. The lambda passed to the task or the continuation is yet another anonymous function. You could easily pass a method name instead of using a lambda. If you find the way TPL works uncomfortable you should probably look to a different library or pattern instead of trying to work against it.
Panagiotis Kanavos
And that very well may be the answer. If that's the case you'll get your up vote back and accepted. As to my two points, though, I think we're having a breakdown of communication. 1) ExecuteSynchronously means that the continuation may be running on SEVERAL threads at the same time. The continuation must ONLY be running on ONE thread at any given time in my situation. 2) I know it's technically its own method and I could drop in a method call as well. But the two things are actually in two completely separate classes that really shouldn't even know about each other. And thus my dilemma.
colithium
+3  A: 

Typically you use WaitAny to wait for one task when you don't care about the results of any of the others. For example, if you just cared about the first image that happened to get returned.

How about this instead?

This creates two tasks. The first loads the images in parallel and adds them to a blocking collection. The second waits on the collection and processes each image as it is added to the queue, one at a time. When all the images are loaded, the first task closes the queue so the second task can shut down.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.IO;
using System.Net;
using System.Threading.Tasks;

namespace ClassLibrary1
{
    public class Class1
    {
        readonly string _path = Directory.GetCurrentDirectory();

        public void Demo()
        {
            IList<string> listOfUrls = new List<string>();
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/editicon.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/favorite-star-on.gif");
            listOfUrls.Add("http://i3.codeplex.com/Images/v16821/arrow_dsc_green.gif");

            BlockingCollection<Image> images = new BlockingCollection<Image>();

            Parallel.Invoke(
                () =>                   // Task 1: load the images
                {
                    Parallel.For(0, listOfUrls.Count, (i) =>
                        {
                            Image img = RetrieveImages(listOfUrls[i], i);
                            img.Tag = i;
                            images.Add(img);    // Add each image to the queue
                        });
                    images.CompleteAdding();    // Done with images.
                },
                () =>                   // Task 2: Process images serially
                {
                    foreach (var img in images.GetConsumingEnumerable())
                    {
                        string newPath = Path.Combine(_path, String.Format("{0}_rot.png", img.Tag));
                        Console.WriteLine("Rotating image {0}", img.Tag);
                        img.RotateFlip(RotateFlipType.RotateNoneFlipXY);

                        img.Save(newPath);
                    }
                });
        }

        public Image RetrieveImages(string url, int i)
        {
            using (WebClient client = new WebClient())
            {
                // Build the full target path once and reuse it
                string fileName = Path.Combine(_path, String.Format("{0}.png", i));
                Console.WriteLine("Downloading {0}...", url);
                client.DownloadFile(url, fileName);
                Console.WriteLine("Saved {0} as {1}.", url, fileName);
                return Image.FromFile(fileName);
            }
        } 
    }
}

WARNING: The code doesn't have any error checking or cancellation. It's late and you need something to do, right? :)

This is an example of the pipeline pattern. It assumes that getting an image is pretty slow and that the cost of locking inside the blocking collection isn't going to cause a problem because it happens relatively infrequently compared to the time spent downloading images.
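
As a rough illustration of the same pipeline shape with cancellation added, here is a sketch that is independent of images and the network. PipelineSketch, Run, produce and consume are hypothetical names; produce stands in for the download and consume for the serial processing step. It is only one possible arrangement, not the version from the book's samples.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // produce: called in parallel, once per input (stand-in for the download).
    // consume: called on a single task, one item at a time (the serial stage).
    public static void Run<TIn, TOut>(
        IEnumerable<TIn> inputs,
        Func<TIn, TOut> produce,
        Action<TOut> consume,
        CancellationToken token)
    {
        var queue = new BlockingCollection<TOut>();

        // Stage 2: drains the queue until it is closed or the token is cancelled.
        Task consumer = Task.Factory.StartNew(() =>
        {
            foreach (TOut item in queue.GetConsumingEnumerable(token))
                consume(item);
        }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        try
        {
            // Stage 1: fills the queue in parallel.
            var options = new ParallelOptions { CancellationToken = token };
            Parallel.ForEach(inputs, options,
                input => queue.Add(produce(input), token));
        }
        finally
        {
            // Always close the queue, otherwise the consumer would wait forever.
            queue.CompleteAdding();
        }

        // Rethrows any consumer failure wrapped in an AggregateException.
        consumer.Wait();
    }
}
```

A caller would pass a token from a CancellationTokenSource and call Cancel on it to stop both stages. The queue is left unbounded here, as in the code above, so a failing consumer cannot leave producers blocked on Add.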

Our book... You can read more about this and other patterns for parallel programming at http://parallelpatterns.codeplex.com/. Chapter 7 covers pipelines, and the accompanying examples show pipelines with error handling and cancellation.

Ade Miller