views:

90

answers:

4

[EDIT] This question is "how do I do atomic changes to entity beans with EJB 3 and JPA 2.0". Should be simple, right?

I tried to fix my code based on the answers I got so far. I'm using JBoss 6.0.0M2 with Hypersonic (just download it and call run.bat).

My test case: Create 3 threads and call one of the testCounterMitLock*() 500 times in a loop. So a successful test should print "Anzahl eingetragene Zeilen: 1500" (3*500).

I tried:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1);
        manager.lock(ct, LockModeType.WRITE);
        int wert = ct.getWert();

Obviously doesn't work because a different thread can change the value in the database before the lock is applied. So I try to fix that:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1);
        manager.lock(ct, LockModeType.WRITE);
        manager.refresh (ct);
        int wert = ct.getWert();

The refresh() should give me the current value and the implicit query should also make sure the object gets locked now. No such luck. Let's try with JPA 2.0:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.WRITE);
        int wert = ct.getWert();

That also doesn't work. Maybe the lock isn't enough?

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
        int wert = ct.getWert();

Uhm ... doesn't work either! One last desperate attempt:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
        manager.flush();
        manager.refresh (ct);
        int wert = ct.getWert();

Okay ... can anyone explain why nothing works? I'm out of ideas.

[EDIT2] PS: To add insult to injury, this is the last output of the last running thread:

commit/rollback: 441/62

(441+62 = 503)...

Here is the complete code. First the bean:

package server.kap15;

import java.rmi.RemoteException;

import javax.ejb.*;
import javax.persistence.*;

@Stateful
public class CounterTestBean implements CounterTestRemote, SessionSynchronization {
    @PersistenceContext(unitName = "JavaEE")
    EntityManager manager;

    private int commit = 0;

    private int rollback = 0;

    public void initDatenbank() {
        manager.createNamedQuery("CounterTest.deleteAll").executeUpdate();
        manager.createNamedQuery("TestTabelle.deleteAll").executeUpdate();
        CounterTestVersion ct = new CounterTestVersion();
        ct.setNr(1);
        ct.setVersion(1);
        ct.setWert(1);
        manager.persist(ct);
    }

    public boolean testCounterOhneLock() {
        try {
            CounterTest ct = manager.find(CounterTest.class, 1);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1);
            manager.lock(ct, LockModeType.WRITE);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock2() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1);
            manager.lock(ct, LockModeType.WRITE);
            manager.refresh (ct);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock3() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.WRITE);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock4() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock5() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
            manager.flush();
            manager.refresh (ct);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitVersion() {
        try {
            CounterTestVersion ctv = manager.find(CounterTestVersion.class, 1);
            int wert = ctv.getWert();
            ctv.setWert(wert + 1);
            manager.flush();
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (OptimisticLockException e) {
            System.out.println(">>> Versionskonflikt !");
            return false;
        } catch (Throwable t) {
            System.out.println(t.getMessage());
            return false;
        }
    }

    public long anzTestZeilen() {
        Query query = manager.createNamedQuery("TestTabelle.anzZeilen");
        Long anzahl = (Long) query.getSingleResult();
        return anzahl;
    }

    public void afterBegin() throws EJBException, RemoteException {
    }

    public void beforeCompletion() throws EJBException, RemoteException {
    }

    public void afterCompletion(boolean committed) throws EJBException,
    RemoteException {
        if (committed)
            commit++;
        else
            rollback++;
        System.out.println("commit/rollback: " + commit + "/" + rollback);
    }
}

The remote interface:

package server.kap15;

import javax.ejb.Remote;

@Remote
public interface CounterTestRemote {
    public void initDatenbank();

    public boolean testCounterOhneLock();

    public boolean testCounterMitLock();
    public boolean testCounterMitLock2();
    public boolean testCounterMitLock3();
    public boolean testCounterMitLock4();
    public boolean testCounterMitLock5();

    public boolean testCounterMitVersion();

    public long anzTestZeilen();
}

The persistence.xml:

<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/persistence
        http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd"
    version="1.0">
    <persistence-unit name="JavaEE">
        <jta-data-source>java:DefaultDS</jta-data-source>
    </persistence-unit>
</persistence>

The test client:

package client.kap15;

import java.util.Properties;
import javax.naming.*;
import javax.rmi.PortableRemoteObject;
import server.kap15.CounterTestRemote;

