I have an iterable vals: Iterable[T] and a long-running function without any relevant side effects: f: (T => Unit). Right now this is applied to vals in the obvious way:

vals.foreach(f)

I would like the calls to f to be done concurrently (within reasonable limits). Is there an obvious function somewhere in the Scala base library? Something like:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

While f is reasonably long running, it is short enough that I don't want the overhead of creating a new thread for each call, so I am looking for something based on a thread pool.

+3  A: 

I had some issues using scala.actors.Futures in Scala 2.8 (it was buggy when I checked). Using the Java libraries directly worked for me, though:

object Parallel {
  import java.util.{Timer, TimerTask}
  import java.util.concurrent.{Executors, TimeUnit}

  val cpus = Runtime.getRuntime.availableProcessors

  // Run op once on a timer thread after ms milliseconds.
  def afterDelay(ms: Long)(op: => Unit): Unit =
    new Timer().schedule(new TimerTask { override def run(): Unit = op }, ms)

  // Run f(0), ..., f(n - 1) concurrently and block until every call has finished.
  def repeat(n: Int, f: Int => Unit): Unit = {
    val e = Executors.newCachedThreadPool() // or newFixedThreadPool(cpus + 1) to bound the thread count
    (0 until n).foreach(i => e.execute(new Runnable { def run(): Unit = f(i) }))
    e.shutdown() // accept no new tasks; already submitted tasks still run
    e.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
  }
}
wrang-wrang
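For reference, the call shape asked for in the question can be approximated on top of java.util.concurrent with a fixed-size pool, which caps the number of threads (a cached pool creates new threads as needed). This is only a minimal sketch, not a library function: the object name Concurrent is borrowed from the question, the parameter name threads is made up here, and it assumes f is safe to call from several threads at once.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical helper: the name comes from the question, not from any library.
object Concurrent {
  // Apply f to every element of vals on a pool of `threads` worker threads,
  // returning only after every call has finished.
  def foreach[T](threads: Int)(vals: Iterable[T], f: T => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      // toList forces every task to be submitted before we start waiting.
      val pending = vals.toList.map { v =>
        pool.submit(new Callable[Unit] { def call(): Unit = f(v) })
      }
      // get() blocks until the task is done; a failure in f is rethrown
      // here wrapped in an ExecutionException.
      pending.foreach(_.get())
    } finally {
      pool.shutdown()
    }
  }
}
```

Usage would then look like Concurrent.foreach(8)(vals, f). Creating the pool per call keeps the sketch short; a long-lived application would more likely share one pool.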
+2  A: 

I'd use scala.actors.Futures:

vals.foreach(t => scala.actors.Futures.future(f(t)))
Daniel
+6  A: 

I like the Futures answer. However, while it will execute concurrently, it will also return immediately, before the calls have finished, which is probably not what you want. To wait for completion, apply each future, which blocks until its result is available:

import scala.actors.Futures._

vals map { x => future { f(x) } } foreach { _() }
Daniel Spiewak
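scala.actors is no longer part of current Scala releases. On Scala 2.10 and newer, the same start-everything-then-wait pattern can be written with scala.concurrent instead. A sketch, assuming the default global execution context is acceptable (its pool is sized from the number of available processors by default); the name FutureForeach is invented for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical wrapper, named here only for illustration.
object FutureForeach {
  def apply[T](vals: Iterable[T])(f: T => Unit): Unit = {
    // toList makes the collection strict, so every future is started here.
    val started = vals.toList.map(x => Future(f(x)))
    // Block until each call has completed; a failed call rethrows here.
    started.foreach(fut => Await.result(fut, Duration.Inf))
  }
}
```

This swaps scala.concurrent.Future in for scala.actors.Futures.future, and Await.result plays the role of applying the future with _().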
Be careful that `vals` is a strict collection -- if it's lazy (and in Scala 2.7 this includes the `Range` class), the futures won't be created until each one is needed by `foreach`, and nothing will happen in parallel.
Ken Bloom
I suppose we could solve that problem by injecting another `foreach` call between the `map` and the current `foreach`. Thus: `vals map { x => future { f(x) } } foreach { x => x } foreach { _() }`
Daniel Spiewak
Wouldn't that have to be a `map` we inject, not another `foreach`? And it is not clear to me that the `map` of a lazy collection is strict. The safest way may be to call `toArray`.
David Crawshaw
You're right, `foreach` was (obviously) the wrong thing to inject since it returns `Unit`. My bad! :-) The `map` function on lazy collections is almost always non-strict, so we can either call `toList` (or `toArray`), or we can project and then force: `(vals map { x => future { f(x) } } projection).force foreach { _() }`. I don't know whether that's *better* than simply `toList`, but it is certainly different.
Daniel Spiewak
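The strictness problem discussed in these comments can be made visible with a small experiment. This is a sketch under assumptions: it uses scala.concurrent futures as a stand-in for scala.actors.Futures, an Iterator as the lazy collection, and a made-up helper maxInFlight that reports the highest number of calls that were running at the same time.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object, not part of any library.
object StrictnessDemo {
  // Runs a slow function over vals via futures and returns the peak number
  // of calls in flight. `strict` decides whether the mapped collection is
  // forced with toList before we start waiting.
  def maxInFlight(vals: Iterable[Int], strict: Boolean): Int = {
    val lock = new Object
    var running = 0
    var peak = 0
    def f(x: Int): Unit = {
      lock.synchronized { running += 1; peak = math.max(peak, running) }
      Thread.sleep(20) // stand-in for the long-running work
      lock.synchronized { running -= 1 }
    }
    // Mapping an Iterator is lazy: no future exists until next() is called.
    val lazyFutures = vals.iterator.map(x => Future(f(x)))
    val futures = if (strict) lazyFutures.toList.iterator else lazyFutures
    futures.foreach(fut => Await.result(fut, Duration.Inf))
    lock.synchronized(peak)
  }
}
```

With strict = false each future is created only when foreach asks for it and is awaited before the next one exists, so the peak is 1 and nothing overlaps. With strict = true all futures are started up front, and on a multi-core machine the peak would typically be greater than 1.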
+7  A: 

Scalaz has parMap. You would use it as follows:

import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive

This will equip every functor (including Iterable) with a parMap method, so you can just do:

vals.parMap(f)

You also get parFlatMap, parZipWith, etc.

Apocalisp
+2  A: 

The latest release of Functional Java has some higher-order concurrency features that you can use.

import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._

val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))

And then...

par.parMap(vals, f)

Remember to shut down the pool when you are done with it.

Apocalisp