views:

99

answers:

2

Hi,

I am new to the Java / Hibernate / Seam way of development but I appear to have a strange issue with Hibernate and concurrent threads.

I have a application scoped Seam component which is executed via EJB timers at a set interval (Orchestrator.java) calling the method startProcessingWorkloads.

This method has a injected EntityManager which it uses to check the database for a collection of data, and if it finds a work collection it creates a new Asynchronous Seam component (LoadContoller.java) and executes the start() method on the Controller

LoadController has EntityManager injected and use it to perform a very large transaction (About one hour)

Once the LoadController is running as a separate thread, the Orchestrator is still being executed as a thread at a set interval, so for example

1min
Orchestrator - Looks for work collection (None found) (thread 1)


2min
Orchestrator - Looks for work collection (finds one, Starts LoadController) (thread 1)
LoadController - Starts updating database records (thread 2)


3min
Orchestrator - Looks for work collection (None found) (thread 1)
LoadController - Still updating database records (thread 2)

4min
Orchestrator - Looks for work collection (None found) (thread 1)
LoadController - Still updating database records (thread 2)


5min
Orchestrator - Looks for work collection (None found) (thread 1)
LoadController - Done updating database records (thread 2)


6min
Orchestrator - Looks for work collection (None found) (thread 1)
7min
Orchestrator - Looks for work collection (None found) (thread 1)

However, I am receiving a intermittent error (See below) when the Orchestrator runs concurrently with the LoadController.

5:10:40,852 WARN [AbstractBatcher] exception clearing maxRows/queryTimeout java.sql.SQLException: Connection is not associated with a managed connection.org.jboss.resource.adapter.jdbc.jdk6.WrappedConnectionJDK6@1fcdb21

This error is thrown after the Orchestrator has completed its SQl query and as the LoadController attempts to execute its next SQl query.

I did some research I came to the conclusion that the EntityManager was being closed hence the LoadController was unable to use it.

Now confused as to what exactly closed the connection I did some basic object dumps of the entity manager objects used by the Orchestrator and the LoadController when each of the components are called and I found that just before I receive the above error this happens.

2010-07-30 15:06:40,804 INFO [processManagement.LoadController] (pool-15-thread-2) org.jboss.seam.persistence.EntityManagerProxy@7e3da1

2010-07-30 15:10:40,758 INFO [processManagement.Orchestrator] (pool-15-thread-1) org.jboss.seam.persistence.EntityManagerProxy@7e3da1

It appears that during one of the Orchestrator execution intervals it obtains a reference to the same EntityManager that the LoadController is currently using. When the Orchestrator completes its SQL execution it closes the connection and than LoadController can no longer execute its updates.

So my question is, does any one know of this happening or having I got my threading all mucked up in this code?

From my understanding when injecting a EntityManager a new instance is injected from the EntityManagerFactory which remains with that particualr object until object leaves scope (in this case they are stateless so when the start() methods ends), how could the same instance of a entity manager be injected into two separate threads?

Orchestrator.java

@Name("processOrchestrator")
@Scope(ScopeType.APPLICATION)
@AutoCreate 
public class Orchestrator {

  //___________________________________________________________

  @Logger Log log;

  @In EntityManager entityManager;

  @In LoadController loadController;

  @In WorkloadManager workloadManager;

  //___________________________________________________________

  private int fProcessInstanceCount = 0;

  //___________________________________________________________

  public Orchestrator() {}

  //___________________________________________________________

  synchronized private void incrementProcessInstanceCount() {
    fProcessInstanceCount++;
  }

  //___________________________________________________________

  synchronized private void decreaseProcessInstanceCount() {
    fProcessInstanceCount--;
  }

  //___________________________________________________________

  @Observer("controllerExceptionEvent") 
  synchronized public void controllerExceptionListiner(Process aProcess, Exception aException) {
    decreaseProcessInstanceCount();

    log.info(
      "Controller " + String.valueOf(aProcess) + 
      " failed with the error [" + aException.getMessage() + "]"
    );

    Events.instance().raiseEvent(
      Application.ApplicationEvent.applicationExceptionEvent.name(), 
      aException,
      Orchestrator.class
    );
  }