public class CounterTestMitLock extends Thread {
    CounterTestRemote ctr;

    public static void main(String[] args) {
        try
        {
            testMitLock();
            testMitLock2();
            testMitLock3();
            testMitLock4();
            testMitLock5();
        }
        catch (Exception e)
        {
            e.printStackTrace ();
        }
    }

    static int N = 3;
    static CounterThread[] ct = new CounterThread[N];

    private static void testMitLock () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock();

        runTest ();
    }

    private static void testMitLock2 () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock2 ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock2();

        runTest ();
    }

    private static void testMitLock3 () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock3 ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock3();

        runTest ();
    }

    private static void testMitLock4 () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock4 ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock4();

        runTest ();
    }

    private static void testMitLock5 () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock5 ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock5();

        runTest ();
    }

    private static void runTest () throws InterruptedException
    {
        System.out.println("Datenbank initialisieren...");
        ct[0].ctr.initDatenbank();

        System.out.println("Test durchführen...");
        for (int i=0; i<N; i++)
            ct[i].start();

        System.out.println("Auf Ende warten...");
        for (int i=0; i<N; i++)
            ct[i].join();

        System.out.println("Anzahl eingetragene Zeilen: " + ct[0].ctr.anzTestZeilen());
    }

    private static CounterTestRemote verbinden() {
        try {
            Properties p = new Properties();
            p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
            p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
            p.put(Context.PROVIDER_URL, "jnp://localhost:1099");
            Context ctx = new InitialContext(p);

            Object ref = ctx.lookup("CounterTestBean/remote");
            CounterTestRemote ctr = (CounterTestRemote) PortableRemoteObject.narrow(ref, CounterTestRemote.class);

            return ctr;
        } catch (NamingException e) {
            System.out.println("ERROR - NamingException!");
            System.exit(-1);
        }
        return null;
    }

    public abstract static class CounterThread extends Thread
    {
        protected CounterTestRemote ctr;

        public CounterThread ()
        {
            this.ctr = verbinden ();
        }

        public void run() {
            for (int i = 0; i < 500; i++)
                test ();
        }

        public abstract void test ();
    }

    public static class CounterThreadMitLock extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock();
        }

    }

    public static class CounterThreadMitLock2 extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock2();
        }

    }

    public static class CounterThreadMitLock3 extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock3();
        }

    }

    public static class CounterThreadMitLock4 extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock4();
        }

    }

    public static class CounterThreadMitLock5 extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock5();
        }

    }
}
A: 

You don't show what you do with the return value from testCounterWithLock. My guess is that you are getting Optimistic locking failures and the return value is false sometimes.

Optimstic locking is a reasonable model when clashes are likely to be infrequent in practice, and the caller can reasonbly redo the work. So if you are getting optimistic failures you could just retry.

Alternatively, use a pessimistic locking model, this locks the row in the database at the point you read. You can do that my adding a LockMode of Pessimistic to your call to find(). Use of pessimistic locking needs to be done with care, all too easy to get bad concurrency and/or deadlocks.

djna
Can you give an example how to "adding a LockMode of Pessimistic to your call to find()"?
Aaron Digulla
I couldn't find a version of `find()` which supports a `LockMode`. This is JPA 2.0, right?
Aaron Digulla
I tried with JPA 2.0 but it doesn't work. Can you have a look at my edits of the question, please?
Aaron Digulla
+1  A: 
Pascal Thivent
It is a book example but the book doesn't contain a solution. It just gives three examples how you shouldn't do it. So my question is: Can you post a correct version that will always work? The performance should be as good as possible of course.
Aaron Digulla
Re your update: I really doubt that this will work. First of all, a read lock doesn't prevent other threads to update the row. Secondly, another thread can update the row between the `find()` and the `getCounter()`.
Aaron Digulla
I think you must use LockModeType.WRITE but maybe you can use `em.refresh()` after the `lock()` to make sure the entity isn't stale?
Aaron Digulla
@Aaron You're right about the `LockModeType.READ`, I went too fast.
Pascal Thivent
I think the only way to have it correct is to acquire an exclusive write lock when you read the entity, which JPA 1.0 does not permits. The problem can mabye be mitigated with `lock` followed by `flush` or `refresh` to the force the acquisition of a lock as quickly as possible, but it still doesn't prevent the issue altogether.
ewernli
See my edits of my quesion: It doesn't work when I lock in `find()`...
Aaron Digulla
The source of the problem is that the default database doesn't support locking. See my answer :-)
Aaron Digulla
@Aaron That's why I wanted to see the generated SQL (to execute it against the DB) :) Good to know, thanks for the feedback.
Pascal Thivent
+1  A: 

