I have to work with some legacy code written in a "push" callback style. The architecture looks like this (pseudo-Java):

public interface Callback
{
    public void consumeData(Object data);
}

public class Worker // The legacy class
{
    private Callback cb;
    public Worker(..., Callback cb)
    {
        this.cb = cb;
        ...
    }
    public void execute()
    {
        ...
        for (each output object) cb.consumeData(obj);
        ...
    }
}

public class Consumer implements Callback
{
    public void doSomething()
    {
        Worker w = new Worker(..., this);
        w.execute();
        ...
    }
    public void consumeData(Object data)
    {
        // process one data object
    }
}

The problem with this is the deep coupling between the producer and consumer.

I want to write a thin adapter that encapsulates the producer and runs it in a separate thread, providing a simple Iterator-based interface, essentially hasNext() and next() methods. I intend to use a SynchronousQueue: the producer will put() objects and the adapter will take() them on each next() call. SynchronousQueue will handle all the synchronization for me and is a good fit for this problem. While the happy path is conceptually simple, I can see that there will be tricky issues around termination and exception handling.
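A minimal sketch of the adapter described above, assuming Java 9+ and a hypothetical `PushSource` interface standing in for the legacy `Worker` (it pushes each output object into a callback). The producer runs on its own thread, hands items over through a `SynchronousQueue`, and always finishes by putting a private end-of-data sentinel; `hasNext()` takes one item ahead. Error propagation and cancellation are deliberately left out, and items must be non-null because `SynchronousQueue` rejects `null`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.SynchronousQueue;

// Hypothetical stand-in for the legacy Worker: pushes each output object into sink.
interface PushSource<T> {
    void run(java.util.function.Consumer<? super T> sink) throws Exception;
}

// Sketch: exposes a push-style source as a pull-style Iterator.
final class PushIterator<T> implements Iterator<T> {
    private static final Object END = new Object(); // end-of-data sentinel, never returned

    private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
    private Object lookahead; // item already taken from the queue
    private boolean fetched;  // true while lookahead holds an unconsumed item (or END)

    PushIterator(PushSource<T> source) {
        Thread producer = new Thread(() -> {
            try {
                source.run(item -> {
                    try {
                        queue.put(item); // blocks until the consumer takes the item
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("producer interrupted", e);
                    }
                });
            } catch (Exception e) {
                // Sketch only: a failure is dropped here and simply ends the data.
            } finally {
                try {
                    queue.put(END); // always tell the consumer there is no more data
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "push-iterator-producer");
        producer.setDaemon(true); // don't keep the JVM alive if the consumer stops early
        producer.start();
    }

    @Override
    public boolean hasNext() {
        if (!fetched) {
            try {
                lookahead = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for data", e);
            }
            fetched = true;
        }
        return lookahead != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        fetched = false;
        return (T) lookahead;
    }
}
```

Note that if the caller abandons the iterator before reaching the end, the producer thread stays blocked in `put()`; making it a daemon only keeps it from holding up JVM exit. That is exactly the cleanup problem raised below.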

For example, upon return from w.execute(), which means the producer has queued the last data object, the producer thread must ensure that it waits until the consumer has retrieved the last object from the SynchronousQueue. There will also be cleanup issues if either the producer or consumer terminates abnormally.
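For the end-of-data case, part of the waiting is built into `SynchronousQueue` itself: per its Javadoc, `put()` waits if necessary for another thread to receive the element, so when the producer's final `put()` returns, the consumer already holds the last object. For abnormal consumer termination, `offer(e, timeout, unit)` lets the producer stop waiting for a consumer that has gone away. The hypothetical `HandoffDemo` class below checks both behaviours; the 200 ms and 50 ms waits are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Sketch: checks the SynchronousQueue hand-off properties an adapter would rely on.
final class HandoffDemo {
    // put() does not return until some consumer has taken the element.
    static boolean putWaitsForTaker() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        CountDownLatch putReturned = new CountDownLatch(1);
        Thread producer = new Thread(() -> {
            try {
                queue.put("last"); // blocks: no consumer yet
                putReturned.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        boolean returnedEarly = putReturned.await(200, TimeUnit.MILLISECONDS); // false: still blocked
        String received = queue.take(); // the consumer takes the last object...
        producer.join();                // ...and only then can the producer finish
        return !returnedEarly && "last".equals(received) && putReturned.getCount() == 0;
    }

    // offer() with a timeout gives up (returns false) if no consumer shows up in time,
    // e.g. because the consumer stopped iterating early.
    static boolean offerWithoutTaker() throws InterruptedException {
        return new SynchronousQueue<String>().offer("orphan", 50, TimeUnit.MILLISECONDS);
    }
}
```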

Rather than reinvent the wheel I thought I'd ask here if someone has already worked out the best approach. I'm not looking for code, but a pointer to a discussion of the issues... and no, this is not homework.

A: 

Have the producer write a "poison" object into the queue's wrapper object if the producer terminates for any reason. Have the hasNext() method of the wrapper test for the existence of the poison object.
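A sketch of a poison object that also carries the reason the producer stopped, so the consumer side can rethrow a failure instead of treating it as a normal end. `Poison`, `PoisonQueue`, and `ThrowingRunnable` are hypothetical names; an Iterator wrapper would call something like `takeOrEnd()` from `hasNext()` and cache the result, since a second `take()` after the poison would block forever.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical "poison" marker: always the last object the producer queues.
// cause == null means normal end of data; otherwise it records why the producer stopped.
final class Poison {
    final Throwable cause;

    Poison(Throwable cause) {
        this.cause = cause;
    }
}

final class PoisonQueue {
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Runs the producer body on a new thread; whatever happens, a Poison is queued last.
    static Thread startProducer(SynchronousQueue<Object> queue, ThrowingRunnable body) {
        Thread t = new Thread(() -> {
            Throwable failure = null;
            try {
                body.run();
            } catch (Throwable e) {
                failure = e;
            }
            try {
                queue.put(new Poison(failure));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    // Consumer side: returns the next item, null at a normal end of data,
    // and rethrows a producer failure wrapped in an IllegalStateException.
    static Object takeOrEnd(SynchronousQueue<Object> queue) {
        Object o;
        try {
            o = queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for data", e);
        }
        if (o instanceof Poison) {
            Poison p = (Poison) o;
            if (p.cause != null) {
                throw new IllegalStateException("producer failed", p.cause);
            }
            return null;
        }
        return o;
    }
}
```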

Steve Emmerson
I'm already doing that to handle end-of-data. SynchronousQueue#put() can throw InterruptedException. What happens in that case? Clearly there has to be a retry strategy, and it's not clear to me how to achieve completely deterministic behavior in the face of multiple interruptions. Maybe this is an instance of the halting problem?
Jim Garrison
According to Brian Goetz's "Java Concurrency in Practice", an interrupted thread should probably terminate -- so you needn't retry.
Steve Emmerson
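A sketch of that policy for the producer, in line with the advice in "Java Concurrency in Practice" not to swallow interrupts: on `InterruptedException` the producer does not retry `put()`; it restores the thread's interrupt status and returns. Whoever wants to stop it calls `interrupt()` on its thread. The class name and the endless counter used as the data source are hypothetical.

```java
import java.util.concurrent.SynchronousQueue;

// Sketch of the "interrupted thread should terminate" policy for a producer.
// The item source (an endless counter) is a hypothetical stand-in for real output.
final class InterruptibleProducer implements Runnable {
    private final SynchronousQueue<Integer> queue;

    InterruptibleProducer(SynchronousQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; ; i++) {
                queue.put(i); // throws InterruptedException if interrupted while waiting
            }
        } catch (InterruptedException e) {
            // No retry: restore the interrupt status for code up the stack, then exit.
            Thread.currentThread().interrupt();
        }
    }
}
```

In an Iterator adapter the consumer would still need to learn that the producer stopped, for example through the poison object from the answer above.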
A: 

The coupling seems to lie more in the fact that the consumer knows which producer to create. You could move this knowledge one level higher, in essence setting up a production chain.

Why mimic an iterator when the queue metaphor maps onto the situation better?

One issue to take into account is how long you allow the work queue to grow. If you don't set a maximum size, your producer might use more memory than expected when it produces much faster than the consumer consumes.
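A minimal sketch of the bounded alternative using `java.util.concurrent.ArrayBlockingQueue`: `put()` blocks while the queue already holds `capacity` elements, so the backlog (and the memory it uses) cannot grow past that capacity. The class name, the 1 ms sleep that makes the consumer slow, and the sizes used are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a fast producer and a slow consumer sharing a bounded queue.
final class BoundedQueueDemo {
    // Returns the largest backlog the consumer observed; it never exceeds capacity.
    static int maxBacklog(int capacity, int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int max = 0;
        for (int i = 0; i < count; i++) {
            Thread.sleep(1); // simulate a consumer slower than the producer
            max = Math.max(max, queue.size());
            queue.take();
        }
        producer.join();
        return max;
    }
}
```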

If you create a dedicated work queue class between producer and consumer, you can later decide to add more consumers or producers, so the design scales better.
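As a sketch of that scaling point: several consumer threads can share one `BlockingQueue`, and a poison object (as in the first answer) still works if the producer queues one per consumer. The class name, the queue capacity of 16, and the use of `Integer.MIN_VALUE` as the marker (assuming real items never take that value) are illustrative assumptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: one bounded queue shared by several consumers, shut down with one pill per consumer.
final class SharedWorkQueue {
    private static final Integer POISON = Integer.MIN_VALUE; // assumed never to be a real item

    // Produces 'items' integers, processes them with 'consumers' threads,
    // and returns how many items were processed in total.
    static int processAll(int items, int consumers) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(16); // bounded to cap memory
        AtomicInteger processed = new AtomicInteger();
        Thread[] workers = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            workers[c] = new Thread(() -> {
                try {
                    while (true) {
                        Integer item = queue.take();
                        if (item.equals(POISON)) {
                            return; // this consumer is done
                        }
                        processed.incrementAndGet(); // "process" the item
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[c].start();
        }
        for (int i = 0; i < items; i++) {
            queue.put(i); // producer side
        }
        for (int c = 0; c < consumers; c++) {
            queue.put(POISON); // one pill per consumer
        }
        for (Thread w : workers) {
            w.join();
        }
        return processed.get();
    }
}
```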

rsp
Indeed - the only problematic coupling I see is that the consumer instantiates the producer ("Worker") itself, rather than having a producer handed to it via the constructor, or a setter. Create a Producer interface, have the Worker implement it, and add a setProducer(Producer) method to Consumer, or add a Producer parameter to a Consumer constructor. Then these classes no longer know about each other's implementations.
Ladlestein
@Ladlestein - you should make this an answer.
Skip Head
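A sketch of Ladlestein's suggestion using the setter variant, since the legacy `Worker` needs its callback at construction time. `Producer` is the hypothetical interface from that comment; this `Worker` is a stand-in whose real constructor arguments are elided and whose two output objects are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

interface Callback {
    void consumeData(Object data);
}

// Hypothetical interface from the comment: Consumer depends only on this.
interface Producer {
    void execute();
}

// Stand-in for the legacy Worker (its other constructor arguments are elided).
final class Worker implements Producer {
    private final Callback cb;

    Worker(Callback cb) {
        this.cb = cb;
    }

    @Override
    public void execute() {
        for (Object obj : new Object[] {"a", "b"}) { // placeholder output objects
            cb.consumeData(obj);
        }
    }
}

final class Consumer implements Callback {
    private Producer producer;
    final List<Object> received = new ArrayList<>();

    void setProducer(Producer producer) { // injected; Consumer never names Worker
        this.producer = producer;
    }

    void doSomething() {
        producer.execute();
    }

    @Override
    public void consumeData(Object data) {
        received.add(data); // process one data object
    }
}
```

The wiring then happens outside both classes: `Consumer c = new Consumer(); c.setProducer(new Worker(c)); c.doSomething();`.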
A: 

I found the terminology a bit confusing, in particular the class naming (Consumer, for example).

How about a ThreadPool like this:


// This is a separate class, but you could replace it by an anonymous class
public class Task implements Runnable
{
   private final Object data;

   public Task(Object data)
   {
      this.data = data;
   }

   @Override
   public void run()
   {
      // here what consumeData used to do
   }
}

public class Worker // The legacy class
{
    public Worker(...) // maybe pass a TaskFactory if you need one
    {
        ...
    }

    public void execute() throws InterruptedException
    {
        // One pool per call rather than a static one, so it can be shut down below
        ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
        ...
        for (each output object)
            executor.execute(new Task(object));

        executor.shutdown();            // accept no new tasks; submitted ones still run
        executor.awaitTermination(...); // without shutdown() this would only time out
    }
}

// Just skip the 'Consumer' and call execute() directly on Worker
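A runnable sketch of the same idea with `ExecutorService`, assuming the output objects are available as a list and that "what consumeData used to do" can be passed in as a `java.util.function.Consumer`; `PooledWorker` and the pool size of 4 are hypothetical. Because tasks run concurrently, the processing code must be thread-safe and the original output order is not preserved.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: the legacy loop submits one task per output object instead of calling a callback.
final class PooledWorker {
    private final List<?> outputs;                          // stand-in for the legacy output objects
    private final java.util.function.Consumer<Object> task; // what consumeData used to do

    PooledWorker(List<?> outputs, java.util.function.Consumer<Object> task) {
        this.outputs = outputs;
        this.task = task;
    }

    // Returns true if every task finished within the timeout.
    boolean execute(long timeoutSeconds) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is arbitrary
        try {
            for (Object obj : outputs) {
                executor.execute(() -> task.accept(obj));
            }
        } finally {
            executor.shutdown(); // stop accepting tasks; submitted ones still run
        }
        return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }
}
```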

Nicolas
Thanks for the reply. As I commented on my question, I oversimplified the situation.
Jim Garrison