If the HTTP connection fails (e.g. the server is down), I need to decrement the counter.
I was going to say "hell yes", but I'm somewhat less certain after this sentence. I take it you want to do something like this:
def sendRequest(url)
    request = new request to url
    request.header["X-count"] = next serial
    if request.send() != SUCCESS
        rewind serial
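In Java, that pseudocode might look roughly like the sketch below. The `Request` class here is a made-up stand-in (the real one depends on your HTTP library), wired to always fail so the rewind path actually runs:

```java
import java.util.concurrent.atomic.AtomicInteger;

class NaiveRequester {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Hypothetical stand-in for a real HTTP request class;
    // send() always fails here, simulating a down server.
    static class Request {
        final int serial;
        Request(int serial) { this.serial = serial; }
        boolean send() { return false; }
    }

    public boolean sendRequest() {
        Request request = new Request(counter.getAndIncrement());
        if (!request.send()) {
            counter.decrementAndGet(); // rewind the serial
            return false;
        }
        return true;
    }

    public int currentSerial() { return counter.get(); }
}
```

This works fine with a single thread; the table below shows how it breaks once two threads interleave.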
In that case, I'd guess that two threads should not be allowed to send requests simultaneously, and then you want something that serializes requests rather than an AtomicInteger, which really just lets you perform a few operations atomically. If two threads were to call sendRequest simultaneously, and the first one failed, this would happen:
Thread | What happens?
--------+-------------------------
A | Creates new request
B | Creates new request
A | Set request["X-count"] = 0
A | Increment counter to 1
A | Send request
B | Set request["X-count"] = 1
B | Increment counter to 2
B | Send request
A | Request fails
B | Request succeeds
A | Rewind counter down to 1
C | Creates new request
C | Set request["X-count"] = 1
C | Increment counter to 2
And now, you've sent two requests with X-count = 1. If you want to avoid this, you should use something like the following (assume Request and Response are classes used to handle requests to URLs):
class SerialRequester {
    private volatile int currentSerial = 0;

    public synchronized Response sendRequest(URL url) throws SomeException {
        Request request = new Request(url);
        request.setHeader("X-count", currentSerial);
        Response response = request.send();
        if (response.isSuccess()) ++currentSerial;
        return response;
    }
}
This class guarantees that no two successful requests (made through the same SerialRequester) have the same X-count value.
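To see that guarantee in action, here's a self-contained demo with stubbed Request/Response classes (the real ones would come from your HTTP library, and the flaky `send()` is simulated) where several threads hammer one SerialRequester:

```java
import java.util.*;
import java.util.concurrent.*;

class SerialRequesterDemo {
    // Minimal stand-ins for the Request/Response classes used above.
    static class Response {
        final boolean success;
        final int serial;
        Response(boolean success, int serial) { this.success = success; this.serial = serial; }
        boolean isSuccess() { return success; }
    }

    static class Request {
        int serial;
        void setHeader(String name, int value) { this.serial = value; }
        Response send() {
            // Simulate a flaky server: fail about half the time.
            return new Response(ThreadLocalRandom.current().nextBoolean(), serial);
        }
    }

    static class SerialRequester {
        private int currentSerial = 0; // guarded by the synchronized method

        public synchronized Response sendRequest() {
            Request request = new Request();
            request.setHeader("X-count", currentSerial);
            Response response = request.send();
            if (response.isSuccess()) ++currentSerial; // consume serial only on success
            return response;
        }
    }

    // Runs the given number of threads and returns the X-count
    // values of every *successful* request.
    static List<Integer> run(int threads, int perThread) throws Exception {
        SerialRequester requester = new SerialRequester();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<List<Integer>>> futures = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            futures.add(pool.submit(() -> {
                List<Integer> ok = new ArrayList<>();
                for (int i = 0; i < perThread; i++) {
                    Response r = requester.sendRequest();
                    if (r.isSuccess()) ok.add(r.serial);
                }
                return ok;
            }));
        }
        List<Integer> all = new ArrayList<>();
        for (Future<List<Integer>> f : futures) all.addAll(f.get());
        pool.shutdown();
        return all;
    }
}
```

However many requests fail and get retried serials, the successful ones always carry distinct, consecutive X-count values.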
Edit Many seem concerned about the above solution not running concurrently. It doesn't. That's correct. But it needs to work this way to solve the OP's problem. Now, if the counter needn't be decremented when a request fails, an AtomicInteger would be perfect, but it's incorrect in this scenario.
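For contrast, here's what that no-rollback variant would look like (class and method names are illustrative): getAndIncrement hands out a distinct serial to every caller without any locking, but a serial is consumed whether or not the request succeeds.

```java
import java.util.concurrent.atomic.AtomicInteger;

class NoRollbackRequester {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Atomic, lock-free: no two callers ever see the same value,
    // but failed requests still burn a serial.
    public int nextSerial() {
        return counter.getAndIncrement();
    }
}
```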
Edit 2 I got it in me to write a serial requester (like the one above) less prone to freezing, such that it aborts requests if they've been pending too long (i.e., queued in the worker thread but not started). Thus, if the pipes clog and one request hangs for a very very long time, other requests will wait at most a fixed amount of time, so the queue doesn't grow indefinitely until the clog goes away.
class SerialRequester {
    private enum State { PENDING, STARTED, ABORTED }

    private final ExecutorService executor =
        Executors.newSingleThreadExecutor();

    private int currentSerial = 0; // not volatile, used from executor thread only

    public Response sendRequest(final URL url)
            throws SomeException, InterruptedException {
        final AtomicReference<State> state =
            new AtomicReference<State>(State.PENDING);
        Future<Response> result = executor.submit(new Callable<Response>() {
            @Override
            public Response call() throws SomeException {
                if (!state.compareAndSet(State.PENDING, State.STARTED))
                    return null; // Aborted by calling thread
                Request request = new Request(url);
                request.setHeader("X-count", currentSerial);
                Response response = request.send();
                if (response.isSuccess()) ++currentSerial;
                return response;
            }
        });
        try {
            try {
                // Wait at most 30 secs for the request to complete
                return result.get(30, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                // 30 secs passed; abort task if not started
                if (state.compareAndSet(State.PENDING, State.ABORTED))
                    throw new SomeException("Request queued too long", e);
                return result.get(); // Task started; wait for completion
            }
        } catch (ExecutionException e) { // Network timeout, misc I/O errors etc.
            throw new SomeException("Request error", e);
        }
    }
}