I have a fairly standard mechanism in Java for solving the following problem:

  • Work items must be scheduled to execute at a particular time
  • Each work item must then wait on a condition becoming true
  • Work items should be cancellable

The solution I use is as follows:

  1. Have a single-threaded scheduler to schedule my work items
  2. Have an ExecutorService (which may be multi-threaded)
  3. Each scheduled work item then submits the actual work to the ExecutorService. The returned Future is cached in a map. A completion service is used to remove the future from the cache when the work is completed
  4. Items can be cancelled via the cached futures

Of course, the executor's thread pool needs to be at least as large as the number of blocking work items I expect to have at once, but this is not a problem in practice.
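For reference, the arrangement above might look roughly like the sketch below. This is not the actual code: the class name `TwoStageScheduler`, the string ids, and the use of a `FutureTask.done()` hook in place of a separate completion service are all assumptions made to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.*;

/** Hypothetical sketch: a single-threaded scheduler hands work to a worker pool. */
final class TwoStageScheduler {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService workers;
    // Futures of work already handed to the worker pool, keyed by a caller-chosen id.
    private final Map<String, Future<?>> running = new ConcurrentHashMap<>();

    TwoStageScheduler(int workerThreads) {
        // Must be at least the number of items expected to block at the same time.
        this.workers = Executors.newFixedThreadPool(workerThreads);
    }

    /** At the scheduled time, submit the (possibly blocking) work to the worker pool. */
    void schedule(String id, long delayMillis, Runnable blockingWork) {
        scheduler.schedule(() -> {
            FutureTask<Void> task = new FutureTask<Void>(blockingWork, null) {
                // Runs on normal completion, failure, or cancellation:
                // this stands in for the completion service that cleans the cache.
                @Override protected void done() { running.remove(id, this); }
            };
            running.put(id, task);   // cache first, then start
            workers.execute(task);
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Cancels an item that has already been handed to the worker pool. */
    boolean cancel(String id) {
        Future<?> f = running.get(id);
        return f != null && f.cancel(true); // true = interrupt the blocked thread
    }

    void shutdown() {
        scheduler.shutdownNow();
        workers.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        TwoStageScheduler s = new TwoStageScheduler(4);
        CountDownLatch condition = new CountDownLatch(1); // the condition to wait on
        CountDownLatch finished = new CountDownLatch(1);
        s.schedule("job-1", 50, () -> {
            try {
                condition.await();
                System.out.println("condition met, working");
            } catch (InterruptedException e) {
                System.out.println("cancelled while waiting");
            } finally {
                finished.countDown();
            }
        });
        condition.countDown();
        finished.await();
        s.shutdown();
    }
}
```

As in the description, an item in this sketch can only be cancelled once the scheduler has handed it to the worker pool, because that is when its future enters the map.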

So now I'm coding in Scala and using the actor framework. Assuming that my work item can be encapsulated in an event sent to an actor:

  1. What mechanism would I use to schedule a work item for a specific time?
  2. If a work item is an event sent to an actor, how can I ensure that the backing thread pool is bigger than the number of items that can be blocking at the same time?
  3. How can I cause a previously scheduled work item to be cancelled?
+3  A: 

What mechanism would I use to schedule a work item for a specific time?

I would use a java.util.concurrent.ScheduledExecutorService.
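A minimal sketch of what that could look like. `ScheduledExecutorService.schedule` takes a delay rather than a wall-clock time, so the target time has to be converted first. The class `RunAt` and its helper methods are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class RunAt {
    /** Delay from now until target, clamped to zero if the target is already past. */
    static long delayMillis(Instant now, Instant target) {
        return Math.max(0L, Duration.between(now, target).toMillis());
    }

    /** Schedules work to run once at (approximately) the given instant. */
    static ScheduledFuture<?> scheduleAt(ScheduledExecutorService ses,
                                         Instant target, Runnable work) {
        return ses.schedule(work, delayMillis(Instant.now(), target),
                            TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> f = scheduleAt(ses, Instant.now().plusMillis(200),
                () -> System.out.println("work item ran"));
        f.get();        // wait for the scheduled run to finish
        ses.shutdown();
    }
}
```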

If a work item is an event sent to an actor, how can I ensure that the backing thread pool is bigger than the number of items that can be blocking at the same time?

This strikes me as a design that defeats the effort of parallelisation. Try to minimise or eliminate blocking and global state. These are barriers to composability and scalability. For example, consider having a single dedicated thread that waits for files to arrive and then fires events off to actors. Or look at java.nio for asynchronous non-blocking I/O.

I don't fully understand your requirements here, but it seems like you could have a single thread/actor looking for I/O events. Then as your scheduled "work items", schedule effects that create non-blocking actors. Have those actors register themselves with the I/O thread/actor to receive messages about I/O events that they care about.
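To make the idea concrete, here is a rough sketch in plain Java rather than with actors. A single event thread owns a table of handlers; work items register interest in an event and return immediately instead of blocking. `EventDispatcher`, the string event keys, and the one-shot handler behaviour are all assumptions of this sketch, not part of any library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical dispatcher: one thread delivers events to registered handlers. */
final class EventDispatcher {
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    // Only ever touched on eventThread, so a plain HashMap is enough.
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    /** A work item registers interest in an event instead of blocking on it. */
    Future<?> register(String eventKey, Consumer<String> handler) {
        return eventThread.submit(() -> {
            handlers.computeIfAbsent(eventKey, k -> new ArrayList<>()).add(handler);
        });
    }

    /** Called when the condition occurs; handlers fire once and are then dropped. */
    Future<?> publish(String eventKey, String payload) {
        return eventThread.submit(() -> {
            List<Consumer<String>> waiting = handlers.remove(eventKey);
            if (waiting != null) {
                waiting.forEach(h -> h.accept(payload));
            }
        });
    }

    void shutdown() {
        eventThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        EventDispatcher d = new EventDispatcher();
        d.register("file:report.csv", p -> System.out.println("handling: " + p));
        d.publish("file:report.csv", "file arrived").get();
        d.shutdown();
    }
}
```

In a real system the handler would hand the payload on to an actor or a worker pool; running it on the event thread is only acceptable here because the handlers are trivial.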

How can I cause a previously scheduled work item to be cancelled?

The schedule methods of ScheduledExecutorService return a ScheduledFuture. What you have is not a bad design in that regard. Collect the futures in a Map and call cancel() on the one you want to stop.
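Something along these lines, as a sketch only. The class name `CancellableSchedule` and the string ids are made up for illustration, and ids are assumed to be unique.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: keep the futures in a map so items can be cancelled by id. */
final class CancellableSchedule {
    private final ScheduledExecutorService ses =
            Executors.newSingleThreadScheduledExecutor();
    // Guarded by "this"; assumes each id is used for one pending item at a time.
    private final Map<String, ScheduledFuture<?>> pending = new HashMap<>();

    synchronized void schedule(String id, long delayMillis, Runnable work) {
        ScheduledFuture<?> f = ses.schedule(() -> {
            try {
                work.run();
            } finally {
                forget(id); // drop the entry once the item has run
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
        pending.put(id, f);
    }

    private synchronized void forget(String id) {
        pending.remove(id);
    }

    /** Returns true if the item was still pending and has now been cancelled. */
    synchronized boolean cancel(String id) {
        ScheduledFuture<?> f = pending.remove(id);
        return f != null && f.cancel(false); // false = do not interrupt a running item
    }

    void shutdown() {
        ses.shutdownNow();
    }

    public static void main(String[] args) {
        CancellableSchedule cs = new CancellableSchedule();
        cs.schedule("report", 60_000, () -> System.out.println("should never print"));
        System.out.println("cancelled: " + cs.cancel("report"));
        cs.shutdown();
    }
}
```

Making `schedule` and `forget` synchronized ensures the future is in the map before the task can try to remove it, even with a zero delay.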

Apocalisp
You are completely correct about the "blocking work" model being inappropriate. I'd pretty much come to the same conclusion myself but was putting off doing a complete rewrite. However, I finally got around to doing this on Friday and now have a much better system.
oxbow_lakes
A: 

You could have a scheduling actor that has a list of scheduled actors, and uses Actor.receiveWithin() to wake up every second or so and send messages to actors that are ready to be executed. The scheduling actor could also handle cancelling. Another option is to let every actor handle its own scheduling directly with receiveWithin(), instead of centralizing scheduling.

There is some discussion on this issue in the blog post "Simple cron like scheduler in Scala".

markusk