  //___________________________________________________________

  @Observer("controllerCompleteEvent") 
  synchronized public void successfulControllerCompleteListiner(Process aProcess, long aWorkloadId) {
    try {
      MisWorkload completedWorklaod = entityManager.find(MisWorkload.class, aWorkloadId);
      workloadManager.completeWorkload(completedWorklaod);
    } catch (Exception ex) {
      log.error(ex.getMessage(), ex);
    }

    decreaseProcessInstanceCount();

    log.info("Controller " + String.valueOf(aProcess) + " completed successfuly");
  }

  //___________________________________________________________

  @Asynchronous
  public void startProcessingWorkloads(@IntervalDuration long interval) {
    log.info("Polling for workloads.");
    log.info(entityManager.toString());
    try {
      MisWorkload pendingWorkload = workloadManager.getNextPendingWorkload();

      if (pendingWorkload != null) {
        log.info(
          "Pending Workload found (Workload_Id = " + 
          String.valueOf(pendingWorkload.getWorkloadId()) + 
          "), starting process controller."
        );

        Process aProcess = pendingWorkload.retriveProcessIdAsProcess();

        ControllerIntf controller = createWorkloadController(aProcess);          

        if (controller != null) {
          controller.start(aProcess, pendingWorkload.getWorkloadId());
          workloadManager.setWorkloadProcessing(pendingWorkload);
        }
      }

    } catch (Exception ex) {
      Events.instance().raiseEvent(
        Application.ApplicationEvent.applicationExceptionEvent.name(), 
        ex,
        Orchestrator.class
      );
    }

    log.info("Polling complete.");
  }

  //___________________________________________________________  

  private ControllerIntf createWorkloadController(Process aProcess) {
    ControllerIntf newController = null;

    switch(aProcess) {
      case LOAD:
        newController = loadController;
        break;

      default:
        log.info(
          "createWorkloadController() does not know the value (" +
          aProcess.name() + 
          ") no controller will be started."
        );
    }

    // If a new controller is created than increase the 
    // count of started controllers so that we know how
    // many are running.
    if (newController != null) {
      incrementProcessInstanceCount();
    }

    return newController;
  }

  //___________________________________________________________

}

LoadController.java

@Name("loadController")
@Scope(ScopeType.STATELESS)
@AutoCreate
public class LoadController implements ControllerIntf {
  //__________________________________________________

  @Logger private Log log;

  @In private EntityManager entityManager; 

  //__________________________________________________

  private String fFileName = "";
  private String fNMDSFileName = "";
  private String fAddtFileName = "";

  //__________________________________________________

  public LoadController(){  }
  //__________________________________________________

  @Asynchronous 
  synchronized public void start(Process aProcess, long aWorkloadId) {
    log.info(
      LoadController.class.getName() + 
      " process thread was started for WorkloadId [" + 
      String.valueOf(aWorkloadId) + "]."
    );
    log.info(entityManager.toString());
    try {
      Query aQuery = entityManager.createQuery(
        "from MisLoad MIS_Load where Workload_Id = " + String.valueOf(aWorkloadId)
      );

      MisLoad misLoadRecord = (MisLoad)aQuery.getSingleResult();

      fFileName = 
        misLoadRecord.getInitiatedBy().toUpperCase() + "_" +
        misLoadRecord.getMdSourceSystem().getMdState().getShortName() + "_" +
        DateUtils.now(DateUtils.FORMAT_FILE) + ".csv"
      ;

      fNMDSFileName = "NMDS_" + fFileName;
      fAddtFileName = "Addt_" + fFileName;

      createDataFile(misLoadRecord.getFileContents());

      ArrayList<String>sasCode = generateSASCode(
        misLoadRecord.getLoadId(),
        misLoadRecord.getMdSourceSystem().getPreloadFile()
      );

      //TODO: As the sas password will be encrypted in the database, we will
      //      need to decrypt it before passing to the below function
      executeLoadSASCode(
        sasCode, 
        misLoadRecord.getInitiatedBy(), 
        misLoadRecord.getSasPassword()
      );

      createWorkloadContentRecords(aWorkloadId, misLoadRecord.getLoadId());

      //TODO: Needs to remove password from DB when complete
      removeTempCSVFiles();

      Events.instance().raiseEvent(
        Application.ApplicationEvent.controllerCompleteEvent.name(), 
        aProcess, 
        aWorkloadId
      );

      log.info(LoadController.class.getName() + " process thread completed.");
    } catch (Exception ex) {
      Events.instance().raiseEvent(
        Application.ApplicationEvent.controllerExceptionEvent.name(),
        aProcess, 
        ex
      );
    }
  }
  //__________________________________________________

