
Hello

I have been struggling with the following problem for a week now and need some advice.

def query(title: String): List[Search]   // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]

def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]

def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]

I want to construct a Pipeline like:

query("Terminator")
-> [searchIMDB, searchTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate                  (collected-meta-infos-state per query)
   => List[  TerminatorI-List[MetaInfo],  TerminatorII-List[MetaInfo],  ...]

So far, I have implemented every pipeline segment as an actor. I need to create dedicated actor instances for every query, because some of those actors, like filterXXX and consolidate, need to maintain state per query.

Functions like searchIMDB produce multiple results, which I want to process concurrently (each in a separate actor). So I have found no way to pre-construct the whole graph of actors before executing query(), nor an elegant way to modify it at runtime.

My first try was a chain of actors passing something like transaction IDs in the messages, so each actor had a Map[TransactionID->State], but this felt rather ugly. My second try was to create a sort of pipeline abstracting the digraph of actors into one flow, but I have failed so far.
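For reference, the transaction-ID approach described above might look roughly like the following. This is a minimal sketch, not the original code: `Tagged` and `RedundancyFilter` are hypothetical names, and the actor plumbing is left out so that only the per-query state handling remains.

```scala
import scala.collection.mutable

// Hypothetical message wrapper: every message carries the id of the query it belongs to.
final case class Tagged[A](txId: Long, payload: A)

// A filter stage keeping "already seen" state separately per transaction,
// i.e. the Map[TransactionID -> State] idea.
final class RedundancyFilter[A] {
  private val seen = mutable.Map.empty[Long, mutable.Set[A]]

  // Some(msg) the first time a payload shows up within its transaction, None afterwards.
  def filter(msg: Tagged[A]): Option[Tagged[A]] = {
    val state = seen.getOrElseUpdate(msg.txId, mutable.Set.empty[A])
    if (state.add(msg.payload)) Some(msg) else None
  }

  // State has to be dropped explicitly when a query finishes, or it leaks.
  def finish(txId: Long): Unit = {
    seen.remove(txId)
    ()
  }
}
```

The unsynchronized mutable map is only safe because an actor handles one message at a time; the explicit `finish` call is the kind of bookkeeping that makes this approach feel clumsy.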

This is my first post, so sorry if I forgot something or if the question is too general/pseudo-coded. Any advice is very much appreciated. Thanks!

A:

I suggest you take a look at ScalaQuery, which does about the same thing. It can do so because this is a monad problem. In fact, some Haskell solutions, such as Arrows, which are implemented by the Scalaz library, seem to be pretty close.

That would be the best solution, as the proper abstraction will make changes easier in the future.

As a hack, I'd try something like this:

import scala.actors.Actor

abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate

// Messages exchanged by the pipeline actors
case class SendResultsTo(next: Actor)
case class StartQuery(title: String)
case object Results

class Query(title: String) {
  self =>

  // Create actors
  def createActor(qm: QueryModifiers): Actor = {
    val actor: Actor = qm match {
      case Consolidate => sys.error("create a consolidator actor here")
      // case ... => create other actors as needed
    }
    actor.start
    actor
  }

  // The pipeline
  val pipe: List[List[QueryModifiers]] = Nil

  // Build the pipeline
  def ->(qms: List[QueryModifiers]) = new Query(title) {
    override val pipe = qms :: self.pipe
  }
  def ->(qm: QueryModifiers) = new Query(title) {
    override val pipe = List(qm) :: self.pipe
  }
  def ->(c: Consolidate.type) = {
    // Define the full pipeline
    // Because of the way pipe is built, the last layer comes first, and the first comes last
    val pipeline = List(Consolidate) :: pipe

    // Create an actor for every QueryModifier, using createActor
    val actors = pipeline map (layer => layer map createActor)

    // We have a list of lists of actors now, where the first element of the list
    // was the last QueryModifiers we received; so, group the layers by two, and for each
    // pair, make the second element send the result to the first.
    // Since each layer can contain many actors, make each member of the second
    // layer send the results to each member of the first layer.
    // The actors should be expecting to receive message SendResultsTo at any time.
    for {
      Seq(nextLayer, previousLayer) <- actors.iterator sliding 2
      nextActor <- nextLayer
      previousActor <- previousLayer
    } previousActor ! SendResultsTo(nextActor)

    // Send the query to the first layer
    for (firstActor <- actors.last) firstActor ! StartQuery(title)

    // Get the result from the last layer, which is the consolidator
    val results = actors.head.head !? Results

    // Return the results
    results
  }
}
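The `sliding 2` wiring is the least obvious part of the code above. The sketch below replays it on plain data, with strings standing in for actors; `LayerWiring` is a hypothetical helper, not part of the answer, and it simply returns the (sender, receiver) pairs that would receive `SendResultsTo`:

```scala
object LayerWiring {
  // Layers are listed newest-first, like `actors` in the answer:
  // the head is the consolidator layer, the last element is the first layer.
  // Returns (sender, receiver) pairs; strings stand in for actors.
  def wiring(layersNewestFirst: List[List[String]]): List[(String, String)] =
    (for {
      pair <- layersNewestFirst.iterator.sliding(2)
      if pair.size == 2 // a single layer yields one partial window; skip it
      nextActor <- pair(0)
      previousActor <- pair(1)
    } yield (previousActor, nextActor)).toList
}
```

With the layers `List(List("consolidate"), List("fetchIMDB", "fetchTMDB"), List("searchIMDB", "searchTMDB"))`, each fetcher is wired to the consolidator and each searcher to both fetchers, giving six pairs; the consolidator never appears as a sender.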

EDIT

You can guarantee ordering too, with a bit of a trick. I'm trying to avoid Scala 2.8 here, though it can make this much easier with named and default parameters.

sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers

class Query[NextQM] private (title: String, searchers: List[QMSearcher], filters: List[QMFilter], fetchers: List[QMFetcher]) {

  // Build the pipeline: the manifest of NextQM tells which stage is being added
  def ->(qms: List[NextQM])(implicit m: Manifest[NextQM]) = m.toString match {
    case "QMSearcher" => new Query[QMFilter](title, qms.asInstanceOf[List[QMSearcher]], Nil, Nil)
    case "QMFilter"   => new Query[QMFetcher](title, searchers, qms.asInstanceOf[List[QMFilter]], Nil)
    case "QMFetcher"  => new Query[Consolidate.type](title, searchers, filters, qms.asInstanceOf[List[QMFetcher]])
    case _ /* "Consolidate$", actually */ => sys.error("List of consolidate unexpected")
  }
  // Do similarly for qm: NextQM

  // Consolidation
  def ->(qm: Consolidate.type) = {
     // Create Searchers actors
     // Send them the Filters
     // Send them Fetchers
     // Create the Consolidator actor
     // Send it to Searchers actors
     // Send Searchers the query
     // Ask Consolidator for answer
  }
}

object Query {
  def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}
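The manifest-string matching above is only checked at runtime. As an alternative (a swapped-in technique, not what the answer uses), a chain of builder classes can enforce the searcher, filter, fetcher order at compile time. All class and method names here are hypothetical, and named methods replace `->`:

```scala
// Hypothetical stage descriptions, mirroring QMSearcher / QMFilter / QMFetcher.
final case class QMSearcher(name: String)
final case class QMFilter(name: String)
final case class QMFetcher(name: String)

// The fully built pipeline, ready to be turned into actors.
final case class Pipeline(title: String, searchers: List[QMSearcher],
                          filters: List[QMFilter], fetchers: List[QMFetcher])

// Each builder only offers the method for the next legal stage.
final class NeedSearchers(title: String) {
  def searchers(s: QMSearcher*) = new NeedFilters(title, s.toList)
}
final class NeedFilters(title: String, s: List[QMSearcher]) {
  def filters(f: QMFilter*) = new NeedFetchers(title, s, f.toList)
}
final class NeedFetchers(title: String, s: List[QMSearcher], f: List[QMFilter]) {
  def fetchers(fe: QMFetcher*) = Pipeline(title, s, f, fe.toList)
}

object TypedQuery {
  def apply(title: String) = new NeedSearchers(title)
}
```

Calling `filters` before `searchers` does not compile, because `NeedSearchers` has no `filters` method, so no runtime check or cast is needed.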

Now, Searcher actors keep a list of filters, a list of fetchers, and a reference to the consolidator. They listen for messages informing them of these things, and for the query itself. For each result, they create a Filter actor for every filter in the list, send each of them the list of fetchers and the consolidator, and then send them the result.

Filter actors keep a list of fetchers and a reference to the consolidator. They listen for messages informing them of these things, and for the result from the searcher. They send their output, if any, to newly created fetcher actors, which are first informed of the consolidator.

Fetchers keep a reference to the consolidator. They listen for a message informing them of that reference, and for the result from the filter. They, in turn, send their result to the consolidator.

The consolidator listens for two messages. One, coming from fetcher actors, informs it of results, which it accumulates. The other, coming from the Query, asks for the accumulated result, which it returns.

The only thing left is devising a way to let the consolidator know that all results have been processed. One way would be the following:

  1. In the Query, inform the Consolidator actor of every Searcher that was created. The consolidator keeps a list of them, with a flag indicating whether they are finished or not.
  2. Each searcher keeps a list of the filters it created, and waits for a "done" message from them. When a searcher has no processing left to do and has received "done" from all filters, it sends a message to the consolidator informing it that it has finished.
  3. Each filter, in turn, keeps a list of the fetchers it has created and, likewise, waits for "done" messages from them. When it has finished processing and has received "done" from all fetchers, it informs the searcher that it is done.
  4. Each fetcher sends a "done" message to the filter that created it once its work is completed and sent to the consolidator.
  5. The consolidator only listens to the message querying the result after it has received a "done" from all searchers.
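The "done" protocol in the steps above amounts to a tree of completion counters. Here is a minimal sketch, assuming a hypothetical `DoneTracker` class in place of the actors' own bookkeeping: a node counts its own work plus one per child it spawned, and notifies its parent when that count reaches zero. For the consolidator, "own work" would be the Query having registered all searchers.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical bookkeeping node for the "done" protocol:
// consolidator -> searchers -> filters -> fetchers.
final class DoneTracker(val name: String, parent: Option[DoneTracker]) {
  // 1 for this node's own work, plus 1 per child it has spawned.
  private val pending = new AtomicInteger(1)

  // Set once this node and all of its descendants have finished.
  @volatile private var done = false
  def isDone: Boolean = done

  // Register a child (e.g. a searcher creating a filter) before reporting own work finished.
  def spawnChild(childName: String): DoneTracker = {
    pending.incrementAndGet()
    new DoneTracker(childName, Some(this))
  }

  // Called when this node has no processing left to do.
  def ownWorkFinished(): Unit = decrement()

  // Called by a child once it and its own descendants are done.
  private def childFinished(): Unit = decrement()

  private def decrement(): Unit =
    if (pending.decrementAndGet() == 0) {
      done = true
      parent.foreach(_.childFinished())
    }
}
```

Children must be spawned before a node reports its own work as finished, which matches step 2: a searcher only reports once it has no processing left.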
Daniel
Kudos, that is very elegant. However, I wonder why you want to "make each member of the second layer send the results to each member of the first layer." The size of each layer depends on the result size of the previous layer, because the next layer is meant to process everything coming from the previous layer in parallel. The first layer sends each result to *one* actor of the next layer, so I do not know the number of QueryModifiers at the time of pipeline construction. Thank you very much! I will play around with your solution.
hotzen
Ah, I see. This is _massively_ parallel, not just pipeline-parallel. :-) In that case, instead of creating all layers, just create the first layer and send it the pipeline. Each actor, in turn, removes itself from the pipeline, creates the next layer, and sends it the remaining pipeline. You may still want to create the last layer as a singleton actor together with the first layer, so that the function can query it.
Daniel
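The "each actor removes itself from the pipeline and creates the next layer" idea can be sketched with `scala.concurrent.Future` standing in for actors. This is a swapped-in technique, not what the comment proposes; `MassivePipeline` and `Stage` are hypothetical names, and the sketch assumes every stage maps one value to a list of values of the same type:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MassivePipeline {
  // A stage turns one input into zero or more outputs (search, filter, fetch...).
  type Stage[A] = A => List[A]

  // Runs the head stage on `input`, then starts one concurrent task per output,
  // each handed the remaining pipeline (the role of "creating the next layer").
  def run[A](input: A, pipeline: List[Stage[A]]): Future[List[A]] = pipeline match {
    case Nil => Future.successful(List(input))
    case stage :: rest =>
      Future(stage(input)).flatMap { outputs =>
        Future.traverse(outputs)(out => run(out, rest)).map(_.flatten)
      }
  }
}
```

The returned `Future` completes only after every branch has finished, so composing futures takes over the consolidator's "done" bookkeeping; the caller can wait on it with `Await.result`.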
Nice, thank you very much. I am going on Christmas vacation now and will try this out then.
hotzen