views: 579
answers: 4
You always hear that functional code is inherently easier to parallelize than non-functional code, so I decided to write a function which does the following:

Given an input of strings, total up the number of occurrences of each character across all of the strings. So, given the input [ "aaaaa"; "bbb"; "ccccccc"; "abbbc" ], the method returns a: 6; b: 6; c: 8.

Here's what I've written:

open System

(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
    input
    |> Seq.fold (fun acc text ->
        (* This inner loop can be processed on its own thread *)
        text
        |> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
        |> Seq.fold (fun (acc : Map<_,_>) item ->
            match acc.TryFind(item) with
            | Some(count) -> acc.Add(item, count + 1)
            | None -> acc.Add(item, 1))
            acc
        ) Map.empty

This code is ideally parallelizable, because each string in the input can be processed on its own thread. It's not as straightforward as it looks, though, since the inner loop adds items to a Map shared between all of the inputs.

I'd like the inner loop factored out onto its own thread, and I don't want to use any mutable state. How would I rewrite this function using an async workflow?

A: 
Charlie Martin
There's a little bit of weird dependency that may not be evident to a non-F#'er: the Map<'a, 'b> is basically an immutable dictionary. When you add an item to the Map, it returns a brand-new Map instance containing the item. In other words, changes in one thread are completely (...)
Juliet
(...) isolated from all other threads. Each thread can return its own Map<'a, 'b> object, then I can sum up the results at the end. I'm not sure if that'll have better performance than the non-parallel version, and I'm not even really sure how to implement it in F#.
Juliet
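As a sketch of the idea from these comments (the names `countOne`, `mergeMaps`, and `wordFrequencyParallel` are illustrative, not from the question): each string is counted into its own Map inside an async, and the per-string maps are merged at the end, so no state is shared between threads.

```fsharp
open System

// Count the letters of a single string into its own Map.
// This runs independently per input string, so there is no shared state.
let countOne (text: string) =
    text
    |> Seq.filter Char.IsLetter
    |> Seq.fold (fun (acc: Map<char, int>) c ->
        match acc.TryFind c with
        | Some n -> acc.Add(c, n + 1)
        | None   -> acc.Add(c, 1)) Map.empty

// Merge two count maps by summing the counts for shared keys.
let mergeMaps (a: Map<char, int>) (b: Map<char, int>) =
    b |> Map.fold (fun (acc: Map<char, int>) c n ->
        match acc.TryFind c with
        | Some m -> acc.Add(c, m + n)
        | None   -> acc.Add(c, n)) a

// One async per string; the merge happens after all have completed.
let wordFrequencyParallel (input: string[]) =
    input
    |> Array.map (fun s -> async { return countOne s })
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.fold mergeMaps Map.empty
```

The final merge is sequential, which is the cost this approach pays for keeping every thread's Map independent.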
+2  A: 

As already pointed out, there's update contention if you try to have different threads process different input strings, since each thread can increment the count of every letter. You can have each thread produce its own Map and then 'add up all the Maps', but that final step may be expensive (and is not as well suited to utilizing threads, due to the shared data).

I think large inputs are likely to run faster using an algorithm like the one below, where each thread counts a different letter (across all strings in the input). As a result, each thread has its own independent counter, so there is no update contention and no final step to combine the results. However, we need a preprocessing pass to discover the 'set of unique letters', and that step does have the same contention problem. (In practice, you probably know the universe of characters up front, e.g. the alphabetics, and can just create 26 threads to process a-z, bypassing the issue.)

In any case, presumably the question is mostly about exploring 'how to write F# async code to divide work across threads', so the code below demonstrates that.

let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]

// first discover all unique letters used
let Letters str = 
    str |> Seq.fold (fun set c -> Set.add c set) Set.empty 
let allLetters = 
    input 
    |> Array.map (fun str -> async { return Letters str })
    |> Async.Parallel 
    |> Async.RunSynchronously
    |> Set.unionMany // note, this step is single-threaded;
        // if the input has many strings, this can be improved

// Now count each letter on a separate thread
let CountLetter letter =
    let mutable count = 0
    for str in input do
        for c in str do
            if letter = c then
                count <- count + 1
    letter, count
let result = 
    allLetters 
    |> Seq.map (fun c -> async { return CountLetter c })
    |> Async.Parallel 
    |> Async.RunSynchronously

// print results
for letter,count in result do
    printfn "%c : %d" letter count
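The comment in the code above notes that the set-union step is single-threaded. One way to spread that work across threads as well, sketched here as a hypothetical variant (`parallelUnion` is an illustrative name, not from the answer), is a pairwise tree reduction that halves the number of sets on each round:

```fsharp
// Hypothetical sketch: union an array of sets pairwise in parallel.
// Each round merges adjacent pairs, so the set count halves per round
// until a single set remains.
let rec parallelUnion (sets: Set<char>[]) =
    match sets.Length with
    | 0 -> Set.empty
    | 1 -> sets.[0]
    | _ ->
        sets
        |> Array.chunkBySize 2
        |> Array.map (fun pair ->
            async {
                return
                    if pair.Length = 2 then Set.union pair.[0] pair.[1]
                    else pair.[0] })
        |> Async.Parallel
        |> Async.RunSynchronously
        |> parallelUnion
```

Whether this beats the sequential union depends on how many sets there are and how large they get; for a handful of short strings the async overhead will dominate.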

I have indeed 'completely changed the algorithm', mostly because the original algorithm is not particularly suitable for direct data parallelization, due to the update contention. Depending on exactly what you're out to learn, this answer may or may not be satisfactory to you.

Brian
Brian, is that the same algorithm I outlined? It would seem so.
Charlie Martin
I'm not sure that the Letters method is more efficient than my serial approach for large inputs (strings containing a few thousand characters each). Since you asked: I'm not really out to learn anything specific, just writing exploratory code :)
Juliet
In the end, you were correct: the original code was written in a way that was inherently unparallelizable, and the only way to tease the code apart for proper parallelization was to rewrite it along the lines of what you've done above.
Juliet
+1  A: 

Parallel is not the same as async, as Don Syme explains.

So IMO you'd be better off using PLINQ to parallelize.
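As a hypothetical sketch of that suggestion (not code from this answer), PLINQ can be driven directly from F# through the System.Linq extension methods; `wordFrequencyPlinq` is an illustrative name:

```fsharp
open System
open System.Linq

// Hypothetical sketch: the same letter-frequency computation via PLINQ.
// AsParallel() partitions the work; GroupBy does the counting without
// any explicit shared mutable state in our code.
let wordFrequencyPlinq (input: string[]) =
    input
        .AsParallel()
        .SelectMany(fun (s: string) -> s :> seq<char>)
        .Where(fun c -> Char.IsLetter c)
        .GroupBy(fun c -> c)
        .Select(fun (g: IGrouping<char, char>) -> (g.Key, g.Count()))
    |> Map.ofSeq
```

This keeps the declarative shape of the original fold while letting the PLINQ runtime decide how to partition the input across cores.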

Mauricio Scheffer
+3  A: 

You can write that like this:

let wordFrequency =
  Seq.concat >> Seq.filter System.Char.IsLetter >> Seq.countBy id >> Map.ofSeq

and parallelize it with only two extra characters to use the PSeq module from the FSharp.PowerPack.Parallel.Seq DLL instead of the ordinary Seq module:

let wordFrequency =
  Seq.concat >> PSeq.filter System.Char.IsLetter >> PSeq.countBy id >> Map.ofSeq

For example, the time taken to compute frequencies from the 5.5MB King James Bible falls from 4.75s to 0.66s. That is a 7.2× speedup on this 8-core machine.

Jon Harrop