  private void createDataFile(byte[] aFileContent) throws Exception {
    File dataFile = 
      new File(ECEConfig.getConfiguration().sas_tempFileDir() + "\\" + fFileName);

    FileUtils.writeBytesToFile(dataFile, aFileContent, true);
  }

  //__________________________________________________

  private ArrayList<String> generateSASCode(long aLoadId, String aSourceSystemPreloadSasFile) {
    String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
    ArrayList<String> sasCode = new ArrayList<String>();

    sasCode.add("%let sOracleUserId = " + ECEConfig.getConfiguration().oracle_username() + ";");
    sasCode.add("%let sOraclePassword = " + ECEConfig.getConfiguration().oracle_password() + ";");
    sasCode.add("%let sOracleSID = " + ECEConfig.getConfiguration().oracle_sid() + ";");
    sasCode.add("%let sSchema = " + ECEConfig.getConfiguration().oracle_username() + ";");
    sasCode.add("%let sECESASSourceDir = " + ECEConfig.getConfiguration().sas_sourceDir() + ";");    
    sasCode.add("libname lOracle ORACLE user=&sOracleUserId pw=&sOraclePassword path=&sOracleSID schema=&sSchema;");

    sasCode.add("%let sCommaDelimiter = %str(" + ECEConfig.getConfiguration().dataload_csvRawDataFileDelimiter() + ");");
    sasCode.add("%let sPipeDelimiter = %nrquote(" + ECEConfig.getConfiguration().dataload_csvNMDSDataFileDelimiter() + ");");
    sasCode.add("%let sDataFileLocation = " + sasTempDir + "\\" + fFileName + ";");
    sasCode.add("%let sNMDSOutputDataFileLoc = " + sasTempDir + "\\" + fNMDSFileName + ";");
    sasCode.add("%let sAddtOutputDataFileLoc = " + sasTempDir + "\\" + fAddtFileName + ";");
    sasCode.add("%let iLoadId = " + String.valueOf(aLoadId) + ";");

    sasCode.add("%include \"&sECESASSourceDir\\ECE_UtilMacros.sas\";");
    sasCode.add("%include \"&sECESASSourceDir\\" + aSourceSystemPreloadSasFile + "\";");
    sasCode.add("%include \"&sECESASSourceDir\\ECE_NMDSLoad.sas\";");
    sasCode.add("%preload(&sDataFileLocation, &sCommaDelimiter, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter);");
    sasCode.add("%loadNMDS(lOracle, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter, &iLoadId);");

    return sasCode;
  }

  //__________________________________________________

  private void executeLoadSASCode(
    ArrayList<String> aSasCode, String aUserName, String aPassword) throws Exception 
  {
    SASExecutor aSASExecutor = new SASExecutor(
      ECEConfig.getConfiguration().sas_server(),
      ECEConfig.getConfiguration().sas_port(),
      aUserName, 
      aPassword
    );

    aSASExecutor.execute(aSasCode);

    log.info(aSASExecutor.getCompleteSasLog());
  }
  //__________________________________________________

