I suggest you take a look at ScalaQuery, which does much the same thing. It can do so because this is a monad problem. In fact, some Haskell solutions, such as Arrows, which the Scalaz library implements, seem to be pretty close.
That would be the best solution, as the proper abstraction will make changes easier in the future.
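To illustrate why this is monad-shaped, here is a minimal sketch in which every stage maps one input to a list of outputs and the stages are chained with a for-comprehension. The names `Hit`, `Page`, `search`, `keep` and `fetch` are hypothetical and are not part of ScalaQuery or Scalaz; plain `List` stands in for whatever container a real solution would use.

```scala
// Hypothetical stage types; none of these names come from ScalaQuery or Scalaz.
case class Hit(url: String)
case class Page(url: String, body: String)

object MonadicPipeline {
  // Searcher: one title yields several hits.
  def search(title: String): List[Hit] =
    List(Hit(title + "/a"), Hit(title + "/b"), Hit(title + "/skip"))

  // Filter: keeps a hit (one-element list) or drops it (empty list).
  def keep(hit: Hit): List[Hit] =
    if (hit.url.endsWith("skip")) Nil else List(hit)

  // Fetcher: turns a surviving hit into a page.
  def fetch(hit: Hit): List[Page] =
    List(Page(hit.url, "body of " + hit.url))

  // The for-comprehension desugars to flatMap/map calls, which chain the stages.
  def run(title: String): List[Page] =
    for {
      hit  <- search(title)
      kept <- keep(hit)
      page <- fetch(kept)
    } yield page
}
```

Adding or reordering a stage then means adding or moving one generator line, which is the kind of easy change a proper abstraction buys.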
As a hack, I figure something like this:
abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate
class Query(title: String) {
self =>
// Create actors
def createActor(qm: QueryModifiers): Actor = {
val actor = qm match {
case Consolidate => // create a consolidator actor
// case ... => create the other actors as needed
}
actor.start
actor
}
// The pipeline
val pipe: List[List[QueryModifiers]] = Nil
// Build the pipeline
def ->(qms: List[QueryModifiers]) = new Query(title) {
override val pipe = qms :: self.pipe
}
def ->(qm: QueryModifiers) = new Query(title) {
override val pipe = List(qm) :: self.pipe
}
def ->(c: Consolidate.type) = {
// Define the full pipeline
// Because of the way pipe is built, the last layer comes first, and the first comes last
val pipeline = Consolidate :: pipe
// Create an actor for every QueryModifier, using an unspecified createActor function
val actors = pipeline map (_ map (createActor(_)))
// We have a list of lists of actors now, where the first element of the list
// was the last QueryModifiers we received; so, group the layers by two, and for each
// pair, make the second element send the result to the first.
// Since each layer can contain many actors, make each member of the second
// layer send the results to each member of the first layer.
// The actors should be expecting to receive message SendResultsTo at any time.
for {
List(nextLayer, previousLayer) <- actors.iterator sliding 2
nextActor <- nextLayer
previousActor <- previousLayer
} previousActor ! SendResultsTo(nextActor)
// Send the query to the first layer
for ( firstActor <- actors.last ) firstActor ! Query(title)
// Get the result from the last layer, which is the consolidator
val results = actors.head.head !? Results
// Return the results
results
}
}
EDIT
You can guarantee ordering too, with a bit of a trick. I'm trying to avoid Scala 2.8 here, though it can make this much easier with named and default parameters.
sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers
class Query[NextQM] private (title: String, searchers: List[QMSearcher], filters: List[QMFilter], fetchers: List[QMFetcher]) {
// Build the pipeline
def ->[T <: NextQM](qms: List[T])(implicit m: Manifest[T]) = m.toString match {
case "QMSearcher" => new Query[QMFilter](title, qms, Nil, Nil)
case "QMFilter" => new Query[QMFetcher](title, searchers, qms, Nil)
case "QMFetcher" => new Query[Consolidate.type](title, searchers, filters, qms)
case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
}
// Do similarly for qm: NextQM
// Consolidation
def ->(qm: Consolidate.type) = {
// Create Searchers actors
// Send them the Filters
// Send them Fetchers
// Create the Consolidator actor
// Send it to Searchers actors
// Send Searchers the query
// Ask Consolidator for answer
}
}
object Query {
def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}
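For comparison, here is a sketch of the same ordering guarantee done with a phantom type parameter and `=:=` evidence instead of matching on `Manifest` strings. It swaps in a different technique: `=:=` only exists from Scala 2.8 onwards, so it does not fit the "avoid 2.8" constraint above. `TypedQuery`, `Ready`, the `with...` method names and the `name` fields are hypothetical.

```scala
sealed abstract class QueryModifiers
case class QMSearcher(name: String) extends QueryModifiers
case class QMFilter(name: String) extends QueryModifiers
case class QMFetcher(name: String) extends QueryModifiers

// Marker for "all stages supplied"; hypothetical, not in the original answer.
sealed trait Ready

// Stage is a phantom type: it only records which kind of modifier may come next.
class TypedQuery[Stage] private (
    val title: String,
    val searchers: List[QMSearcher],
    val filters: List[QMFilter],
    val fetchers: List[QMFetcher]) {

  // Each method compiles only when Stage is the matching modifier type.
  def withSearchers(qms: List[QMSearcher])(implicit ev: Stage =:= QMSearcher): TypedQuery[QMFilter] =
    new TypedQuery[QMFilter](title, qms, Nil, Nil)

  def withFilters(qms: List[QMFilter])(implicit ev: Stage =:= QMFilter): TypedQuery[QMFetcher] =
    new TypedQuery[QMFetcher](title, searchers, qms, Nil)

  def withFetchers(qms: List[QMFetcher])(implicit ev: Stage =:= QMFetcher): TypedQuery[Ready] =
    new TypedQuery[Ready](title, searchers, filters, qms)
}

object TypedQuery {
  // A fresh query expects searchers first.
  def apply(title: String): TypedQuery[QMSearcher] =
    new TypedQuery[QMSearcher](title, Nil, Nil, Nil)
}
```

With this shape, `TypedQuery("t").withFilters(...)` is rejected by the compiler because no evidence that `QMSearcher =:= QMFilter` can be found, so an out-of-order pipeline fails at compile time rather than at run time.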
Now, Searcher actors keep a list of filters, a list of fetchers, and a reference to the consolidator. They listen for messages informing them of these things, and for the query. For each result, they create a Filter actor for every filter in the list, send each of them the list of fetchers and the consolidator, and then send them the result.
Filter actors keep a list of fetchers and a reference to the consolidator. They listen for messages informing them of these things, and for the result of the searcher. They send their output, if any, to newly created fetcher actors, which are first informed of the consolidator.
Fetchers keep a reference to the consolidator. They listen for a message informing them of that reference, and for the result from the filter. They send their result, in turn, to the consolidator.
The consolidator listens for two messages. One message, coming from fetcher actors, informs it of results, which it accumulates. Another message, coming from the Query, asks for that result, which it returns.
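The consolidator's two-message protocol can be sketched as a plain class, without any actor library, so it runs on any Scala version. The message names `FetchResult` and `GetResults` and the class `ConsolidatorModel` are hypothetical; the text above describes the messages but does not name them.

```scala
// Hypothetical message names for the consolidator's protocol.
sealed trait ConsolidatorMsg
case class FetchResult(value: String) extends ConsolidatorMsg // sent by fetchers
case object GetResults extends ConsolidatorMsg                // sent by the Query

// Plain-object model of the consolidator; an actor would run the same match in its loop.
class ConsolidatorModel {
  private var results: List[String] = Nil

  // Returns Some(accumulated results) for GetResults, None for everything else.
  def receive(msg: ConsolidatorMsg): Option[List[String]] = msg match {
    case FetchResult(value) =>
      results = value :: results // prepend now, reverse on read
      None
    case GetResults =>
      Some(results.reverse)
  }
}
```

The searcher, filter and fetcher actors would follow the same pattern, each with one configuration message per reference it has to be told about plus one message carrying the data to process.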
The only thing left is devising a way to let the consolidator know that all results have been processed. One way would be the following:
- In the Query, inform the Consolidator actor of every Searcher that was created. The consolidator keeps a list of them, with a flag indicating whether they are finished or not.
- Each searcher keeps a list of the filters it created, and waits for a "done" message from them. When a searcher has no processing left to do and has received "done" from all filters, it sends a message to the consolidator informing it that it has finished.
- Each filter, in turn, keeps a list of the fetchers it has created and, likewise, waits for "done" messages from them. When it has finished processing and has received "done" from all fetchers, it informs the searcher that it is done.
- Each fetcher sends a "done" message to the filter that created it once its work is completed and sent to the consolidator.
- The consolidator only listens to the message querying the result after it has received a "done" from all searchers.
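The bookkeeping in the steps above is the same at every level: a node is done once its own work is finished and every child it created has reported "done". A minimal sketch of that rule, assuming children are identified by string ids; `DoneTracker` and its method names are hypothetical.

```scala
// Tracks the children a node created and whether its own processing has ended.
class DoneTracker {
  private var pending = Set.empty[String]
  private var ownWorkFinished = false

  // Call when this node creates a child (a searcher creating a filter, and so on).
  def spawned(childId: String): Unit = { pending += childId }

  // Call when a child's "done" message arrives.
  def childDone(childId: String): Unit = { pending -= childId }

  // Call when this node has no processing of its own left.
  def finishedOwnWork(): Unit = { ownWorkFinished = true }

  // True once this node may send "done" to its parent.
  def isDone: Boolean = ownWorkFinished && pending.isEmpty
}
```

A filter would call `spawned` for each fetcher, `childDone` on each "done" message, and notify its searcher when `isDone` turns true. The consolidator can use the same tracker with the searchers as its children and only answer the result request once `isDone` holds.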