+6  Q: 

React for futures

I am trying to use a divide-and-conquer (aka fork/join) approach for a number crunching problem. Here is the code:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

It runs (with a nice speed-up), but the future's apply method seems to block a thread, and the thread pool grows tremendously. When too many threads have been created, the computation gets stuck.

Is there a kind of react method for futures that releases the thread? Or is there any other way to achieve that behavior?

EDIT: I am using Scala 2.8.0.final

+4  A: 

Don't claim (apply) your Futures, since this forces the calling thread to block and wait for an answer; as you've seen, this can lead to deadlocks. Instead, use them monadically to tell them what to do when they complete. Rather than:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

Try this:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

The result of this will be a Responder[Result] (essentially a Future[Result]) containing the merged results; you can do something effectful with this final value using respond() or foreach(), or you can map() or flatMap() it to another Responder[T]. No blocking necessary, just keep scheduling computations for the future!
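To make "monadically" concrete: a for-comprehension is compiler sugar for nested flatMap and map calls. The sketch below shows that equivalence. It is an assumption-laden illustration, not the code from this answer: it uses scala.concurrent.Future (available from Scala 2.10 onward) as a stand-in because it has the same map/flatMap shape and runs on current Scala, and the names DesugarSketch, viaFor and viaFlatMap are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical illustration; scala.concurrent.Future stands in for the
// actor-library futures discussed in this thread.
object DesugarSketch {
  // The for-comprehension form.
  def viaFor(f1: Future[Int], f2: Future[Int]): Future[Int] =
    for {
      a <- f1
      b <- f2
    } yield a + b

  // What the compiler rewrites it to: every generator but the last
  // becomes a flatMap, and the last one becomes a map.
  def viaFlatMap(f1: Future[Int], f2: Future[Int]): Future[Int] =
    f1.flatMap(a => f2.map(b => a + b))
}
```

Neither method waits for a result; each only describes what should happen once both inputs are available.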

Edit 1:

Ok, the signature of the compute function is going to have to change to Responder[Result] now, so how does that affect the recursive calls? Let's try this:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

Now you no longer need to wrap the calls to compute with future(...) because they're already returning Responder (a superclass of Future).

Edit 2:

One upshot of using this continuation-passing style is that your top-level code--whatever calls compute originally--doesn't block at all any more. If it's being called from main(), and that's all the program does, this will be a problem, because now it will just spawn a bunch of futures and then immediately shut down, having finished everything it was told to do. What you need to do is block on all these futures, but only once, at the top level, and only on the results of all the computations, not any intermediate ones.

Unfortunately, this Responder thing that's being returned by compute() no longer has a blocking apply() method like the Future did. I'm not sure why flatMapping Futures produces a generic Responder instead of a Future; this seems like an API mistake. But in any case, you should be able to make your own:

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) }
  q.take() // blocks until the responder has delivered its value
}

So now you can create a blocking call to compute in your main method like so:

val finalResult = claim(compute(input))
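
For readers on Scala 2.10 or later, the same shape (compose without blocking, then wait exactly once at the top level) can be sketched with scala.concurrent.Future and Await.result. This is a minimal sketch under stated assumptions, not the original poster's code: summing a Vector[Int] stands in for the real computation, and ForkJoinSketch, SizeLimit and this merge are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical toy problem: summing a vector by divide and conquer.
object ForkJoinSketch {
  // Assumed threshold below which the work is done sequentially.
  val SizeLimit = 1000

  // Stand-in for merge(result1, result2); a pure function.
  def merge(a: Long, b: Long): Long = a + b

  // Returns immediately; no thread waits on an intermediate result.
  def compute(input: Vector[Int]): Future[Long] =
    if (input.size < SizeLimit) {
      Future(input.foldLeft(0L)(_ + _))
    } else {
      val (left, right) = input.splitAt(input.size / 2)
      // Start both halves before composing them so they can run concurrently.
      val f1 = compute(left)
      val f2 = compute(right)
      for {
        r1 <- f1
        r2 <- f2
      } yield merge(r1, r2)
    }

  def main(args: Array[String]): Unit = {
    val input = Vector.range(1, 100001)
    // Block exactly once, at the top level, on the final result only.
    val total = Await.result(compute(input), 1.minute)
    println(total)
  }
}
```

In this sketch both recursive calls are made before the for-comprehension. With scala.concurrent.Future, a call placed inside the comprehension would not start until the preceding result had arrived, so the two halves would run one after the other.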
pelotom
I don't think this helps. This just pushes back the problem another level; now it'll deadlock in the merges--well, it might, depending on how you rewrite merge to work with `Responder`s.
Rex Kerr
Why do you say that? Keep in mind that the merge is happening in a new thread that's only spawned after `f1` and `f2` are both finished computing.
pelotom
the merge is not operating on `Responders`, it's operating on `Result` values. Its code doesn't need to change at all.
pelotom
@pelotom - Maybe I'm not understanding what you envision happens inside "merge". The code you wrote doesn't compile for me with a trivial example (where the OP's code does compile, with appropriate additions).
Rex Kerr
@Rex, I have no idea what `merge` does; it's a black box to me :) All I know is its signature, `(Result, Result) => Result` is unchanged, and if it worked before it will still work...
pelotom
@Rex I never got the OP's code to compile since there's so much missing, but this should compile as a trivial example of monadic futures: `for (i <- future(5); j <- future(6)) yield (i + j)`
pelotom
@pelotom - But now compute's return type is Responder[Result]. And those are getting wrapped into futures in the recursive call. So either recursion seems to be broken, or we don't seem to have gotten anywhere.
Rex Kerr
@Rex Kerr - You're right, the return type has to change, which means the recursion needs to be modified. See my edit.
pelotom
I tried your approach (BEFORE THE EDIT). It compiles fine, but when it runs, it crashes instantly without any stack trace. I just get a one-line 'scala.actors.SuspendActorControl' on the console and a non-zero return code...
paradigmatic
@paradigmatic - yikes, that sounds bad. and after the edit?
pelotom
@pelotom - It still crashes, but at least I have a stack trace. `scala.actors.ActorProxy@641a034d: caught java.lang.InterruptedException`
paradigmatic
@paradigmatic: without seeing your code I can't be sure, but I would hazard a guess that you're shutting down prematurely now since all your code has gone future-y. See my second edit.
pelotom
@pelotom - Still crashing with the second edit. BTW merge() is a pure function, both args, result and all created objects are immutable and not shared among different futures/threads. The interrupted exception is reported to originate from `scala.concurrent.forkjoin.ForkJoinWorkerThread`.
paradigmatic
@paradigmatic: if you want to make a pastie with a self-contained test case that reproduces the problem, I'll take a look at it...
pelotom
@pelotom - I made a minimal crashing toy example (producing a sorted list of random ints). The culprit seems to be the claim method. Thanks for having a look: https://gist.github.com/4d6667280072a47cc9df
paradigmatic
@paradigmatic: Ok, I edited `claim()` above, please try it out. There were 2 problems: I shouldn't have been using a SynchronousQueue (that was causing the program to hang) and `respond`/`foreach` blocks need to be wrapped in an `actor` or `future` to avoid the exceptions you were seeing. Those exceptions are used by actors for control flow, and certain actor functions depend on these mechanisms to work, `respond` being one of them.
pelotom
IT WORKS. But I've lost my nice speed-up...
paradigmatic
@paradigmatic: time to crack out that profiler... but I'm afraid I've done all I can for you
pelotom
Thanks for your help. I'll try to figure out the loss of speed-up, and I'll also try jsr166y.
paradigmatic
You might also try doing this with the monadic futures (called promises) provided by scalaz (http://code.google.com/p/scalaz/). I've had success with those, whereas my knowledge of scala's actor futures is mostly theoretical. They give you more control over the parallelization strategy used (e.g. the kind of threadpool used).
pelotom