views: 1600
answers: 6

I have a problem which I believe fits the classic master/worker pattern, and I'm seeking advice on implementation. Here's what I'm currently thinking about the problem:

There's a global "queue" of some sort, and it is a central place where "the work to be done" is kept. Presumably this queue will be managed by a kind of "master" object. Threads will be spawned to go find work to do, and when they find work to do, they'll tell the master thing (whatever that is) to "add this to the queue of work to be done".

The master, perhaps on an interval, will spawn other threads that actually perform the work to be done. Once a thread completes its work, I'd like it to notify the master that the work is finished. Then, the master can remove this work from the queue.

I've done a fair amount of thread programming in Java in the past, but it was all prior to JDK 1.5, so I'm not familiar with the newer APIs for handling this case. I understand that JDK 7 will have fork-join, and that might be a solution for me, but I am not able to use an early-access product in this project.

The problems, as I see them, are:

1) how to have the "threads doing the work" communicate back to the master, telling it that their work is complete and that the master can now remove the work from the queue

2) how to efficiently have the master guarantee that work is only ever scheduled once. For example, let's say this queue has a million items, and it wants to tell a worker to "go do these 100 things". What's the most efficient way of guaranteeing that when it schedules work to the next worker, it gets "the next 100 things" and not "the 100 things I've already scheduled"?

3) choosing an appropriate data structure for the queue. My thinking here is that the "threads finding work to do" could potentially find the same work to do more than once, and they'd send a message to the master saying "here's work", and the master would realize that the work has already been scheduled and consequently should ignore the message. I want to ensure that I choose the right data structure such that this computation is as cheap as possible.

Traditionally, I would have done this in a database, in sort of a finite-state-machine manner, working "tasks" through from start to complete. However, in this problem, I don't want to use a database because of the high volume and volatility of the queue. In addition, I'd like to keep this as light-weight as possible. I don't want to use any app server if that can be avoided.

It is quite likely that this problem I'm describing is a common problem with a well-known name and accepted set of solutions, but I, with my lowly non-CS degree, do not know what this is called (i.e. please be gentle).

Thanks for any and all pointers.

+3  A: 

Check out java.util.concurrent in the Java library.

Depending on your application it might be as simple as cobbling together some blocking queue and a ThreadPoolExecutor.
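As a rough sketch of that combination (the pool size and the task body below are made up for illustration; a fixed thread pool from Executors is backed internally by a LinkedBlockingQueue):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class WorkPoolSketch {
        public static void main(String[] args) throws InterruptedException {
            // The pool's internal queue is the "queue of work to be done";
            // its threads are the workers.
            ExecutorService pool = Executors.newFixedThreadPool(4);

            for (int i = 0; i < 10; i++) {
                final int item = i;
                pool.submit(new Runnable() {
                    public void run() {
                        System.out.println("processing item " + item);
                    }
                });
            }

            pool.shutdown();                            // no more work accepted
            pool.awaitTermination(1, TimeUnit.MINUTES); // wait for queued work to drain
        }
    }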

Also, the book Java Concurrency in Practice by Brian Goetz might be helpful.

starblue
+4  A: 

First, why do you want to hold the items after a worker has started doing them? Normally, you would have a queue of work from which each worker takes items. This would also solve the "how can I prevent workers from getting the same item" problem.

To your questions:

1) how to have the "threads doing the work" communicate back to the master, telling it that their work is complete and that the master can now remove the work from the queue

The master could listen to the workers using the listener/observer pattern.
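As a sketch of that idea (WorkItem, WorkListener and Worker are hypothetical names, not from any library):

    // Hypothetical callback interface: a worker notifies the master when an item is done.
    interface WorkListener {
        void workCompleted(WorkItem item);
    }

    class WorkItem {
        void process() { /* the actual work goes here */ }
    }

    class Worker implements Runnable {
        private final WorkItem item;
        private final WorkListener listener;

        Worker(WorkItem item, WorkListener listener) {
            this.item = item;
            this.listener = listener;
        }

        public void run() {
            item.process();               // do the work
            listener.workCompleted(item); // tell the master so it can remove the item from its queue
        }
    }

The master would implement WorkListener and remove the finished item from its own bookkeeping inside workCompleted().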

2) how to efficiently have the master guarantee that work is only ever scheduled once. For example, let's say this queue has a million items, and it wants to tell a worker to "go do these 100 things". What's the most efficient way of guaranteeing that when it schedules work to the next worker, it gets "the next 100 things" and not "the 100 things I've already scheduled"?

See above. I would let the workers pull the items out of the queue.

3) choosing an appropriate data structure for the queue. My thinking here is that the "threads finding work to do" could potentially find the same work to do more than once, and they'd send a message to the master saying "here's work", and the master would realize that the work has already been scheduled and consequently should ignore the message. I want to ensure that I choose the right data structure such that this computation is as cheap as possible.

The standard library has had blocking queue implementations since Java 5 (see java.util.concurrent.BlockingQueue and its implementations).
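For instance, a minimal sketch where workers pull directly from a LinkedBlockingQueue (the worker count and items are made up; shutdown handling is left out):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PullingWorkers {
        public static void main(String[] args) {
            final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

            // Each worker pulls its own items; take() hands an item to exactly one
            // worker, so nothing needs to be "removed by the master" afterwards.
            for (int i = 0; i < 3; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            while (true) {
                                String item = queue.take(); // blocks until work is available
                                System.out.println(Thread.currentThread().getName() + ": " + item);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }).start();
            }

            // The "finder" threads simply put work into the queue.
            queue.offer("move file A");
            queue.offer("move file B");
        }
    }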

Tim Büthe
Thanks everyone for the response. Tim, to your first question, which is a good one: I believe I need to keep items on the queue because the "worker threads going out and finding work to do" need to know what work has already been scheduled. For a concrete example, imagine a program that has to go out and find "old files to move". Threads find them and add them to the queue. But on subsequent runs, if those files haven't been moved yet, the "finder" threads will find the same files. Make sense? Is there a more appropriate way of dealing with that problem? Thanks again.
marc esher
Maybe you don't need to bother about it at all. There is one good quality to aim for in asynchronous systems: idempotence. The system should be protected against double message processing (speaking mathematically, f(f(x)) should equal f(x), so the system state doesn't change if a message is processed twice). Your example is a good example of idempotence: we could pass the message about one particular file to a worker twice and nothing bad happens. If the file has already been moved, we simply skip the task.
dotsid
You could define a work queue and, besides this, an in-work list. When a worker thread takes an item from the queue, you add it to the in-work list. When the worker is done, you remove it from the in-work list. If an item is submitted as a new one, you can check whether it's already in the queue or in the in-work list and ignore it if so.
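A sketch of that bookkeeping (the class and method names here are made up): a single "known" set can cover both the queued and the in-work items, so the duplicate check is one cheap map operation.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical coordinator: tracks which items are queued or in progress
    // so the same item is never scheduled twice.
    class WorkTracker<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
        private final ConcurrentMap<T, Boolean> known = new ConcurrentHashMap<T, Boolean>();

        // Finder threads call this; duplicates are rejected in O(1).
        public boolean submit(T item) {
            if (known.putIfAbsent(item, Boolean.TRUE) == null) {
                queue.offer(item);
                return true;
            }
            return false; // already queued or already being worked on
        }

        // Workers pull the next item; it stays in 'known' while being processed.
        public T take() throws InterruptedException {
            return queue.take();
        }

        // Workers call this when finished, so the item can be submitted again later.
        public void done(T item) {
            known.remove(item);
        }
    }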
Tim Büthe
A: 

If you are open to the idea of Spring, then check out their Spring Integration project. It gives you all the queue/thread-pool boilerplate out of the box and leaves you to focus on the business logic. Configuration is kept to a minimum using @annotations.

btw, the Goetz book is very good.

Paul McKenzie
+3  A: 

As far as I understand your requirements, you need an ExecutorService. ExecutorService has a

submit(Callable task)

method whose return value is a Future. A Future is a blocking way to communicate back from worker to master. You could easily extend this mechanism to work in an asynchronous manner. And yes, implementations of ExecutorService such as ThreadPoolExecutor also maintain the work queue for you, so in most cases you don't need to bother about scheduling. The java.util.concurrent package already has efficient implementations of thread-safe queues (ConcurrentLinkedQueue - non-blocking, and LinkedBlockingQueue - blocking).
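A minimal sketch of that submit/Future round trip (the Callable body is a stand-in for real work):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class FutureSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newFixedThreadPool(2);

            // submit() puts the task on the pool's internal work queue and
            // immediately returns a Future representing the pending result.
            Future<Integer> result = executor.submit(new Callable<Integer>() {
                public Integer call() {
                    return 6 * 7; // stand-in for the real work
                }
            });

            // get() blocks until the worker has finished -- this is the
            // worker-to-master communication.
            System.out.println("result = " + result.get());

            executor.shutdown();
        }
    }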

dotsid
To add to what @dotsid suggests, I would point out that this standard library does a lot of, if not all of, what the OP asks for, and it is simple to use and it works. You can scale up to hundreds or thousands of tasks without much effort.
Peter Lawrey
Thanks to everyone for the thoughtful responses. I'm not sure if this is the "canonical" answer, but in the end, after reading the Goetz book, what I ended up with looked a lot like this answer.
marc esher
+1  A: 

Don't forget Jini and JavaSpaces. What you're describing sounds very much like the classic producer/consumer pattern that space-based architectures excel at.

A producer will write the jobs into the space. One or more consumers will take jobs out (under a transaction) and work on them in parallel, and then write the results back. Since it's under a transaction, if a problem occurs the job is made available again for another consumer.

You can scale this trivially by adding more consumers. This works especially well when the consumers are separate VMs and you scale across the network.
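As a very rough sketch of the consumer side (looking up the JavaSpace via Jini and running under a real Transaction are omitted; the entry types and fields are made up for illustration):

    import net.jini.core.entry.Entry;
    import net.jini.core.lease.Lease;
    import net.jini.space.JavaSpace;

    // JavaSpaces entries are plain objects with public fields and a public no-arg constructor.
    class Job implements Entry {
        public String fileName;
        public Job() {}
        public Job(String fileName) { this.fileName = fileName; }
    }

    class Result implements Entry {
        public String fileName;
        public Result() {}
        public Result(String fileName) { this.fileName = fileName; }
    }

    class Consumer implements Runnable {
        private final JavaSpace space; // obtained elsewhere via Jini lookup

        Consumer(JavaSpace space) { this.space = space; }

        public void run() {
            try {
                Job template = new Job(); // null fields act as wildcards when matching
                while (true) {
                    // take() removes a matching job from the space; in a real system this
                    // would run under a Transaction (not null) so a failed job reappears.
                    Job job = (Job) space.take(template, null, Long.MAX_VALUE);
                    // ... process job.fileName ...
                    space.write(new Result(job.fileName), null, Lease.FOREVER);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }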

Brian Agnew
A: 

This doesn't sound like a master-worker problem, but rather a specialized client on top of a thread pool. Given that you have a lot of scavenging threads and not a lot of processing units, it may be worthwhile simply doing a scavenging pass and then a computing pass. If you store the work items in a Set, the uniqueness constraint will remove duplicates. The second pass can then submit all of the work to an ExecutorService to perform the processing in parallel.
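A sketch of that two-pass idea (the scavenging and the per-item processing below are stand-ins):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TwoPassSketch {
        public static void main(String[] args) throws InterruptedException {
            // Pass 1: scavenge. The Set's uniqueness constraint silently drops
            // duplicates found by different finder threads.
            Set<File> workItems = new HashSet<File>();
            workItems.add(new File("/tmp/old-file-1")); // stand-in for real scavenging
            workItems.add(new File("/tmp/old-file-1")); // duplicate, ignored
            workItems.add(new File("/tmp/old-file-2"));

            // Pass 2: compute. Hand the unique items to an ExecutorService.
            ExecutorService pool = Executors.newFixedThreadPool(4);
            List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
            for (final File file : workItems) {
                tasks.add(new Callable<Void>() {
                    public Void call() {
                        System.out.println("moving " + file); // stand-in for the real work
                        return null;
                    }
                });
            }
            pool.invokeAll(tasks); // blocks until every task has completed
            pool.shutdown();
        }
    }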

A master-worker model generally assumes that the data provider has all of the work and supplies it to the master to manage. The master controls the work execution and deals with distributed computation, time-outs, failures, retries, etc. A fork-join abstraction is a recursive rather than iterative data provider. A map-reduce abstraction is a multi-step master-worker that is useful in certain scenarios.

A good example of master-worker is trivially parallel problems, such as finding prime numbers. Another is a data load where each entry is independent (validate, transform, stage). The need to process a known working set, handle failures, etc. is what makes a master-worker model different from a thread pool. This is why a master must be in control and push the work units out, whereas a thread pool allows workers to pull work from a shared queue.

Ben Manes