As in my own answer to my own question, I am processing a large number of events which arrive on a queue. Each event is handled in exactly the same manner, and each event can be handled independently of all other events.

My program takes advantage of the Scala concurrency framework and many of the processes involved are modelled as Actors. As Actors process their messages sequentially, they are not well-suited to this particular problem (even though my other actors are performing actions which are sequential). As I want Scala to "control" all thread creation (which I assume is the point of it having a concurrency system in the first place) it seems I have 2 choices:

  1. Send the events to a pool of event processors, which I control
  2. Get my Actor to process them concurrently by some other mechanism

I would have thought that #1 negates the point of using the actors subsystem. One obvious question is how many processor actors I should create; these things are supposedly hidden from me and solved by the subsystem.

My answer was to do the following:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Is there a better approach? Is this incorrect?

edit: A possibly better approach is:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(x))
    }
  }
}
+2  A: 

If the events can all be handled independently, why are they on a queue? Knowing nothing else about your design, this seems like an unnecessary step. If you could compose the process function with whatever is firing those events, you could potentially obviate the queue.

An actor is essentially a concurrent effect equipped with a queue. If you want to process multiple messages simultaneously, you don't really want an actor. You just want a function (Any => Unit) to be scheduled for execution at some convenient time.

Having said that, your approach is reasonable if you want to stay within the actors library and if the event queue is not within your control.
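
As a rough sketch of the "just schedule a function" idea, here is what it might look like with scala.concurrent.Future, which ships with later Scala releases than the actors library discussed here (that it is available to you is an assumption). The names ScheduledFunction, process and run are hypothetical stand-ins, not part of any library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ScheduledFunction {
  // Hypothetical stand-in for the question's process(x).
  def process(x: String): Int = x.length

  // Schedules process for every event and waits for all the results.
  def run(events: List[String]): List[Int] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // Each Future(...) hands one call to the shared pool; no actor mailbox is involved.
    val pending: List[Future[Int]] = events.map(e => Future(process(e)))
    // Future.sequence keeps the results in the same order as the events.
    Await.result(Future.sequence(pending), 10.seconds)
  }
}
```

Calling ScheduledFunction.run(List("a", "bb")) processes both events independently; the global execution context decides how many threads to use.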

Scalaz makes a distinction between Actors and concurrent Effects. While its Actor is very light-weight, scalaz.concurrent.Effect is lighter still. Here's your code roughly translated to the Scalaz library:

val eventProcessor = effect (x => process(x))

This is with the latest trunk head, not yet released.

Apocalisp
Thanks! They are on a "queue" purely because I'm sending them to an actor and an actor has a queue, which it processes sequentially. As the actors library is how I'm _supposed_ to handle concurrency (*) in Scala, I'm trying to use it. Otherwise I'd just use ExecutorService.invokeAll.
oxbow_lakes
See also my comment to jshen below. I've been writing concurrent code in Java for a long time and trying to find the correct boundary between using actors and, um, not using actors in a Scala program which is expected to be concurrent.
oxbow_lakes
Actors are not a panacea, and there's nothing that says you have to use actors if you want concurrency in Scala. It's just a library and, in my opinion, an overly complicated one.
Apocalisp
+1  A: 

This sounds like a simple consumer/producer problem. I'd use a queue with a pool of consumers. You could probably write this with a few lines of code using java.util.concurrent.
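
A minimal sketch of that idea in Scala, assuming a pool sized to the number of available processors. The ExecutorService supplies both the queue and the consumers. EventPool, process and processAll are hypothetical names chosen for illustration.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors}

object EventPool {
  // Hypothetical stand-in for the real event handler.
  def process(x: Int): Int = x * 2

  // Runs process over every event on a fixed pool and returns
  // the results in the order the events were given.
  def processAll(events: Seq[Int]): Seq[Int] = {
    val workers = Runtime.getRuntime.availableProcessors
    val pool: ExecutorService = Executors.newFixedThreadPool(workers)
    try {
      val tasks = new java.util.ArrayList[Callable[Int]]()
      events.foreach { e =>
        tasks.add(new Callable[Int] { def call(): Int = process(e) })
      }
      // invokeAll blocks until every task has completed.
      val futures = pool.invokeAll(tasks)
      (0 until futures.size).map(i => futures.get(i).get())
    } finally {
      pool.shutdown()
    }
  }
}
```

In a long-running program you would probably create the pool once and reuse it rather than building one per batch as this sketch does.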

jshen
The whole point of using the Scala actors library is that it can better map your code (written using actors) onto the concurrency available in the current operating environment. So if Scala thinks it's got 4 processors, perhaps it will create a backing thread pool for its actors with 4 workers. I gain nothing by creating my own separate thread pool to execute this work on - all I'll end up with is a load of unnecessary context switching. I'm perfectly aware of how to solve this in Java - I'm asking about how to solve it using the Scala actors library, hence the tags.
oxbow_lakes
At least I assume that's the whole point of using it :-/
oxbow_lakes
Sorry, I didn't realize this was an academic exercise with actors. I thought you wanted a good solution to the problem. "So if Scala thinks it's got 4 processors, perhaps it will create a backing thread pool for its actors with 4 workers." This is maybe two lines of code using java.util.concurrent, which you can easily use from Scala. I use it from JRuby all the time.
jshen
Yes, great. But then when I deploy my program onto a 50-core beast, I've gone and hardcoded 2 threads into it. Or at the very least, it renders the way the actor subsystem works entirely pointless because it now doesn't know how many threads it can create. It's not an academic exercise, it's about understanding how to best solve a real world problem with a technology which I'm new to.
oxbow_lakes
It is academic if there is a very good solution that you aren't interested in because you want to experiment with some new idea. This is the definition of academic! I never suggested you hardcode the number of threads. Here's a simplified line from a JRuby program I am currently working on: `JavaConcurrent::Executors.newFixedThreadPool(JavaLang::Runtime.getRuntime.availableProcessors)`
jshen
+1  A: 

The purpose of an actor (well, one of them) is to ensure that the state within the actor can only be accessed by a single thread at a time. If the processing of a message doesn't depend on any mutable state within the actor, then it would probably be more appropriate to just submit a task to a scheduler or a thread pool to process. The extra abstraction that the actor provides is actually getting in your way.

There are convenient methods in scala.actors.Scheduler for this, or you could use an Executor from java.util.concurrent.

Erik Engbrecht
+3  A: 

This seems like a duplicate of another question, so I'll duplicate my answer.

Actors process one message at a time. The classic pattern to process multiple messages is to have one coordinator actor front for a pool of consumer actors. If you use react then the consumer pool can be large but will still only use a small number of JVM threads. Here's an example where I create a pool of 10 consumers and one coordinator to front for them.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes a while
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => '-'; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

This code tests to see which consumer is available and sends a request to that consumer. Alternatives are to just randomly assign to consumers or to use a round robin scheduler.
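
The round robin alternative could be sketched roughly as below. RoundRobin is a hypothetical helper, not part of the actors library; it only picks the next worker and leaves the message sending to the caller.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper: hands out workers in strict rotation.
final class RoundRobin[A](workers: IndexedSeq[A]) {
  require(workers.nonEmpty, "need at least one worker")
  private val counter = new AtomicLong(0L)

  // Thread-safe: each call atomically claims the next slot.
  def next(): A = {
    val slot = java.lang.Math.floorMod(counter.getAndIncrement(), workers.size.toLong)
    workers(slot.toInt)
  }
}
```

Assuming the consumers above were held in an indexed collection, the coordinator could forward each request to the result of next() instead of asking every consumer whether it is ready. The trade-off is that a slow consumer still gets its share of requests.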

Depending on what you are doing, you might be better served with Scala's Futures. For instance, if you don't really need actors then all of the above machinery could be written as

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => '-'; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
James Iry
Thanks - I wasn't aware that you could invoke tasks on the same scheduler that the actor framework uses. I think the best approach is therefore to use Scheduler.execute(process(e))
oxbow_lakes
Also - yes; it's a very similar question (which I link to) but not quite the same. The first question was "are actors sequential?" whereas the second question was "As actors are sequential, how do I do X"
oxbow_lakes
@James, doesn't this code result in a huge number of `Ready` messages accumulating? If there are `n` consumers, won't `n` `Ready` messages eventually be sent to the coordinator for each `Request`?
Scott Morrison
Incidentally: `0 to 10` contains 11 elements, not 10.
Scott Morrison