Is there an elegant way to model a dynamic dataflow in Java? By dataflow, I mean there are various types of tasks, and these tasks can be "connected" arbitrarily, such that when a task finishes, successor tasks are executed in parallel using the finished task's output as input, or when multiple tasks finish, their output is aggregated in a successor task (see flow-based programming). By dynamic, I mean that the type and number of successor tasks spawned when a task finishes depend on the output of that finished task; for example, task A may spawn task B if it has a certain output, but may spawn task C if it has a different output. Another way of putting it is that each task (or set of tasks) is responsible for determining what the next tasks are.
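
To make the "dynamic" part concrete, here's a rough sketch of the kind of thing I mean (TaskContext and the task classes are made up purely for illustration - this isn't an API I have):

import java.util.concurrent.Callable;

// TaskContext is hypothetical - the point is just that A itself decides,
// based on its own output, whether B or C runs next.
interface TaskContext {
  void submit(Callable<?> successor);
}

class TaskB implements Callable<Integer> {
  private final String input;
  TaskB(String input) { this.input = input; }
  public Integer call() { return input.length(); }
}

class TaskC implements Callable<Integer> {
  private final String input;
  TaskC(String input) { this.input = input; }
  public Integer call() { return -input.length(); }
}

class TaskA implements Callable<String> {
  private final TaskContext ctx;
  TaskA(TaskContext ctx) { this.ctx = ctx; }

  public String call() {
    String output = "...";            // whatever A actually computes
    if (output.length() > 3) {
      ctx.submit(new TaskB(output));  // one kind of output spawns B
    } else {
      ctx.submit(new TaskC(output));  // another kind spawns C
    }
    return output;
  }
}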

Sample dataflow for rendering a webpage. The task types I have are: file downloader, HTML/CSS renderer, HTML parser/DOM builder, image renderer, JavaScript parser, and JavaScript interpreter.

  • File downloader task for HTML file
    • HTML parser/DOM builder task
      • File downloader task for each embedded file/link
        • If image, image renderer
        • If external JavaScript, JavaScript parser
          • JavaScript interpreter
        • Otherwise, just store in some var/field in HTML parser task
      • JavaScript parser for each embedded script
        • JavaScript interpreter
      • Wait for above tasks to finish, then HTML/CSS renderer (obviously not optimal or perfectly correct, but this is simple)

I'm not saying the solution needs to be some comprehensive framework (in fact, the closer to the JDK API, the better), and I absolutely don't want something as heavyweight as, say, Spring Web Flow, or some declarative markup or other DSL.

To be more specific, I'm trying to think of a good way to model this in Java with Callables, Executors, ExecutorCompletionServices, and perhaps various synchronizer classes (like Semaphore or CountDownLatch). There are a couple of use cases and requirements:

  1. Don't make any assumptions about which executor(s) the tasks will run on. In fact, to simplify, just assume there's only one executor. It can be a fixed thread pool executor, so a naive implementation can result in deadlocks (e.g. imagine a task that submits another task and then blocks until that subtask is finished, and now imagine several of these tasks using up all the threads - see the sketch after this list).
  2. To simplify, assume that the data is not streamed between tasks (task output -> succeeding task input) - the finishing task and succeeding task don't have to exist together, so the input data to the succeeding task will not be changed by the preceding task (since it's already done).
  3. There are only a couple operations that the dataflow "engine" should be able to handle:
    1. A mechanism where a task can queue more tasks
    2. A mechanism whereby a successor task is not queued until all the required input tasks are finished
    3. A mechanism whereby the main thread (or other threads not managed by the executor) blocks until the flow is finished
    4. A mechanism whereby the main thread (or other threads not managed by the executor) blocks until certain tasks have finished
  4. Since the dataflow is dynamic (depends on input/state of the task), the activation of these mechanisms should occur within the task code, e.g. the code in a Callable is itself responsible for queueing more Callables.
  5. The dataflow "internals" should not be exposed to the tasks (Callables) themselves - only the operations listed above should be available to the task.
  6. Note that the type of the data is not necessarily the same for all tasks, e.g. a file download task may accept a File as input but will output a String.
  7. If a task throws an uncaught exception (indicating some fatal error requiring all dataflow processing to stop), it must propagate up to the thread that initiated the dataflow as quickly as possible and cancel all tasks (or something fancier like a fatal error handler).
  8. Tasks should be launched as soon as possible. This, along with the previous requirement, should preclude simple Future polling + Thread.sleep().
  9. As a bonus, I would like the dataflow engine itself to perform some action (like logging) every time a task finishes, or when no task has finished within X time of the last completion. Something like the following (a fleshed-out version follows this list): ExecutorCompletionService<T> ecs; while (hasTasks()) { Future<T> future = ecs.poll(1 minute); some_action_like_logging(); if (future != null) { future.get() ... } ... }
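
A quick illustration of the deadlock in requirement 1 (class and result names are mine, and this is only a sketch): the outer task submits a subtask to the same single-thread pool and then blocks on it, so the pool's only thread can never run the subtask.

import java.util.concurrent.*;

public class DeadlockDemo {
  public static void main(String[] args) throws Exception {
    final ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<String> outer = pool.submit(new Callable<String>() {
      public String call() throws Exception {
        Future<String> inner = pool.submit(new Callable<String>() {
          public String call() { return "sub-result"; }
        });
        return inner.get(); // blocks forever: no free thread for the subtask
      }
    });
    System.out.println(outer.get()); // never reached
  }
}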

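And here is one way the pseudocode in requirement 9 could be fleshed out - again just a sketch, where an outstanding-task counter stands in for hasTasks() and logProgress() is a hypothetical hook for the logging/watchdog action:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionLoop {
  private final ExecutorService executor = Executors.newFixedThreadPool(4);
  private final CompletionService<Object> ecs =
      new ExecutorCompletionService<Object>(executor);
  private final AtomicInteger outstanding = new AtomicInteger();

  void submit(Callable<Object> task) {
    outstanding.incrementAndGet();
    ecs.submit(task);
  }

  void run() throws InterruptedException, ExecutionException {
    while (outstanding.get() > 0) {
      // poll() returns null if nothing finished within the timeout -
      // exactly the "no task has finished in X time" hook.
      Future<Object> future = ecs.poll(1, TimeUnit.MINUTES);
      logProgress();
      if (future != null) {
        outstanding.decrementAndGet();
        Object result = future.get(); // rethrows task failures (requirement 7)
        // ... examine result and queue successor tasks here ...
      }
    }
  }

  private void logProgress() { /* logging / watchdog action */ }
}
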
Are there straightforward ways to do all this with the Java concurrency API? Or, if it's going to be complex no matter what with what's available in the JDK, is there a lightweight library that satisfies the requirements? I already have a partial solution that fits my particular use case (it cheats in a way, since I'm using two executors, and just so you know, it's not related at all to the web browser example I gave above), but I'd like to see a more general-purpose and elegant solution.

A: 

How about defining an interface such as:

import java.util.concurrent.Callable;

// A task that can report whether all of its inputs are available yet.
interface Task<T> extends Callable<T> {
  boolean isReady();
}

Your "dataflow engine" would then just need to manage a collection of Task objects i.e. allow new Task objects to be queued for excecution and allow queries as to the status of a given task (so maybe the interface above needs extending to include id and/or type). When a task completes (and when the engine starts of course) the engine must just query any unstarted tasks to see if they are now ready, and if so pass them to be run on the executor. As you mention, any logging, etc. could also be done then.
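
To sketch that concretely (pool size, error handling and names are all placeholders here, not a definitive design): the engine keeps a queue of unstarted Tasks and re-runs a readiness check at startup and after every completion.

import java.util.*;
import java.util.concurrent.*;

class DataflowEngine {
  private final ExecutorService executor = Executors.newFixedThreadPool(4);
  private final Queue<Task<?>> pending = new ConcurrentLinkedQueue<Task<?>>();

  public void queue(Task<?> task) {
    pending.add(task);
    dispatchReady();
  }

  // Called at startup and after every completion: move ready tasks to the pool.
  private synchronized void dispatchReady() {
    for (Iterator<Task<?>> it = pending.iterator(); it.hasNext(); ) {
      final Task<?> task = it.next();
      if (task.isReady()) {
        it.remove();
        executor.submit(new Runnable() {
          public void run() {
            try {
              task.call();
            } catch (Exception e) {
              // real engine: cancel everything / fatal handler (requirement 7)
            }
            dispatchReady(); // this completion may have made others ready
          }
        });
      }
    }
  }
}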

One other thing that may help is to use Guice (http://code.google.com/p/google-guice/) or a similar lightweight DI framework to help wire up all the objects correctly, e.g. to ensure that the correct executor type is created, and to make sure that Tasks that need access to the dataflow engine (either for their isReady method or for queuing other tasks, say) can be provided with an instance without introducing complex circular relationships.
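
For example, a minimal sketch of that wiring (the module and binding choices are mine):

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Bind the executor in one place so the engine and tasks share it.
class DataflowModule extends AbstractModule {
  @Override
  protected void configure() {
    bind(ExecutorService.class).toInstance(Executors.newFixedThreadPool(4));
  }
}

// Usage:
// Injector injector = Guice.createInjector(new DataflowModule());
// ExecutorService executor = injector.getInstance(ExecutorService.class);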

HTH, but please do comment if I've missed any key aspects... Paul.

pdbartlett
Thanks for the reply. Hmm, I've come to realize that I think it would be more proper modeling to decouple the tasks from the connection logic (what new tasks to queue and determining if all available inputs are ready). Tasks should be standalone modules, and there should be a subclass or separate object (possibly another task) that provides the dynamic connection logic. Not sure if DI (let alone a DI framework) is really necessary here - it should be orthogonal to this. Let me try writing up some interfaces. I'm at work atm, so I'll get back to this in a couple of hours.
Maian