  /**
   * Creates the MIS_UR_Workload_Contents records for 
   * the ECE Unit Record data that was just loaded
   * 
   * @param aWorkloadId
   * @param aMisLoadId
   * @throws Exception
   */

  private void createWorkloadContentRecords(long aWorkloadId, long aMisLoadId) throws Exception {

    String selectionRule = 
      " from EceUnitRecord ECE_Unit_Record where ECE_Unit_Record.loadId = " + 
      String.valueOf(aMisLoadId)
    ;
    MisWorkload misWorkload = entityManager.find(MisWorkload.class, aWorkloadId);
    SeamManualTransaction manualTx = new SeamManualTransaction(
      entityManager, 
      ECEConfig.getConfiguration().manualSeamTxTimeLimit()
    );
    manualTx.begin();
    RecordPager oPager = new RecordPager(
      entityManager, 
      selectionRule, 
      ECEConfig.getConfiguration().recordPagerDefaultPageSize()
    );

    Object nextRecord = null;

    while ((nextRecord = oPager.getNextRecord()) != null) {
      EceUnitRecord aEceUnitRecord = (EceUnitRecord)nextRecord;

      MisUrWorkloadContents aContentsRecord = new MisUrWorkloadContents();

      aContentsRecord.setEceUnitRecordId(aEceUnitRecord.getEceUnitRecordId());
      aContentsRecord.setMisWorkload(misWorkload);
      aContentsRecord.setProcessOutcome('C');

      entityManager.persist(aContentsRecord);
    }

    manualTx.commit();
  }

  /**
   * Removes the CSV temp files that are created for input 
   * into the SAS server and that are created as output.  
   */

  private void removeTempCSVFiles() {
    String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
    File dataInputCSV = new File(sasTempDir + "\\" + fFileName);
    File nmdsOutputCSV = new File(sasTempDir + "\\" + fNMDSFileName);
    File addtOutputCSV = new File(sasTempDir + "\\" + fAddtFileName);

    if (dataInputCSV.exists()) {
      dataInputCSV.delete();
    }
    if (nmdsOutputCSV.exists()) {
      nmdsOutputCSV.delete();
    }

    if (addtOutputCSV.exists()) {
      addtOutputCSV.delete();
    }
  }
}

SeamManualTransaction.java

public class SeamManualTransaction {

  //___________________________________________________________

  private boolean fObjectUsed = false;
  private boolean fJoinExistingTransaction = true;
  private int fTransactionTimeout = 60; // Default: 60 seconds
  private UserTransaction fUserTx;
  private EntityManager fEntityManager;

  //___________________________________________________________

  /**
   * Set the transaction timeout in milliseconds (from minutes)
   * 
   * @param aTimeoutInMins The number of minutes to keep the transaction active
   */

  private void setTransactionTimeout(int aTimeoutInSecs) {
    // 60 * aTimeoutInSecs = Timeout in Seconds
    fTransactionTimeout = 60 * aTimeoutInSecs;
  }

  //___________________________________________________________

  /**
   * Constructor 
   * 
   * @param aEntityManager
   */

