
The problem

Although I wrote the code discussed here in F#, it relies on the .NET 4 framework and does not seem to depend on any F#-specific feature (at least it seems so!).

I have some data on my disk that I need to update from the network, saving the latest version back to the disk:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndUpdateAndSave = load >> update >> save

The problem is that to load, update and save all my data, I have to execute the function many times:

{1 .. 5000} |> Seq.iter loadAndUpdateAndSave

Each call does:

  • some disk IO,
  • some data crunching,
  • some network IO (with possibility of lots of latency),
  • more data crunching,
  • and some disk IO.

Wouldn't it be nice to have this done in parallel, to some degree? Unfortunately, none of my reading and parsing functions are "async-workflows-ready".

The first (not very good) solutions I came up with

Tasks

The first thing I did was to create a Task[] and start all of them:

open System.Threading.Tasks

// One (not yet started) Task per data group
let createTask id = new Task(fun () -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

Then I pressed CTRL+SHIFT+ESC to open Task Manager and see how many threads it was using: 15, 17, ..., 35, ..., 170, ... until I killed the application! Something was going wrong.

Parallel

I did almost the same thing using Parallel.ForEach(...), and the results were the same: lots and lots and lots of threads.

A solution that works... kind of

Then I decided to start only n tasks, wait for all of them with Task.WaitAll, then start the next n, until no tasks were left.

This works, but when a batch has finished, say, n-1 of its tasks, it still has to wait, wait, wait for the damn last Task, which insists on blocking because of high network latency. This is not good!
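The batching approach described above might look roughly like this. It is a reconstruction, not the original code: `runInBatches` and `slowWork` are hypothetical names, `slowWork` only sleeps to mimic blocking I/O (every tenth item is deliberately slow), and `Task.Run` and `Array.chunkBySize` assume .NET 4.5+ and F# 4.0+ rather than the .NET 4 beta used here.

```fsharp
open System.Threading
open System.Threading.Tasks

// Hypothetical blocking stand-in for loadAndUpdateAndSave:
// every tenth item simulates a high-latency network request.
let slowWork (dataId : int) : unit =
    Thread.Sleep(if dataId % 10 = 0 then 200 else 10)

// Start n tasks, wait for all of them, then start the next n.
// Each batch finishes only when its slowest task does, so one
// slow request leaves the other n-1 slots idle.
let runInBatches (work : int -> unit) (n : int) (ids : int[]) =
    for batch in Array.chunkBySize n ids do
        let tasks =
            batch
            |> Array.map (fun dataId -> Task.Run(Action(fun () -> work dataId)))
        Task.WaitAll(tasks)

// Three batches of 10, each held up by one 200 ms item.
runInBatches slowWork 10 [| 1 .. 30 |]
```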

So, how would you attack this problem? I'd appreciate seeing different solutions, involving either Async Workflows (and in that case, how to adapt my non-async functions), Parallel Extensions, weird parallel patterns, etc.

Thanks.

A:

Using async workflows lets you do the I/O-bound work without burning threads while the various I/O calls are 'at sea', so that would be my first suggestion. It should be straightforward to convert the code to async, usually along these lines:

  • Wrap each function body in async{...}, adding return where necessary
  • Create Async versions of any I/O primitives that aren't already in the library via Async.FromBeginEnd
  • Switch calls of the form let r = Foo() to let! r = AsyncFoo()
  • Use Async.Parallel to convert the 5000 async objects into a single Async that runs them in parallel
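A minimal sketch of those steps, assuming hypothetical async versions of `load`, `update` and `save` (`asyncLoad`, `asyncUpdate`, `asyncSave`). `Async.Sleep` stands in for real asynchronous disk and network calls, which you would build from the framework's Begin/End methods (for example via `Async.FromBeginEnd`); the function bodies are placeholders, not the original processing.

```fsharp
// Hypothetical async versions of the question's functions. Async.Sleep
// stands in for real non-blocking I/O and does not hold a thread while
// it waits.
type MyData = { field1 : int; field2 : float }
type MyDataGroup = { Data : MyData[]; Id : int }

let asyncLoad (dataId : int) = async {
    do! Async.Sleep 10                      // pretend disk read
    return { Data = [| { field1 = dataId; field2 = 0.0 } |]; Id = dataId } }

let asyncUpdate (dg : MyDataGroup) = async {
    do! Async.Sleep 100                     // pretend network round trip
    let newData = [| { field1 = dg.Id; field2 = 1.0 } |]
    return { dg with Data = Array.append newData dg.Data } }

let asyncSave (dg : MyDataGroup) = async {
    do! Async.Sleep 10 }                    // pretend disk write

// Each I/O step uses let! / do! instead of a plain blocking call.
let loadAndUpdateAndSaveAsync dataId = async {
    let! dg = asyncLoad dataId
    let! updated = asyncUpdate dg
    do! asyncSave updated
    return updated.Id }

// Async.Parallel combines the 5000 Async values into one;
// RunSynchronously waits for all of them (results keep input order).
let processedIds =
    [ 1 .. 5000 ]
    |> List.map loadAndUpdateAndSaveAsync
    |> Async.Parallel
    |> Async.RunSynchronously
```

Because the simulated waits do not block threads, all 5000 pipelines can be in flight at once without thousands of threads. With real network I/O you may still want to limit how many requests are outstanding at a time.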

There are various tutorials for doing this; one such webcast is here.

Brian
Brian, fantastic webcast. I got to know Petricek's work when I bought a MEAP copy of his "Real World Functional Programming", a fantastic book. This webcast is just as good! Thanks!
Bruno Reis
A: 

You could always use a ThreadPool.

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

Basically:

  1. Use the process-wide ThreadPool (it is a static class, so there is no pool to create)
  2. Set the max number of threads with SetMaxThreads
  3. Queue all the tasks using QueueUserWorkItem(WaitCallback)
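A sketch of this approach in F#, assuming a hypothetical `slowWork` that blocks the way `loadAndUpdateAndSave` does. `ThreadPool.GetMaxThreads`, `SetMaxThreads`, `QueueUserWorkItem` and `CountdownEvent` are standard .NET 4 APIs; the `runOnThreadPool` helper and the cap of twice the processor count are illustrative choices only. Since .NET has a single ThreadPool per process, the sketch caps that shared pool rather than creating one.

```fsharp
open System
open System.Threading

// Hypothetical blocking stand-in for loadAndUpdateAndSave.
let slowWork (dataId : int) : unit =
    Thread.Sleep 20

// Caps the process-wide pool, queues one work item per id and waits
// for all of them. Returns whether SetMaxThreads accepted the limit.
let runOnThreadPool (work : int -> unit) (maxWorkers : int) (ids : int[]) =
    let _, maxIo = ThreadPool.GetMaxThreads()
    // Returns false (and changes nothing) if maxWorkers is below the
    // pool's minimum thread count or the number of processors.
    let accepted = ThreadPool.SetMaxThreads(maxWorkers, maxIo)
    use remaining = new CountdownEvent(ids.Length)
    for dataId in ids do
        ThreadPool.QueueUserWorkItem(WaitCallback(fun _ ->
            try
                work dataId
            finally
                remaining.Signal() |> ignore))
        |> ignore
    remaining.Wait()          // blocks until every work item has signalled
    accepted

runOnThreadPool slowWork (Environment.ProcessorCount * 2) [| 1 .. 100 |] |> ignore
```

Because Tasks and Parallel.ForEach also run on this pool in .NET 4, the cap affects them too, and anything else running in the process.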
tster
Yeah, good'ol'ThreadPool to the rescue!
Bruno Reis
A:

Are you sure that your individual tasks are completing in a timely manner? I believe that both Parallel.ForEach and the Task class already use the .NET threadpool. Tasks should generally be short-lived work items, in which case the threadpool spawns only a small number of actual threads. But if your tasks are not making progress and other tasks are queued, the number of threads will steadily increase up to the maximum (which by default is 250 per processor in .NET 2.0 SP1, but differs between framework versions). It's also worth noting that (at least in .NET 2.0 SP1) new thread creation is throttled to 2 new threads per second, so reaching the number of threads you're seeing indicates that the tasks are not completing quickly (so it may not be entirely accurate to pin the blame on Parallel.ForEach).

I think that Brian's suggestion to use async workflows is a good one, particularly if the source of the long-lived tasks is IO, since async will return your threads to the threadpool until the IO completes. Another option is to simply accept that your tasks aren't completing quickly and allow the spawning of many threads (which can be controlled to some extent by using System.Threading.ThreadPool.SetMaxThreads) - depending on your situation it may not be a big deal that you're using a lot of threads.

kvb
Marvellous! This is exactly what I was looking for. Yes, new threads were spawning at about `1 new thread per second` (not 2, on .NET 4 beta 2), and they were blocking on `WebRequests`. I looked for something like `SetMaxThreads` but couldn't find it, thanks! Finally, I didn't "accept" all those threads because the app was crashing, and I strongly believe the number of threads was the cause: now that the only difference is `async`, it works.
Bruno Reis
A:

ParallelOptions.MaxDegreeOfParallelism limits the number of concurrent operations run by Parallel method calls such as Parallel.ForEach.
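A minimal sketch of applying that option to the question's Parallel.ForEach approach, assuming a hypothetical blocking `slowWork` in place of `loadAndUpdateAndSave`. The `runLimited` helper and its peak-concurrency counter are illustrative additions that only exist to make the limit visible.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Hypothetical blocking stand-in for loadAndUpdateAndSave.
let slowWork (dataId : int) : unit =
    Thread.Sleep 20

// Runs work over ids with at most maxDop concurrent invocations and
// reports the highest concurrency actually observed.
let runLimited (work : int -> unit) (maxDop : int) (ids : int[]) =
    let gate = obj ()
    let current = ref 0
    let peak = ref 0
    let options = ParallelOptions(MaxDegreeOfParallelism = maxDop)
    let result =
        Parallel.ForEach(ids, options, Action<int>(fun dataId ->
            lock gate (fun () ->
                current.Value <- current.Value + 1
                peak.Value <- max peak.Value current.Value)
            try
                work dataId
            finally
                lock gate (fun () -> current.Value <- current.Value - 1)))
    result.IsCompleted, peak.Value

let completed, peak = runLimited slowWork 8 [| 1 .. 200 |]
printfn "completed = %b, peak concurrency = %d" completed peak
```

Unlike fixed batches, the other workers keep picking up new items while one of them waits on a slow request, instead of the whole batch stalling.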

Marc Bate
