I am trying to parallelise some code using Scala actors. This is my first real code with actors, but I have some experience with Java multithreading and MPI in C. However, I am completely lost.

The workflow I want to implement is a circular pipeline and can be described as follows:

  • Each worker actor has a reference to another one, thus forming a circle
  • There is a coordinator actor which can trigger a computation by sending a StartWork() message
  • When a worker receives a StartWork() message, it processes some stuff locally and sends a DoWork(...) message to its neighbour in the circle.
  • The neighbour does some other stuff and in turn sends a DoWork(...) message to its own neighbour.
  • This continues until the initial worker receives a DoWork() message.
  • The coordinator can send a GetResult() message to the initial worker and wait for a reply.
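The hop counting in this workflow can be checked in isolation. Below is a minimal single-threaded sketch, assuming Scala 2.13 or 3 and no actor library: each worker's mailbox is modelled as a queue, StartWork and DoWork mirror the messages above, and doStuff()/doOtherStuff() are replaced by a hypothetical "add 1" step, so the value that comes home equals the number of workers the job visited.

```scala
import scala.collection.mutable

object RingSimulation {
  // Messages from the question; Int payloads are an assumption for illustration.
  sealed trait Msg
  case object StartWork extends Msg
  final case class DoWork(data: Int, remaining: Int) extends Msg

  // Sends one job around a ring of `numWorkers` workers, starting at worker `start`,
  // and returns the data that arrives back at `start`. Messages are delivered one
  // at a time, so this models the message flow only; nothing runs concurrently.
  def run(numWorkers: Int, start: Int): Int = {
    val mailboxes = Vector.fill(numWorkers)(mutable.Queue.empty[Msg])
    def neighbor(i: Int): Int = (i + 1) % numWorkers // closes the circle
    var result: Option[Int] = None

    mailboxes(start).enqueue(StartWork) // the coordinator triggers the job
    while (mailboxes.exists(_.nonEmpty)) {
      for (i <- mailboxes.indices if mailboxes(i).nonEmpty) {
        mailboxes(i).dequeue() match {
          case StartWork =>
            val someData = 1 // hypothetical stand-in for doStuff()
            mailboxes(neighbor(i)).enqueue(DoWork(someData, numWorkers - 1))
          case DoWork(data, 0) =>
            result = Some(data) // remaining == 0: the job is back at the initial worker
          case DoWork(data, remaining) =>
            val someOtherData = data + 1 // hypothetical stand-in for doOtherStuff()
            mailboxes(neighbor(i)).enqueue(DoWork(someOtherData, remaining - 1))
        }
      }
    }
    result.getOrElse(sys.error("the job never returned"))
  }
}
```

For example, RingSimulation.run(4, 0) returns 4: worker 0 contributes 1, and each of the three other workers adds 1 before the job reaches worker 0 again with remaining == 0.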

The point is that the coordinator should only receive a result when the data is ready. How can a worker wait until the job has come back to it before answering the GetResult() message?

To speed up computation, any worker can receive a StartWork() at any time.

Here is my first attempt at a pseudo-implementation of the worker:

import scala.actors.Actor
import scala.actors.Actor._

class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
   var ready = Foo()                      // Foo: placeholder for the result type
   def act() {
     loop {
       react {
         case StartWork() =>
           val someData = doStuff()
           neighbor ! DoWork( someData, numWorkers-1 )
         case DoWork( resultData, remaining ) => if( remaining == 0 ) {
             ready = resultData               // the job is back home
           } else {
             val someOtherData = doOtherStuff( resultData )
             neighbor ! DoWork( someOtherData, remaining-1 )
           }
         case GetResult() => reply( ready )   // may answer before the job is back
       }
     }
   }
}

On the coordinator side:

worker ! StartWork()
val result = worker !? GetResult() // should wait
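The scala.actors library has been deprecated in favour of Akka and is not part of current Scala releases, so here is a sketch of the same "coordinator waits until the data is ready" behaviour using a swapped-in technique: one plain thread per worker, a java.util.concurrent.LinkedBlockingQueue as each mailbox, and a scala.concurrent.Promise carried inside the messages as the reply channel. The names ThreadedRing, runJob and Stop are hypothetical, and the work itself is the same placeholder "add 1" step.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object ThreadedRing {
  sealed trait Msg
  // The Promise travels with the job and acts as the reply channel.
  final case class StartWork(done: Promise[Int]) extends Msg
  final case class DoWork(data: Int, remaining: Int, done: Promise[Int]) extends Msg
  case object Stop extends Msg

  final class Worker(numWorkers: Int) extends Runnable {
    val mailbox = new LinkedBlockingQueue[Msg]()
    var neighbor: Worker = null // wired up before the threads start, closing the ring

    def run(): Unit = {
      var running = true
      while (running) {
        mailbox.take() match { // blocks until a message arrives
          case StartWork(done) =>
            neighbor.mailbox.put(DoWork(1, numWorkers - 1, done)) // 1: stand-in for doStuff()
          case DoWork(data, 0, done) =>
            done.success(data) // job is back home: wake up whoever waits on it
          case DoWork(data, remaining, done) =>
            neighbor.mailbox.put(DoWork(data + 1, remaining - 1, done)) // stand-in for doOtherStuff()
          case Stop =>
            running = false
        }
      }
    }
  }

  // Builds a ring, starts one job at worker 0 and blocks until the result is ready.
  def runJob(numWorkers: Int): Int = {
    val workers = Vector.fill(numWorkers)(new Worker(numWorkers))
    for (i <- workers.indices) workers(i).neighbor = workers((i + 1) % numWorkers)
    val threads = workers.map(w => new Thread(w))
    threads.foreach(_.start())
    try {
      val done = Promise[Int]()
      workers(0).mailbox.put(StartWork(done))
      Await.result(done.future, 5.seconds) // like `worker !? GetResult()`: waits for the data
    } finally {
      workers.foreach(_.mailbox.put(Stop))
      threads.foreach(_.join())
    }
  }
}
```

Carrying the promise in the message plays the role of a job identifier: several jobs can be in flight at once, and each waiting coordinator is only woken when its own job has gone round the ring.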
A:

Firstly, you need some identifier for each piece of work, so that GetResult can fetch the correct result; since any worker can receive StartWork() at any time, several jobs may be in flight at once. I guess the obvious solution is to have your actors keep a Map of the results and a Map of any waiting getters:

import scala.actors.{Actor, OutputChannel}
import scala.actors.Actor._

class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
   var res: Map[Long, Result] = Map.empty                 // finished results, by job id
   var gets: Map[Long, OutputChannel[Any]] = Map.empty    // getters still waiting, by job id
   def act() {
     loop {
       react {
         ...
         case DoWork( id, resultData, remaining ) if remaining == 0 =>
           res += (id -> resultData)
           gets.get(id).foreach(_ ! res(id)) // reply to a waiting getter now that the result is ready
           gets -= id                        // that getter has been answered, so forget it
         case GetResult(id) if res.isDefinedAt(id) => // result is ready
           reply (res(id))
         case GetResult(id) => // no result ready yet: remember who asked
           gets += (id -> sender)
       }
     }
   }
}

Note: using an `if` guard in the case pattern can make message processing a bit clearer.
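The result-map and waiting-getter bookkeeping can be tried without any actor library. A minimal sketch, assuming results are Ints and a reply is just a callback (the hypothetical ReplyTo type stands in for OutputChannel[Any]); like an actor, the store is only touched by one message at a time:

```scala
object PendingResults {
  // Stand-in for OutputChannel[Any]: a function that delivers the reply.
  type ReplyTo = Int => Unit

  final class ResultStore {
    private var res: Map[Long, Int] = Map.empty      // finished results, by job id
    private var gets: Map[Long, ReplyTo] = Map.empty // getters that asked too early

    // Called when DoWork(id, data, 0) arrives: the job is complete.
    def jobFinished(id: Long, data: Int): Unit = {
      res += (id -> data)
      gets.get(id).foreach(replyTo => replyTo(data)) // answer a getter that asked early
      gets -= id
    }

    // Called on GetResult(id): answer now if ready, otherwise park the requester.
    def getResult(id: Long, replyTo: ReplyTo): Unit =
      res.get(id) match {
        case Some(data) => replyTo(data)
        case None       => gets += (id -> replyTo)
      }
  }
}
```

Whichever order GetResult and the returning DoWork arrive in, the getter is answered exactly once, and only after the result exists.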

oxbow_lakes
Thank you for the answer. I'll try it as soon as possible. BTW, I think the `if` after the `=>` was correct in this case. I am not looking for a guard when matching parameters; I want two different behaviours depending on a value. Perhaps I should use two `case` entries with different guards.
paradigmatic
Oh, yes, so it was. I was reading the `=>` as being somewhere else.
oxbow_lakes
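For a plain match, the two styles discussed in these comments behave identically. A small sketch, with a hypothetical DoWork(data, remaining) class and strings standing in for the real work:

```scala
object GuardStyles {
  final case class DoWork(data: Int, remaining: Int)

  // Style 1: one case, branching inside the body (as in the question).
  def handleWithIf(msg: DoWork): String = msg match {
    case DoWork(data, remaining) =>
      if (remaining == 0) s"done: $data"
      else s"forward: ${data + 1}, ${remaining - 1}"
  }

  // Style 2: two cases distinguished by a guard (as suggested in the comments).
  def handleWithGuards(msg: DoWork): String = msg match {
    case DoWork(data, remaining) if remaining == 0 => s"done: $data"
    case DoWork(data, remaining)                   => s"forward: ${data + 1}, ${remaining - 1}"
  }
}
```

Inside react the choice can matter more: a message that matches no case (for instance, because every guard fails) is left in the mailbox rather than consumed, which is why the answer above pairs the guarded GetResult case with an unguarded one.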
A:

One alternative would be this:

import scala.actors.Actor
import scala.actors.Actor._

class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
   var ready = Foo()
   def act() {
     loop {
       react {
         case StartWork() =>
           val someData = doStuff()
           neighbor ! DoWork( someData, numWorkers-1 )
         case DoWork( resultData, remaining ) => if( remaining == 0 ) {
             ready = resultData
             // nested react: handle nothing but GetResult until the result is collected
             react {
               case GetResult() => reply( ready )
             }
           } else {
             val someOtherData = doOtherStuff( resultData )
             neighbor ! DoWork( someOtherData, remaining-1 )
           }
       }
     }
   }
}

After the work has finished, this worker will be stuck until it receives the GetResult message. On the other hand, the coordinator can immediately send the GetResult, as it will remain in the mailbox until the worker receives it.

Daniel
Really nice. I did not realise that react blocks can be embedded. However, that's not a solution for my problem because (if I understand correctly) the worker will be stuck in the inner `react` waiting for a `GetResult()` and will not be able to be part of the pipeline.
paradigmatic
@paradigmatic It only stops to wait for `GetResult` once the result is ready, but my point was really to show that you could cascade reactions.
Daniel
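Why an early GetResult is not lost can be seen with a toy model of selective receive. This is a single-threaded sketch, not the scala.actors implementation; the Mailbox class and its receive method are hypothetical. receive takes the first queued message the handler accepts and leaves the others in place, so an outer handler that only knows DoWork skips over the waiting GetResult, and a nested handler picks it up afterwards.

```scala
import scala.collection.mutable

object SelectiveMailbox {
  sealed trait Msg
  case object GetResult extends Msg
  final case class DoWork(data: Int, remaining: Int) extends Msg

  // Toy mailbox with selective receive: `receive` removes and handles the first
  // queued message the handler accepts; messages it does not accept stay queued.
  final class Mailbox {
    private val queue = mutable.ArrayBuffer.empty[Msg]
    def send(m: Msg): Unit = queue += m
    def size: Int = queue.size

    // Returns None when nothing matches (a real actor would suspend instead).
    def receive[A](handler: PartialFunction[Msg, A]): Option[A] = {
      val idx = queue.indexWhere(m => handler.isDefinedAt(m))
      if (idx < 0) None else Some(handler(queue.remove(idx)))
    }
  }

  // The coordinator's GetResult arrives before the job has come back.
  def demo(): (Option[Int], Option[String]) = {
    val mb = new Mailbox
    mb.send(GetResult)    // coordinator asks immediately
    mb.send(DoWork(7, 0)) // the job returns to the initial worker
    val data  = mb.receive { case DoWork(d, 0) => d }                              // outer reaction: skips GetResult
    val reply = mb.receive { case GetResult => s"result: ${data.getOrElse(-1)}" }  // nested reaction: finds it
    (data, reply)
  }
}
```

Here demo() handles the DoWork first even though GetResult was queued earlier, then answers the GetResult with the finished data.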