Even with LockModeType.READ or LockModeType.WRITE, JPA 1.0 does support only optimistic locking. The lock acquisitions can still be deferred to commit time, hence the problem you experienced.

From JPA 2.0 concurrency and locking:

PA 1.0 only supported Optimistic read or Optimistic write locking. JPA 2.0 supports Optimistic and Pessimistic locking

Other resources: EJB3 performance and Pessimist Locking with JPA

To have real pessimistic locking with JPA 1.0, you will need to rely on the database or implementation specific extension. E.g.:

JPA 2.0 (Something similar is possible with Hibernate API)

Account acc = em.find( Account.class, id, PESSIMISTIC );

JPA 1.0

Query query = em.createNativeQuery("SELECT * ... FOR UPDATE"); // works with most db
Account acc = (Account) query.getSingleResult();

At least, that's what I've finally use, because lock didn't worked as expected.

( Note: You could also implement a retry logic when an optimistic exception happens. But it's complicated because the transactions are managed by the app. server. You would need to use @TRANSACTION_NEW to pause the current transaction and start a new one, etc. ... too complicated I think! )

ewernli
Re stateful: In the example, they counted the failures in the catch clause. counter field -> stateful.
Aaron Digulla
Doesn't work; see the edits of my question.
Aaron Digulla
Nitpick: PESSIMISTIC is not a valid `LockModeType` http://java.sun.com/javaee/6/docs/api/javax/persistence/LockModeType.html
Pascal Thivent
@Aaron Strange. The only one you haven't tested is PESSIMISTIC_FORCE_INCREMENT. Who knows, maybe that's the right one :)
ewernli
That's interesting. It doesn't work but with PESSIMISTIC_FORCE_INCREMENT, all but the first transaction fail in `manager.flush()`!
Aaron Digulla
@Aaron Really strange. To me `testCounterMitLock4` sounds like the correct one. A few more idea to troubleshoot: (1) try with native query `SELECT * FOR UPDATE` to make sure there isn't something wrong somewhere else (2) enable show SQL and see if there is something of interest (3) if you use MySQL make sure you use InnoDB tables (4) if you use Hibernate implementation, give a try to `query.setLockMode ( ... , LockMode.UPGRADE);`
ewernli
Correction: That might have been a glitch. But now I get 18/485 as output (i.e. 503 transactions instead of 500).
Aaron Digulla
Concurrent access to SFSB are normally not allowed. How come that you don't get a `ConcurrentAccessException` http://java.sun.com/javaee/6/docs/api/javax/ejb/ConcurrentAccessException.html ? Is the count correct in database?
ewernli
The source of the problem is that the default database doesn't support locking. See my answer :-)
Aaron Digulla
+1  A: 

Since none of the locking modes worked, I tried ewernli's solution with a manual SELECT ... FOR UPDATE. That gave an interesting exception: "Unexpected token FOR". So I looked at the database.

JBoss is installed with Hypersonic 1.8 (HSQLDB) as the default which doesn't support row locking. Dear JBoss developers: A JPA implementation is supposed to throw an exception when a locking mode isn't supported.

So I added an Oracle datasource and changed my persistence.xml. After that, two tests work:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
        int wert = ct.getWert();

and

    Query query = manager.createNativeQuery ("select * from COUNTER_TEST where NR = 1 for update", CounterTestVersion.class);
    CounterTestVersion ct = (CounterTestVersion)query.getSingleResult ();
    int wert = ct.getWert ()+1;

which is interesting. It should work with LockModeType.PESSIMISTIC_FORCE_INCREMENT, too. In this case, I see this error in the log:

ORA-00054: resource busy and acquire with NOWAIT specified

This happens in the call manager.find(). I can't see why the two behave differently in the load-phase. Maybe a bug in JBoss or Hibernate.

Aaron Digulla
Ah ok! Glad to get a confirmation that `select for update` and `pessimistic_write` work in the general case.
ewernli
+1 by the way. I hope you reported the odd behavior with HSQLDB as a bug :)
Pascal Thivent