I came up with the following implementation, but I'm not sure whether it handles all corner cases (and I'm not entirely sure I'm using bean-managed transactions correctly). Also, if you think this synchronization problem could be handled in a simpler way, please point me in the right direction.
/**
 * Database-backed lock service.
 *
 * Each lock is a SyncRecord row identified by its id and carrying an owner and
 * a stamp. Ownership changes are performed with a conditional bulk UPDATE that
 * only matches the row if it is still in the state that was read, so a
 * concurrent change makes the update affect zero rows and the operation fail.
 *
 * Transactions are demarcated explicitly through the injected UserTransaction
 * (bean-managed demarcation). For that reason no @TransactionAttribute
 * annotations are used: they only apply to container-managed demarcation.
 */
@Service(objectName=Sync.EjbName)
@Management(SyncMgt.class)
@TransactionManagement(value=TransactionManagementType.BEAN)
public class SyncSvc implements SyncMgt {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(SyncSvc.class.getName());

    @PersistenceContext
    protected EntityManager entityManager_;

    @Resource
    protected UserTransaction utx_;

    /**
     * Conditionally hands the record over to "owner" (or frees it when
     * "owner" is null). Called between utx_.begin() and utx_.commit() by the
     * public methods of this class.
     *
     * @param sr    the record as it was read; its owner and stamp are used as
     *              the expected current state in the WHERE clause
     * @param owner the new owner, or null to release the record
     * @return true if exactly one row was updated, false otherwise
     */
    private boolean update(SyncRecord sr, String owner) {
        // Releasing (owner == null) clears the stamp together with the owner.
        Date stamp = (owner != null) ? new Date() : null;
        Query q;
        if (sr.getOwner() != null) {
            q = entityManager_.createQuery("UPDATE SyncRecord sr SET sr.owner = :newOwner, sr.stamp = :stamp WHERE sr.id = :id AND sr.owner = :origOwner AND sr.stamp = :origStamp");
            q.setParameter("origOwner", sr.getOwner());
            // make it fail if someone refreshed in the meantime
            q.setParameter("origStamp", sr.getStamp());
        }
        else {
            q = entityManager_.createQuery("UPDATE SyncRecord sr SET sr.owner = :newOwner, sr.stamp = :stamp WHERE sr.id = :id AND sr.owner IS NULL");
        }
        q.setParameter("id", sr.getId());
        q.setParameter("newOwner", owner);
        q.setParameter("stamp", stamp);
        return q.executeUpdate() == 1;
    }

    /**
     * Lock acquisition logic; called by tryLock between utx_.begin() and
     * utx_.commit().
     *
     * @param id     lock id, not null
     * @param maxAge maximum age in milliseconds of a foreign lock before it may
     *               be broken; a negative value never breaks foreign locks
     * @param owner  requesting owner, not null
     * @return true if the lock now belongs to "owner"
     */
    private boolean tryLockImpl(String id, long maxAge, String owner) {
        SyncRecord sr = entityManager_.find(SyncRecord.class, id);
        if (sr == null) {
            // No record yet, create one. The flush forces the INSERT now so
            // that a concurrent creator surfaces as an exception here.
            sr = new SyncRecord(id, owner);
            sr.touch();
            entityManager_.persist(sr);
            entityManager_.flush();
            return true;
        }
        String currentOwner = sr.getOwner();
        // Free record, or re-locking our own lock (callers should prefer
        // refresh() for the latter): safe to grab it.
        if (currentOwner == null || owner.equals(currentOwner)) {
            return update(sr, owner);
        }
        // Someone else holds it; break the lock only if it is older than maxAge.
        if (maxAge >= 0) {
            long maxAgeStamp = System.currentTimeMillis() - maxAge;
            if (sr.getStamp().getTime() < maxAgeStamp) {
                return update(sr, owner);
            }
        }
        return false;
    }

    /**
     * Rejects null arguments with the same exception type and message format
     * for every public entry point.
     */
    private static void requireArgument(Object value, String name) {
        if (value == null) {
            throw new IllegalArgumentException(name + " is null");
        }
    }

    /**
     * Rolls back the current transaction without propagating failures.
     * A failure is expected when no transaction is active any more, e.g.
     * after begin() or commit() itself failed.
     */
    private void rollbackQuietly() {
        try {
            utx_.rollback();
        }
        catch (Exception e) {
            // Not much else we can do at this point.
            LOG.log(java.util.logging.Level.WARNING, "rollback failed", e);
        }
    }

    // Sync impl:

    /**
     * Try to lock "id" for "owner".
     * If the lock is held by someone else, but is older than maxAge, break it.
     *
     * @param id     lock id
     * @param maxAge maximum age in milliseconds of a foreign lock before it may
     *               be broken; a negative value never breaks foreign locks
     * @param owner  requesting owner
     * @return true if the lock was acquired and committed, false otherwise
     * @throws IllegalArgumentException if id or owner is null
     */
    public boolean tryLock(String id, long maxAge, String owner) {
        requireArgument(id, "id");
        requireArgument(owner, "owner");
        boolean committed = false;
        try {
            utx_.begin();
            if (tryLockImpl(id, maxAge, owner)) {
                utx_.commit();
                committed = true;
            }
        }
        catch (EntityExistsException e) {
            // failed to lock, someone beat us to creating the record
            LOG.log(java.util.logging.Level.FINE, "lock " + id + " was created concurrently", e);
        }
        catch (Exception e) {
            LOG.log(java.util.logging.Level.WARNING, "tryLock failed for " + id, e);
        }
        finally {
            // Single rollback point for "not acquired" and for every failure.
            if (!committed) {
                rollbackQuietly();
            }
        }
        return committed;
    }

    /**
     * Refresh lock "id" belonging to "owner" (update its stamp).
     *
     * @param id    lock id
     * @param owner owner expected to hold the lock
     * @return true if the lock was held by "owner" and its stamp was updated
     *         and committed, false otherwise
     * @throws IllegalArgumentException if id or owner is null
     */
    public boolean refresh(String id, String owner) {
        requireArgument(id, "id");
        requireArgument(owner, "owner");
        boolean committed = false;
        try {
            utx_.begin();
            SyncRecord sr = entityManager_.find(SyncRecord.class, id);
            // Only the current owner may refresh; otherwise fall through to rollback.
            if (sr != null && owner.equals(sr.getOwner()) && update(sr, owner)) {
                utx_.commit();
                committed = true;
            }
        }
        catch (Exception e) {
            LOG.log(java.util.logging.Level.WARNING, "refresh failed for " + id, e);
        }
        finally {
            if (!committed) {
                rollbackQuietly();
            }
        }
        return committed;
    }

    /**
     * Release lock "id" held by "owner". Does nothing if the lock does not
     * exist or is not held by "owner".
     *
     * @param id    lock id
     * @param owner owner expected to hold the lock
     * @throws IllegalArgumentException if id or owner is null
     */
    public void release(String id, String owner) {
        requireArgument(id, "id");
        requireArgument(owner, "owner");
        boolean committed = false;
        try {
            utx_.begin();
            SyncRecord sr = entityManager_.find(SyncRecord.class, id);
            // We can only release what we own; otherwise fall through to rollback.
            if (sr != null && owner.equals(sr.getOwner()) && update(sr, null)) {
                utx_.commit();
                committed = true;
            }
        }
        catch (Exception e) {
            LOG.log(java.util.logging.Level.WARNING, "release failed for " + id, e);
        }
        finally {
            if (!committed) {
                rollbackQuietly();
            }
        }
    }

    // LifeCycle impl:

    public void start() {}

    public void stop() {}
}