views: 412

answers: 1

I was wondering if there is a way to execute very simple tasks on another thread in Scala without a lot of overhead.

Basically I would like to make a global 'executor' that can handle executing an arbitrary number of tasks. I can then use the executor to build up additional constructs.

Additionally, it would be nice if clients did not have to worry about whether a task blocks.

I know that the Scala actors library is built on top of Doug Lea's fork/join framework, and that it supports, to a limited degree, what I am trying to accomplish. However, from my understanding, I would have to pre-allocate an 'Actor Pool' to accomplish this.

I would like to avoid making a global thread pool for this, as from what I understand it is not all that good at fine-grained parallelism.

Here is a simple example:

import concurrent.SyncVar

object SimpleExecutor {
  import actors.Actor._

  def exec[A](task: => A): SyncVar[A] = {
    // What goes here? This is what I currently have:
    val x = new concurrent.SyncVar[A]
    // The overhead of making the actor appears to be a killer
    actor {
      x.set(task)
    }
    x
  }

  // Not really sure what to stick here
  def execBlocker[A](task: => A): SyncVar[A] = exec(task)
}
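On modern Scala (where `scala.actors` no longer exists), the "global executor" idea can be sketched with one shared fixed-size pool and `scala.concurrent.Future`, so no per-task actor is created. The names `PooledExecutor` and `exec` are illustrative, not from any library:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A sketch of a global executor: one shared pool sized to the
// machine, tasks submitted as Futures rather than actors.
object PooledExecutor {
  private val pool =
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(pool)

  // By-name parameter, like the SyncVar version above: the task
  // body only runs on a pool thread, not at the call site.
  def exec[A](task: => A): Future[A] = Future(task)

  def shutdown(): Unit = pool.shutdown()
}

val answer = Await.result(PooledExecutor.exec(40 + 2), 2.seconds)
println(answer)  // 42
PooledExecutor.shutdown()
```

Clients get back a `Future` they can block on (`Await.result`) or compose without blocking, which loosely answers the "blocking or non-blocking" wish above.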

and now an example of using exec:

object Examples {
  //Benchmarks a task
  def benchmark(blk : => Unit) = {
    val start = System.nanoTime
    blk
    System.nanoTime - start
  }

  //Benchmarks and compares 2 tasks
  def cmp(a: => Any, b: => Any) = {
    val at = benchmark(a)
    val bt = benchmark(b)
    println(at + " " + bt + " " + at.toDouble / bt)
  }

  //Simple example for simple non blocking comparison
  import SimpleExecutor._
  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

  //Simple example for the blocking performance
  import Thread.sleep
  def paraSle(hi : Int) = (0 until hi) map (i=>exec(sleep(i))) foreach (_.get)
  def singSle(hi : Int) = (0 until hi) foreach (i=>sleep(i))
}

Finally to run the examples (might want to do it a few times so HotSpot can warm up):

import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))
+8  A: 

That's what Futures were made for. Just import scala.actors.Futures._, use future to create new futures, methods like awaitAll to wait on the results for a while, apply or respond to block until the result is received, isSet to see whether it's ready, etc.
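Since `scala.actors.Futures` is long gone from the standard library, here is a rough mapping of those operations onto modern `scala.concurrent.Future`; the correspondences are my assumption of intent, not the original API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Rough modern equivalents of the scala.actors.Futures operations:
//   future(body)  -> Future(body)
//   f()           -> Await.result(f, timeout)   (blocking)
//   f.isSet       -> f.isCompleted
//   awaitAll(...) -> Await.result(Future.sequence(...), timeout)
val f  = Future(21 * 2)
val fs = (1 to 3).map(i => Future(i * i))

val one = Await.result(f, 1.second)                   // block for one result
val all = Await.result(Future.sequence(fs), 1.second) // wait on all of them
println(one)  // 42
println(all)  // Vector(1, 4, 9)
```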

You don't need to create a thread pool either. Or, at least, not normally. Why do you think you do?

EDIT

You can't gain performance by parallelizing something as simple as an integer addition, because the addition is even faster than a function call. Concurrency only brings performance gains by avoiding time lost to blocking I/O and by using multiple CPU cores to execute tasks in parallel. In the latter case, the task must be computationally expensive enough to offset the cost of dividing the workload and merging the results.

One other reason to go for concurrency is to improve the responsiveness of the application. That's not making it faster; it's making it respond faster to the user, and one way of doing that is offloading even relatively fast operations to another thread so that the threads handling what the user sees or does stay responsive. But I digress.

There's a serious problem with your code:

  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

Or, translating into futures,

  def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

You might think paraAdd is doing the tasks in parallel, but it isn't, because Range has a non-strict implementation of map (that's up to Scala 2.7; starting with Scala 2.8.0, Range is strict). You can look it up in other Scala questions. What happens is this:

  1. A range is created from 0 until hi
  2. A range projection is created from each element i of the range into a function that returns future(i+5) when called.
  3. For each element of the range projection (i => future(i+5)), the element is evaluated (foreach is strict) and then the function apply is called on it.

So, because future is not called in step 2, but only in step 3, you'll wait for each future to complete before doing the next one. You can fix it with:

  def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)

Which will give you better performance, but never as good as a simple immediate addition. On the other hand, suppose you do this:

def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) = 
  (0 until n).force map (_ => future(f)) foreach (_.apply)

And then compare:

cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))

You may start seeing gains (it will depend on the number of cores and processor speed).
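The strict-vs-lazy distinction this answer hinges on can be seen without futures at all. A minimal sketch on modern Scala, where `Range.map` is strict and `.view` restores the lazy behavior described above, with a counter standing in for the future-spawning side effect:

```scala
// .view.map builds a lazy view: the body does not run until each
// element is demanded, which is what serialized the waits above.
var spawned = 0
val lazyOnes = (0 until 5).view.map { i => spawned += 1; i + 5 }
val afterLazyMap = spawned            // nothing has run yet

// Plain Range.map is strict (since 2.8): all five bodies run now.
val strictOnes = (0 until 5).map { i => spawned += 1; i + 5 }
val afterStrictMap = spawned

val forced = lazyOnes.toList          // forcing the view runs the lazy ones
println((afterLazyMap, afterStrictMap, spawned))  // (0,5,10)
```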

Daniel
Cool, I will look into it when I get a chance. Quickly, here is a link to the Scaladoc for the class you are talking about: http://www.scala-lang.org/docu/files/api/scala/actors/Futures$object.html Also, I find it helpful to go to the source linked on that page, as the Scaladocs are often a little ambiguous.
King Cub
Well, I can't figure out how to get the Stack Overflow comments not to vomit on URLs with $s in them; append $object to that URL, or select view source from this class's Scaladoc: http://www.scala-lang.org/docu/files/api/scala/actors/Future.html
King Cub
OK, I looked over that site and I am just not seeing it. Can you give me the code to put into the exec body? It seems like futures don't give me what I am asking for. I would consider using a thread pool; I think I need one because the overhead of creating actors swamps performing the operation. When I change the paraAdd and paraSle functions to def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_()) and def paraSle(hi: Int) = (0 until hi) map (i=>future(sleep(i))) foreach (_()), my performance is 100x or more worse than singAdd for paraAdd, and exactly the same for both Sle's.
King Cub
That's intrinsic to concurrency and threads. Adding two values is cheaper than even a function call, so there's no way you are ever going to save time on such a small scale. There are only two ways to gain performance: by avoiding blocking calls (I/O, basically), and by using multiple CPU cores to parallelize tasks. There's an additional problem I'll discuss in an edit to my answer.
Daniel
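The "spawn all futures strictly, then block on each" pattern from the answer generalizes to a small parallel-map helper. This is a sketch on modern `scala.concurrent.Future`; `paraMap` is an illustrative name, not a library API, and gains only appear when the per-element work is expensive enough to amortize the overhead:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Map each element to a Future eagerly (Vector.map is strict, so all
// tasks are in flight before we block), then collect the results.
def paraMap[A, B](xs: Seq[A])(f: A => B): Seq[B] = {
  val futures = xs.toVector.map(x => Future(f(x)))
  futures.map(Await.result(_, 10.seconds))
}

// A coarse-grained task, in the spirit of singAdd from the question.
def singAdd(hi: Int): Int = (0 until hi).map(_ + 5).sum

val results = paraMap(Vector(1000, 2000, 3000))(singAdd)
println(results)
```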