  public SeamManualTransaction(EntityManager aEntityManager) {
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Constructor
   * 
   * @param aEntityManager
   * @param aTimeoutInSecs
   */

  public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs) {
    setTransactionTimeout(aTimeoutInSecs);
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Constructor
   * 
   * @param aEntityManager
   * @param aTimeoutInSecs
   * @param aJoinExistingTransaction
   */
  public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs, boolean aJoinExistingTransaction) {
    setTransactionTimeout(aTimeoutInSecs);
    fJoinExistingTransaction = aJoinExistingTransaction;
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Starts the new transaction
   * 
   * @throws Exception
   */

  public void begin() throws Exception {
    if (fObjectUsed) {
      throw new Exception(
        SeamManualTransaction.class.getCanonicalName() + 
        " has been used. Create new instance."
      );
    }

    fUserTx = 
      (UserTransaction) org.jboss.seam.Component.getInstance("org.jboss.seam.transaction.transaction"); 

    fUserTx.setTransactionTimeout(fTransactionTimeout);
    fUserTx.begin(); 

    /* If entity manager is created before the transaction 
     * is started (ie. via Injection) then it must join the 
     * transaction 
     */ 
    if (fJoinExistingTransaction) {
      fEntityManager.joinTransaction();
    }
  }

  //___________________________________________________________

  /**
   * Commit the transaction to the database
   * 
   * @throws Exception
   */

  public void commit() throws Exception {
    fObjectUsed = true;
    fUserTx.commit();
  }

//_________________________________

/** * Rolls the transaction back * * @throws Exception */

public void rollback() throws Exception { fObjectUsed = true; fUserTx.rollback(); }

//_________________________________ }

A: 

Well, my first is advice is

If you are using an EJB application, prefer To use a Bean Managed Transaction instead of your custom SeamManualTransaction. When you use a Bean Managed Transaction, you, as a developer, Take care of calling begin and commit. You get this feature by using an UserTransaction component. You can create a Facade layer which begins and commit your Transaction. Something like

/**
  * default scope when using @Stateless session bean is ScopeType.STATELESS
  *
  * So you do not need to declare @Scope(ScopeType.STATELESS) anymore
  *
  * A session bean can not use both BEAN and CONTAINER Transaction management at The same Time
  */
@Stateless
@Name("businessFacade")
@TransactionManagement(TransactionManagerType.BEAN)
public class BusinessFacade implements BusinessFacadeLocal {

    private @Resource TimerService timerService;
    private @Resource UserTransaction userTransaction;
    /**
      * You can use @In of you are using Seam capabilities
      */
    private @PersistenceContext entityManager;

    public void doSomething() {
        try {
            userTransaction.begin();
            userTransaction.setTransactionTimeout(int seconds);

            // business logic goes here

            /**
              * To enable your Timer service, just call
              *
              * timerService.createTimer(15*60*1000, 15*60*1000, <ANY_SERIALIZABLE_INFO_GOES_HERE>);
              */

            userTransaction.commit();
        } catch (Exception e) {
            userTransaction.rollback();
        }
    }

    @Timeout
    public void doTimer(Timer timer) {
        try {
            userTransaction.begin();         

            timer.getInfo();

            // logic goes here

            userTransaction.commit();
        } catch (Exception e) {
            userTransaction.rollback();
        }
    }

}

Let's see UserTransaction.begin method API

Create a new transaction and associate it with the current thread

There is more:

The lifetime of a container-managed persistence context (injected Through @PersistenceContext annotation) corresponds to the scope of a transaction (between begin and commit method call) when using transaction-scoped persistence context

Now Let's see TimerService

It is a container-provided service that allows enterprise beans to be registered for timer callback methods to occur at a specified time, after a specified elapsed time, or after specified intervals. The bean class of an enterprise bean that uses the timer service must provide a timeout callback method. Timers can be created for stateless session beans, message-driven beans

I hope It can be useful To you

Arthur Ronald F D Garcia
Thanks Arthur. Your post forced me to do one thing, reread my code. In the LoadController.createWorkloadContentRecords() method I found a SeamManualTransaction that is not required due to refactoring. I removed this and my issue now appears to be gone (fingers crossed). However, it does seem odd that the same instance can be injected into two threads while one is till using it.
Scott
A: 

In general, injecting an entityManager in a Seam component of scope APPLICATION is not right. An entity manager is something you create, use and close again, in a scope typically much shorter than APPLICATION scope.

Improve by choosing smaller scopes with a standard entityManager injection, or if you need the APPLICATION scope, inject an EntityManagerFactory instead, and create, use and close the entityManager yourself.

Look in your Seam components.xml to find the name of your EntityManagerFactory compoment.

Paul
Thanks Paul, I managed to find the root cause of this problem, and yes it was partly due to the use of a EntityManager in the Application scope. I have refactored the code now so that the Orchestrator class does not use a EntityManager at all, but rather the EntityManagers are used within Stateless Session Beans instead.
Scott