I'm working on a service that runs in a Java app server farm and needs to run some periodic jobs (say, once every 2 minutes). The service must interface with external entities, and the different instances have to be synchronized so that only one of them works on the job at any given time. Since the service already uses a DB during this job, I thought of implementing the synchronization with a simple DB table:

id, owner, stamp

where id is the lock's id, owner is the current owner and stamp is the time it was locked.

The methods would be:

tryLock(id, maxAge, owner) - to try to lock a record or break an old record
refresh(id, owner) - to update the stamp to signal we're still around working on the job
release(id, owner) - to release the lock
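
To make the intended semantics concrete, here is a rough in-memory model of the three methods. It is only a sketch under a few assumptions: the class name `InMemoryLockTable` and the explicit `now` parameter (added so the behaviour can be checked without sleeping) are made up for illustration, re-locking by the current owner is assumed to succeed, and `owner` is assumed to be non-null. The real thing would sit on the DB table rather than a map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the lock table; not the DB-backed service. */
class InMemoryLockTable {

    /** One row of the table: the current owner and the time it locked or refreshed. */
    private static final class Row {
        final String owner;
        final long stamp;

        Row(String owner, long stamp) {
            this.owner = owner;
            this.stamp = stamp;
        }
    }

    private final Map<String, Row> rows = new HashMap<>();

    /** Lock "id" for "owner"; a lock held by someone else is broken if older than maxAge ms. */
    synchronized boolean tryLock(String id, long maxAge, String owner, long now) {
        Row row = rows.get(id);
        boolean free = row == null || row.owner == null;
        boolean mine = row != null && owner.equals(row.owner);
        boolean stale = row != null && row.owner != null
                && maxAge >= 0 && row.stamp < now - maxAge;
        if (free || mine || stale) {
            rows.put(id, new Row(owner, now));
            return true;
        }
        return false;
    }

    /** Update the stamp, but only if "owner" still holds the lock. */
    synchronized boolean refresh(String id, String owner, long now) {
        Row row = rows.get(id);
        if (row == null || !owner.equals(row.owner)) {
            return false;
        }
        rows.put(id, new Row(owner, now));
        return true;
    }

    /** Clear the owner, but only if "owner" still holds the lock. */
    synchronized boolean release(String id, String owner) {
        Row row = rows.get(id);
        if (row == null || !owner.equals(row.owner)) {
            return false;
        }
        rows.put(id, new Row(null, 0L));
        return true;
    }
}
```

In this model a second instance can take over only after the holder has stopped refreshing for longer than `maxAge`, and a holder whose lock was broken finds out on its next `refresh`, which returns false.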

How would you implement this?

Edit: removed my implementation, I'll post it as an "answer"

A: 

I came up with the following implementation, but I'm not sure it handles all corner cases (and I'm not entirely sure I'm using bean-managed transactions correctly). Also, if you think this synchronization problem could be handled in a simpler way, please point me in the right direction.

@Service(objectName=Sync.EjbName)
@Management(SyncMgt.class)
@TransactionManagement(value=TransactionManagementType.BEAN)
public class SyncSvc implements SyncMgt {

    @PersistenceContext
    protected EntityManager entityManager_;
    @Resource
    protected UserTransaction utx_;

    // Must be called inside an active UserTransaction (see tryLock/refresh/release).
    private boolean update(SyncRecord sr, String owner) {
        Date stamp = (owner != null) ? new Date() : null;
        Query q;
        if (sr.getOwner() != null) {
            q = entityManager_.createQuery("UPDATE SyncRecord sr SET sr.owner = :newOwner, sr.stamp = :stamp WHERE sr.id = :id AND sr.owner = :origOwner AND sr.stamp = :origStamp");
            q.setParameter("origOwner", sr.getOwner());
            q.setParameter("origStamp", sr.getStamp()); // make it fail if someone refreshed in the meantime
        }
        else {
            q = entityManager_.createQuery("UPDATE SyncRecord sr SET sr.owner = :newOwner, sr.stamp = :stamp WHERE sr.id = :id AND sr.owner IS NULL");
        }
        q.setParameter("id", sr.getId());
        q.setParameter("newOwner", owner);
        q.setParameter("stamp", stamp);
        // exactly one row is updated only if nobody changed the record in the meantime
        return q.executeUpdate() == 1;
    }

    // Must be called inside an active UserTransaction (see tryLock).
    private boolean tryLockImpl(String id, long maxAge, String owner) {
        SyncRecord sr = entityManager_.find(SyncRecord.class, id);
        if (sr == null) {
            // no record yet, create one
            sr = new SyncRecord(id, owner);
            sr.touch();
            entityManager_.persist(sr);
            entityManager_.flush();
            return true;
        }
        // found a SyncRecord, let's see who owns it
        if (owner.equals(sr.getOwner())) {
            // log some warning, re-locking old lock, should use refresh instead
            return update(sr, owner);
        }
        if (sr.getOwner() == null) {
            // sr is not held by anyone, safe to grab it
            return update(sr, owner);
        }
        // someone else holds it, let's check the age
        if (maxAge >= 0) {
            long maxAgeStamp = System.currentTimeMillis() - maxAge;
            if (sr.getStamp().getTime() < maxAgeStamp) {
                // the lock is stale, try to break it
                return update(sr, owner);
            }
        }
        return false;
    }

    // Sync impl: 

    /**
    * Try to lock "id" for "owner"
    * If the lock is held by someone else, but is older than maxAge, break it
    */
    // Bean-managed transaction: begun and committed explicitly through utx_.
    public boolean tryLock(String id, long maxAge, String owner) {
        if (id == null)
            throw new IllegalArgumentException("id is null");
        try {
            utx_.begin();
            if (tryLockImpl(id, maxAge, owner)) {
                utx_.commit();
                return true;
            }
        }
        catch (EntityExistsException e) {
            // failed to lock, someone beat us to it
        }
        catch (Throwable e) {
            // some fishy error, raise alarm, log, etc
        }
        try {
            utx_.rollback();
        }
        catch (Throwable e) {
            // log the error, not much else we can do at this point
        }
        return false;
    }

    /**
    * Refresh lock "id" belonging to "owner" (update its stamp)
    */
    // Bean-managed transaction: begun and committed explicitly through utx_.
    public boolean refresh(String id, String owner) {
        if (id == null)
            throw new IllegalArgumentException("id is null");
        try {
            utx_.begin();
            SyncRecord sr = entityManager_.find(SyncRecord.class, id);
            if (sr == null || !owner.equals(sr.getOwner())) {
                utx_.rollback();
                return false;
            }
            if (update(sr, owner)) {
                utx_.commit();
                return true;
            }
        }
        catch (Throwable e) {
            // some fishy error, raise alarm, log, etc
        }
        try {
            utx_.rollback();
        }
        catch (Throwable e) {
            // log the error, not much else we can do at this point
        }
        return false;
    }

    /**
    * release lock "id" held by "owner"
    */
    // Bean-managed transaction: begun and committed explicitly through utx_.
    public void release(String id, String owner) {
        if (id == null)
            throw new IllegalArgumentException("id is null");
        try {
            utx_.begin();
            SyncRecord sr = entityManager_.find(SyncRecord.class, id);
            if (sr == null || !owner.equals(sr.getOwner())) {
                // we don't own it
                utx_.rollback();
                return;
            }
            if (update(sr, null)) {
                utx_.commit();
                return;
            }
        }
        catch (Throwable e) {
            // some fishy error, raise alarm, log, etc
        }
        try {
            utx_.rollback();
        }
        catch (Throwable e) {
            // log the error, not much else we can do at this point
        }
    }

    // LifeCycle impl:

    public void start() {}
    public void stop() {}

}
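
For completeness, this is roughly how a periodic job might drive the service. It is only a sketch: `LockService` is a hypothetical stand-in for `SyncMgt` (whose definition isn't shown above), and `GuardedJob`, its fields, and the list of steps are made-up names for illustration.

```java
import java.util.List;

/** Hypothetical stand-in for the SyncMgt interface, which is not shown in the post. */
interface LockService {
    boolean tryLock(String id, long maxAge, String owner);
    boolean refresh(String id, String owner);
    void release(String id, String owner);
}

/** Runs a job step by step while holding the lock, refreshing the stamp after each step. */
class GuardedJob {
    private final LockService locks;
    private final String lockId;
    private final String owner;
    private final long maxAgeMillis;

    GuardedJob(LockService locks, String lockId, String owner, long maxAgeMillis) {
        this.locks = locks;
        this.lockId = lockId;
        this.owner = owner;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Returns true if every step ran while this instance held the lock. */
    boolean runOnce(List<Runnable> steps) {
        if (!locks.tryLock(lockId, maxAgeMillis, owner)) {
            return false; // another instance is working on the job
        }
        try {
            for (Runnable step : steps) {
                step.run();
                if (!locks.refresh(lockId, owner)) {
                    return false; // the lock was broken by someone else, stop working
                }
            }
            return true;
        } finally {
            // release() checks ownership itself, so this is harmless if the lock was lost
            locks.release(lockId, owner);
        }
    }
}
```

The `maxAge` passed to `tryLock` should be comfortably longer than the slowest single step, otherwise another instance may break the lock while the holder is still working.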
mitchnull