I've been considering a general-purpose, generic, cancellable interface for asynchronous request / response operations. It must:

  • Support asynchronous calls
  • Be cancellable
  • Be generic
  • Support request / response
  • Support either returning in the current thread or processing the response in another thread

So here's my first stab:

interface AsyncOperation<INPUT, OUTPUT> {
    Future<OUTPUT> execute(INPUT in, AsyncCallback<OUTPUT> callback);
}

interface AsyncCallback<OUTPUT> {
    void done(OUTPUT output);
}

Usage:

// completely async operation
operation.execute("Test", new AsyncCallback<String>() {
    public void done(String output) {
        // process result...
    }
});

// sync operation with cancellation after timeout
Future<String> future = operation.execute("Test", null);
try {
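    // Future.get takes a timeout plus a unit; handling of InterruptedException
    // and ExecutionException is omitted here for brevity
    String result = future.get(1000, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
    future.cancel(true); // true = interrupt the task if it is already running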
}

Disadvantages

  • It's complicated
  • It only supports a single request parameter -- not too concerned about this one
  • The single 'done' method means that exceptions have to be communicated through 'done'. This could be solved by having onSuccess and onException (and onFinally?) methods in AsyncCallback, but that would make it even more verbose
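To make the last point concrete, here is a rough sketch of the onSuccess / onException split. The names SplitCallback, SplitCallbackAdapter and SplitCallbackDemo are made up for illustration, and the work runs synchronously just to keep the example short. The adapter is one way to keep the verbosity down, since callers only override the method they care about.

```java
import java.util.concurrent.Callable;

// Hypothetical variant of AsyncCallback with separate success and failure paths.
interface SplitCallback<OUTPUT> {
    void onSuccess(OUTPUT output);
    void onException(Throwable error);
}

// Hypothetical adapter with empty defaults, so callers override only what they need.
abstract class SplitCallbackAdapter<OUTPUT> implements SplitCallback<OUTPUT> {
    @Override public void onSuccess(OUTPUT output) { }
    @Override public void onException(Throwable error) { }
}

class SplitCallbackDemo {
    // Runs the work in the calling thread (for brevity) and routes the outcome
    // to the matching callback method.
    static <T> void run(Callable<T> work, SplitCallback<T> callback) {
        T result;
        try {
            result = work.call();
        } catch (Exception e) {
            callback.onException(e);
            return;
        }
        // Called outside the try block so that an exception thrown by
        // onSuccess itself is not reported through onException.
        callback.onSuccess(result);
    }

    public static void main(String[] args) {
        run(() -> "Test".toUpperCase(), new SplitCallbackAdapter<String>() {
            @Override public void onSuccess(String output) {
                System.out.println("result: " + output);
            }
        });
        run(() -> { throw new IllegalStateException("boom"); }, new SplitCallbackAdapter<String>() {
            @Override public void onException(Throwable error) {
                System.out.println("failed: " + error.getMessage());
            }
        });
    }
}
```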

For some context, the Google Protocol Buffers service methods follow a relatively similar model:

void [methodname](RpcController controller, 
    [RequestClass] request, RpcCallback<[ResponseClass]> callback);

Any better ideas?

A: 

Do you need the INPUT type parameter? Wouldn't it be easier if the operation objects held the input as state, as in:

void greet(final String name) {
    new AsyncOperation<Object>() {
        @Override Object doIt() {
            System.out.println("Hello " + name + "!");
            return null; // nothing to report back
        }
    }.execute(null);
}

That way, a caller can pass as many parameters as they like in a type-safe manner.

Also, invoking a callback and returning a future seems a strange use. Are you sure you need that? You could provide two execute methods, one returning the future and another invoking the callback.
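A minimal sketch of what the two execute methods could look like, assuming the operation is handed an ExecutorService to run on. StatefulOperation, ResultCallback and TwoExecuteDemo are hypothetical names, not part of the question's code, and error handling in the callback variant is left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical callback type, same shape as AsyncCallback in the question.
interface ResultCallback<OUTPUT> {
    void done(OUTPUT output);
}

// Hypothetical base class: subclasses capture their inputs as state and implement doIt().
abstract class StatefulOperation<OUTPUT> {
    protected abstract OUTPUT doIt();

    // Variant 1: returns a Future the caller can wait on or cancel.
    Future<OUTPUT> execute(ExecutorService executor) {
        Callable<OUTPUT> task = this::doIt;
        return executor.submit(task);
    }

    // Variant 2: delivers the result to a callback on the executor's thread.
    void execute(ExecutorService executor, ResultCallback<OUTPUT> callback) {
        executor.execute(() -> callback.done(doIt()));
    }
}

class TwoExecuteDemo {
    // The name parameter is captured as state, so no INPUT type parameter is needed.
    static StatefulOperation<String> greet(final String name) {
        return new StatefulOperation<String>() {
            @Override protected String doIt() {
                return "Hello " + name + "!";
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Future style: block for up to one second.
            Future<String> future = greet("Ann").execute(executor);
            System.out.println(future.get(1, TimeUnit.SECONDS));

            // Callback style: the latch only keeps the demo from exiting early.
            CountDownLatch done = new CountDownLatch(1);
            greet("Bob").execute(executor, output -> {
                System.out.println(output);
                done.countDown();
            });
            done.await(1, TimeUnit.SECONDS);
        } finally {
            executor.shutdown();
        }
    }
}
```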

meriton
You're right, taking a callback and returning a future is a little odd, but it's there to support cancellation, as well as the case where you want to wait for, say, 5 seconds: if the operation is done, return its result; if not, just report that it is 'busy processing'. The callback then does the actual processing of the result.
jamie mccrindle
Cunning plan to get rid of the input parameters restriction. The use case I have in mind is to be able to create generic implementations of AsyncOperation that do throttling / limit the number of threads used etc.
jamie mccrindle
A: 

Take a look at jetlang. It supports async operations and request-response model. Here is an example from their tests:

@Test
public void simpleRequestResponse() throws InterruptedException {
    Fiber req = new ThreadFiber();
    Fiber reply = new ThreadFiber();
    req.start();
    reply.start();
    RequestChannel<String, Integer> channel = new MemoryRequestChannel<String, Integer>();
    Callback<Request<String, Integer>> onReq = new Callback<Request<String, Integer>>() {
        public void onMessage(Request<String, Integer> message) {
            assertEquals("hello", message.getRequest());
            message.reply(1);
        }
    };
    channel.subscribe(reply, onReq);

    final CountDownLatch done = new CountDownLatch(1);
    Callback<Integer> onReply = new Callback<Integer>() {
        public void onMessage(Integer message) {
            assertEquals(1, message.intValue());
            done.countDown();
        }
    };
    AsyncRequest.withOneReply(req, channel, "hello", onReply);
    assertTrue(done.await(10, TimeUnit.SECONDS));
    req.dispose();
    reply.dispose();
}
abhin4v
Cool. Thanks, will check it out. I like the Actor model. It does seem somewhat cumbersome for request / response, though, and cancellation would need to be added on top of it?
jamie mccrindle
Jetlang does not provide an actor model, although one can be built on top of the channels, callbacks and request-response system it provides.
abhin4v
Ah, ok, just looked again now and I see what you mean. I need to take a closer look at Jetlang.
jamie mccrindle
A: 

Ok, sucks to answer your own question but I've come up with a set of classes that works quite well for this:

// the main Async operation interface
public interface AsyncOperation<OUTPUT, INPUT> {
    public AsyncController execute(INPUT input, 
        AsyncCallback<OUTPUT> callback);
}

// the callback that gets called when the operation completes
public interface AsyncCallback<OUTPUT> {
    public void done(AsyncResult<OUTPUT> output);
}

// provides the ability to cancel operations
public interface AsyncController {
    void cancel();
}

// this provides a convenient way to manage a response that is either a
// value or an exception
public class AsyncResult<VALUE> {

    private final VALUE value;
    private final Throwable throwable;

    private AsyncResult(VALUE value, Throwable throwable) {
        this.value = value;
        this.throwable = throwable;
    }

    public AsyncResult(VALUE value) {
        this(value, null);
    }

    public AsyncResult(Throwable throwable) {
        this((VALUE) null, throwable);
    }

    public VALUE value() throws Throwable {
        if(throwable != null) {
            throw throwable;
        } else {
            return value;
        }
    }
}
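For what it's worth, here is a rough sketch of how an implementation that limits the number of threads might sit behind these interfaces. PooledOperation and PooledOperationDemo are hypothetical names, and the four types above are repeated in condensed, package-private form only so the example compiles on its own. One assumption to be aware of: if the operation is cancelled before it starts, the callback is never invoked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Condensed copies of the types above so the sketch compiles on its own.
interface AsyncCallback<OUTPUT> { void done(AsyncResult<OUTPUT> output); }
interface AsyncController { void cancel(); }
interface AsyncOperation<OUTPUT, INPUT> {
    AsyncController execute(INPUT input, AsyncCallback<OUTPUT> callback);
}
class AsyncResult<VALUE> {
    private final VALUE value;
    private final Throwable throwable;
    private AsyncResult(VALUE value, Throwable throwable) {
        this.value = value;
        this.throwable = throwable;
    }
    AsyncResult(VALUE value) { this(value, null); }
    AsyncResult(Throwable throwable) { this(null, throwable); }
    VALUE value() throws Throwable {
        if (throwable != null) throw throwable;
        return value;
    }
}

// Hypothetical implementation: runs a function on a bounded pool, which caps
// how many requests are processed at the same time.
class PooledOperation<OUTPUT, INPUT> implements AsyncOperation<OUTPUT, INPUT> {
    private final ExecutorService pool;
    private final Function<INPUT, OUTPUT> work;

    PooledOperation(ExecutorService pool, Function<INPUT, OUTPUT> work) {
        this.pool = pool;
        this.work = work;
    }

    @Override
    public AsyncController execute(final INPUT input, final AsyncCallback<OUTPUT> callback) {
        final Future<?> future = pool.submit(() -> {
            AsyncResult<OUTPUT> result;
            try {
                result = new AsyncResult<OUTPUT>(work.apply(input));
            } catch (Throwable t) {
                result = new AsyncResult<OUTPUT>(t);
            }
            callback.done(result);
        });
        // Cancelling interrupts a running task or stops a queued one from starting.
        return () -> future.cancel(true);
    }
}

class PooledOperationDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2); // at most two requests at once
        AsyncOperation<Integer, String> parse =
            new PooledOperation<Integer, String>(pool, s -> Integer.parseInt(s));
        CountDownLatch done = new CountDownLatch(1);
        AsyncController controller = parse.execute("42", result -> {
            try {
                System.out.println("parsed " + result.value());
            } catch (Throwable t) {
                System.out.println("failed: " + t);
            } finally {
                done.countDown();
            }
        });
        if (!done.await(1, TimeUnit.SECONDS)) {
            controller.cancel(); // give up if no result arrived in time
        }
        pool.shutdown();
    }
}
```

The fixed-size pool is what does the throttling here; swapping in a different ExecutorService changes the policy without touching callers.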
jamie mccrindle