I am planning to use this scheme in my application, but I am not sure whether it is thread-safe.

To give a little background: a bunch of servers compute the results of sub-tasks that belong to a single task and report them back to a central server. This piece of code is used to register the results, check whether all the subtasks for the task have completed and, if so, report that fact only once.

The important point is that every task must be reported once and only once, as soon as it is completed (i.e. all of its subTaskResults are set).

Can anybody help? Thank you! (Also, if you have a better idea to solve this problem, please let me know!)

*Note that I simplified the code for brevity.

Solution I

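import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

class Task {
    // Populated with a bunch of (Long, new AtomicReference<>()) pairs;
    // the actual app uses a read-only HashMap.
    Map<Id, AtomicReference<SubTaskResult>> subtasks = populatedMap();

    // A single permit, never released: only one caller of check() can win it.
    Semaphore permission = new Semaphore(1);

    public Task set(Id id, SubTaskResult result) {
        // null check omitted
        subtasks.get(id).set(result);
        return check() ? this : null;
    }

    private boolean check() {
        // Iterate over the map's values; a Map itself is not Iterable.
        for (AtomicReference<SubTaskResult> ref : subtasks.values()) {
            if (ref.get() == null) {
                return false;
            }
        }
        return permission.tryAcquire();
    }
}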

Stephen C kindly suggested using a counter. Actually, I had considered that once, but I reasoned that the JVM could reorder the operations, so that a thread could observe the counter decremented by another thread before that other thread had set its result in the AtomicReference.

*EDIT: I now see that this is thread-safe. I'll go with this solution. Thanks, Stephen!

Solution II

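import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class Task {
    // Populated with a bunch of (Long, new AtomicReference<>()) pairs;
    // the actual app uses a read-only HashMap.
    Map<Id, AtomicReference<SubTaskResult>> subtasks = populatedMap();
    // Subtasks still missing a result (initialized after subtasks is populated).
    AtomicInteger counter = new AtomicInteger(subtasks.size());

    public Task set(Id id, SubTaskResult result) {
        // null check omitted
        subtasks.get(id).set(result);
        // In the actual app: if (!subtasks.get(id).compareAndSet(null, result)) return null;
        return check() ? this : null;
    }

    private boolean check() {
        // Exactly one caller observes the transition to zero.
        return counter.decrementAndGet() == 0;
    }
}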
A: 

The atomicity explicitly guaranteed for AtomicReference.compareAndSet (per the class documentation) extends to the set and get methods (per the package documentation), so in that regard your code appears to be thread-safe.

I am not sure, however, why you have Semaphore.tryAcquire as a side effect there; without complementary code to release the semaphore, that part of your code looks wrong.

Noel Ang
I reasoned that, for example, two threads could complete the final two remaining subtasks before either of them calls check(). If that happened and there were no semaphore, the Task would be reported twice. The semaphore's single permit is made available once, when the Task object is constructed.
Enno Shioji
Yes, the Semaphore (or some other synchronization) is definitely required to prevent the Task being reported as complete more than once.
Stephen C
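To illustrate the double-report race discussed in these comments: a Semaphore created with one permit that is never released lets tryAcquire() succeed for at most one caller, even when several threads finish their last subtasks at the same moment. This is a minimal sketch; the class name OnceOnlyDemo and the thread count are illustrative assumptions, not taken from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class OnceOnlyDemo {
    // Starts `threads` threads that all try to "report" the task at once and
    // returns how many of them actually won the right to report.
    static int countReports(int threads) {
        Semaphore permission = new Semaphore(1); // one permit, never released
        AtomicInteger reports = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await(); // line everyone up to maximize contention
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (permission.tryAcquire()) { // non-blocking; succeeds only once
                    reports.incrementAndGet();
                }
            });
            workers.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return reports.get();
    }

    public static void main(String[] args) {
        System.out.println(countReports(8)); // prints 1
    }
}
```

However many threads race, only the first tryAcquire() finds a permit; every later call returns false immediately instead of blocking.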
A: 

I assume that your use case is that there are multiple threads calling set, but that for any given value of id, set is called only once. I'm also assuming that populatedMap creates the entries for all id values used, and that subtasks and permission are really private.

If so, I think that the code is thread-safe.

Each thread should see the initialized state of the subtasks Map, complete with all keys and all AtomicReference references. This state never changes, so subtasks.get(id) will always give the right reference. The set(result) call operates on an AtomicReference, so the subsequent get() calls in check() will give the most up-to-date values ... in all threads. Any potential races between multiple threads calling check are resolved by the Semaphore: its single permit can be acquired only once.

However, this is a rather complicated solution. A simpler solution would be to use a concurrent counter; e.g. replace the Semaphore with an AtomicInteger and use decrementAndGet instead of repeatedly scanning the subtasks map in check.


In response to this comment in the updated solution:

Actually, I had considered that once, but I reasoned that the JVM could reorder the operations, so that a thread could observe the counter decremented by another thread before that other thread had set its result in the AtomicReference.

The AtomicInteger and AtomicReference by definition are atomic. Any thread that tries to access one is guaranteed to see the "current" value at the time of the access.

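In this particular case, each thread calls set on the relevant AtomicReference before it calls decrementAndGet on the AtomicInteger. Program order within a thread establishes happens-before, and because both operations have volatile memory semantics, they cannot be reordered in a way that another thread could observe. Each decrementAndGet reads the value written by the previous one, so every set performed before a decrement happens-before whatever the thread that brings the counter to zero does next; that thread is guaranteed to see all the results.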

In other words, it should be thread-safe ... AFAIK.
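A minimal sketch of the counter approach under the assumption stated above (one set call per subtask): each worker stores its result and then calls decrementAndGet, and the single thread that sees zero reads every result back. CounterDemo, complete, and the use of an AtomicReferenceArray instead of the question's Map of AtomicReference values are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class CounterDemo {
    private final AtomicReferenceArray<String> results;
    private final AtomicInteger remaining;
    final AtomicInteger reports = new AtomicInteger(); // how often the task was "reported"
    volatile boolean sawMissingResult = false;         // set if the reporter saw a null

    CounterDemo(int subtasks) {
        results = new AtomicReferenceArray<>(subtasks);
        remaining = new AtomicInteger(subtasks);
    }

    // Hypothetical equivalent of the question's set(): store first, then count down.
    void complete(int index, String result) {
        results.set(index, result);             // volatile-style write
        if (remaining.decrementAndGet() == 0) { // exactly one thread gets 0
            for (int i = 0; i < results.length(); i++) {
                if (results.get(i) == null) {
                    sawMissingResult = true;    // never expected to happen
                }
            }
            reports.incrementAndGet();
        }
    }

    static CounterDemo run(int subtasks) {
        CounterDemo task = new CounterDemo(subtasks);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < subtasks; i++) {
            final int index = i;
            pool.execute(() -> task.complete(index, "result-" + index));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return task;
    }

    public static void main(String[] args) {
        CounterDemo task = run(1000);
        System.out.println(task.reports.get() + " " + task.sawMissingResult); // prints "1 false"
    }
}
```

Starting from 1000, the decrements produce each value from 999 down to 0 exactly once, so exactly one thread reports, and the happens-before chain through the counter means it never finds a missing result.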

Stephen C
Thank you! Yes, your assumptions are correct. About using an AtomicInteger: I once considered it, but then concluded that it's not a good solution (see the edited question for a detailed explanation). Or maybe I'm missing something?
Enno Shioji
Let me make sure I understand. The authors of "Java Concurrency in Practice" state that: "There is no guarantee that operations in one thread will be performed in the order given by the program, as long as the reordering is not detectable from within that thread, even if the reordering is apparent to other threads." In this case, it seems that the reordering would indeed not be detectable within one thread. I understand that operations on atomic variables are atomic, but the combined operation (set() followed by check()) is not atomic, right?
Enno Shioji
Looks like I didn't understand the Java memory model very well. Thanks a lot!
Enno Shioji
A: 

The second solution does provide a thread-safe latch, but it's vulnerable to calls to set() that provide an ID that's not in the map -- which would trigger a NullPointerException -- or more than one call to set() with the same ID. The latter would mistakenly decrement the counter too many times and falsely report completion when there are presumably other subtasks IDs for which no result has been submitted. My criticism isn't with regard to the thread safety, but rather to the invariant maintenance; the same flaw would be present even without the thread-related concern.
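One way to guard the counter version against both problems, sketched under the assumption that subtask IDs are Long values fixed at construction (the GuardedTask name, String results and exception choices are illustrative, not from the question): reject unknown IDs explicitly, and decrement the counter only when compareAndSet(null, result) succeeds, as the question's "actual app" comment suggests. A duplicate submission then changes nothing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class GuardedTask {
    // Read-only after construction; final fields make it safe to share once published.
    private final Map<Long, AtomicReference<String>> subtasks;
    private final AtomicInteger remaining;

    GuardedTask(Set<Long> ids) {
        Map<Long, AtomicReference<String>> m = new HashMap<>();
        for (Long id : ids) {
            m.put(id, new AtomicReference<>());
        }
        subtasks = Collections.unmodifiableMap(m);
        remaining = new AtomicInteger(ids.size());
    }

    // Returns true for exactly one call: the one that supplies the last missing result.
    boolean set(long id, String result) {
        if (result == null) {
            throw new NullPointerException("result");
        }
        AtomicReference<String> ref = subtasks.get(id);
        if (ref == null) {
            throw new IllegalArgumentException("unknown subtask id: " + id);
        }
        // Only the first result for an ID counts; duplicates leave the counter alone.
        if (!ref.compareAndSet(null, result)) {
            return false;
        }
        return remaining.decrementAndGet() == 0;
    }

    public static void main(String[] args) {
        GuardedTask task = new GuardedTask(Set.of(1L, 2L));
        System.out.println(task.set(1L, "a")); // false
        System.out.println(task.set(1L, "a")); // false (duplicate ignored)
        System.out.println(task.set(2L, "b")); // true
    }
}
```

This keeps the invariant local to the class rather than relying on callers, which is the concern raised above.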

Another way to solve this problem is with AbstractQueuedSynchronizer, but it's somewhat gratuitous: you can implement a stripped-down counting semaphore, where each call to set() would call releaseShared(), decrementing the counter via a spin on compareAndSetState(), and tryAcquireShared() would only succeed when the count is zero. That's more or less what you implemented above with the AtomicInteger, but you'd be reusing a facility that offers more capabilities you can use for other portions of your design.


To flesh out the AbstractQueuedSynchronizer-based solution requires adding one more operation to justify the complexity: being able to wait on the results from all the subtasks to come back, such that the entire task is complete. That's Task#awaitCompletion() and Task#awaitCompletion(long, TimeUnit) in the code below.

Again, it's possibly overkill, but I'll share it for the purpose of discussion.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;


final class Task
{
  private static final class Sync extends AbstractQueuedSynchronizer
  {
    public Sync(int count)
    {
      setState(count);
    }


    @Override
    protected int tryAcquireShared(int ignored)
    {
      return 0 == getState() ? 1 : -1;
    }


    @Override
    protected boolean tryReleaseShared(int ignored)
    {
      int current;
      do
      {
        current = getState();
        if (0 == current)
          return false; // already complete: only one release reports completion
      }
      while (!compareAndSetState(current, current - 1));
      return 1 == current;
    }
  }


  public Task(int count)
  {
    if (count < 0)
      throw new IllegalArgumentException();
    sync_ = new Sync(count);
  }


  public boolean set(int id, Object result)
  {
    // Ensure that "id" refers to an incomplete task. Doing so requires
    // additional synchronization over the structure mapping subtask 
    // identifiers to results.
    // Store result somehow.
    return sync_.releaseShared(1);
  }


  public void awaitCompletion()
    throws InterruptedException
  {
    sync_.acquireSharedInterruptibly(0);
  }


  public boolean awaitCompletion(long time, TimeUnit unit)
    throws InterruptedException
  {
    // Returns false if the timeout elapses before all subtasks report in.
    return sync_.tryAcquireSharedNanos(0, unit.toNanos(time));
  }


  private final Sync sync_;
}
seh
@seh - let's just assume that the rest of the OP's application is capable of maintaining these invariants. He is clearly aware of them, even if they are not explicitly stated; see his comments on my answer.
Stephen C
@seh - yeah, it's a stripped-down version, so the actual app will deal with the problems you raised. As for using AQS, I don't really understand it. It looks pretty complicated.
Enno Shioji
@seh - thanks for sharing! A concurrency expert recommended that I use a backport of Phaser (a new synchronizer in the Java 7 concurrency library), so I might go with that, but now I know how to build my own synchronizer :)
Enno Shioji
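For reference, a minimal sketch of how java.util.concurrent.Phaser (standard since Java 7) can express "report once, when every subtask has arrived": register one party per subtask, call arrive() after each result is stored, and override onAdvance, which runs once per phase in the thread whose arrival completes it. Returning true from onAdvance terminates the phaser. PhaserTask, subtaskDone and the report counter are illustrative assumptions; this uses the JDK class, not the backport mentioned in the comment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserTask {
    final AtomicInteger reports = new AtomicInteger();
    private final Phaser phaser;

    PhaserTask(int subtasks) {
        // One registered party per subtask.
        phaser = new Phaser(subtasks) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Runs once, in the thread whose arrive() completes the phase.
                reports.incrementAndGet(); // "report the task"
                return true;               // terminate: nothing more to wait for
            }
        };
    }

    // Hypothetical hook, called after a subtask's result has been stored.
    void subtaskDone() {
        phaser.arrive();
    }

    static int run(int subtasks) {
        PhaserTask task = new PhaserTask(subtasks);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < subtasks; i++) {
            pool.execute(task::subtaskDone);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return task.reports.get();
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // prints 1
    }
}
```

Like the counter version, this still assumes each subtask arrives only once; an extra arrive() before termination would throw IllegalStateException.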
A: 

I have a weird feeling reading your example program, but what to do about that depends on the larger structure of your program. A set function that also checks for completion is almost a code smell. :-) Just a few ideas.

If you have synchronous communication with your servers, you might use an ExecutorService with the same number of threads as there are servers doing the communication. From this you get a bunch of Futures, and you can naturally proceed with your calculation: the get calls block only at the moment a result is needed but not yet there.
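A minimal sketch of the ExecutorService idea, assuming each server call can be wrapped in a blocking Callable. callServer is a hypothetical stand-in for the real remote call, and the pool has one thread per server as suggested above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {
    // Hypothetical stand-in for a blocking request to compute server `server`.
    static int callServer(int server, int input) {
        return input * input + server;
    }

    static List<Integer> computeAll(int servers) {
        ExecutorService pool = Executors.newFixedThreadPool(servers); // one thread per server
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int s = 0; s < servers; s++) {
                final int server = s;
                Callable<Integer> call = () -> callServer(server, 10);
                futures.add(pool.submit(call));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                try {
                    results.add(f.get()); // blocks until that subtask's result is ready
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return results; // every subtask is done: report the task here, exactly once
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAll(3)); // prints [100, 101, 102]
    }
}
```

Because a single thread collects the Futures, "report once" falls out of the control flow rather than needing a semaphore or counter.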

If you have asynchronous communication with the servers, you might instead use a CountDownLatch after submitting the task to the servers. The await call blocks the main thread until all subtasks are complete, and other threads can receive the results and call countDown for each received result.
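A minimal sketch of the CountDownLatch variant, assuming results arrive on arbitrary receiver threads (simulated here with plain threads). receiveResult and the ConcurrentHashMap used as the result store are illustrative assumptions. The main thread blocks in await() and proceeds once, after the last countDown().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    private final Map<Integer, String> results = new ConcurrentHashMap<>(); // thread-safe store
    private final CountDownLatch remaining;

    LatchDemo(int subtasks) {
        remaining = new CountDownLatch(subtasks);
    }

    // Called by whichever thread receives a subtask result.
    void receiveResult(int subtaskId, String result) {
        results.put(subtaskId, result);
        remaining.countDown();
    }

    static Map<Integer, String> run(int subtasks) {
        LatchDemo task = new LatchDemo(subtasks);
        for (int i = 0; i < subtasks; i++) {
            final int id = i;
            // Simulates a receiver thread handling one asynchronous reply.
            new Thread(() -> task.receiveResult(id, "result-" + id)).start();
        }
        try {
            task.remaining.await(); // blocks until every subtask has counted down
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return task.results; // puts before countDown() happen-before await() returning
    }

    public static void main(String[] args) {
        System.out.println(run(5).size()); // prints 5
    }
}
```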

With either approach, you don't need special thread-safety measures beyond making the concurrent storing of the results in your structure thread-safe. And I bet there are even better patterns for this.

hstoerr
Thanks for the idea! I couldn't understand, though, how to use an ExecutorService here. Could you explain a little bit more?
Enno Shioji
If you, say, communicate with 10 compute servers via a web service, you could use a ThreadPoolExecutor with exactly 10 threads as the ExecutorService, each one communicating with exactly one of the servers. You submit your computation tasks as Callables that make the web service call and return the result. The ExecutorService returns a Future when you submit each Callable, and you can check later for the calculated value, or just call get(): the Future will block automatically until the value is calculated.
hstoerr