Hi,
I am new to the Java / Hibernate / Seam way of development, but I appear to have a strange issue with Hibernate and concurrent threads.
I have an application-scoped Seam component (Orchestrator.java) whose method startProcessingWorkloads is executed by EJB timers at a set interval.
This method uses an injected EntityManager to check the database for a collection of work. If it finds one, it creates a new asynchronous Seam component (LoadController.java) and calls the start() method on that controller.
LoadController also has an EntityManager injected and uses it to perform a very large transaction (about one hour).
Once the LoadController is running as a separate thread, the Orchestrator is still executed on its own thread at the set interval, so for example:
1min
Orchestrator - Looks for work collection (None found) (thread 1)
2min
Orchestrator - Looks for work collection (finds one, Starts LoadController) (thread 1)
LoadController - Starts updating database records (thread 2)
3min
Orchestrator - Looks for work collection (None found) (thread 1)
LoadController - Still updating database records (thread 2)
4min
Orchestrator - Looks for work collection (None found) (thread 1)
LoadController - Still updating database records (thread 2)
5min
Orchestrator - Looks for work collection (None found) (thread 1)
LoadController - Done updating database records (thread 2)
6min
Orchestrator - Looks for work collection (None found) (thread 1)
7min
Orchestrator - Looks for work collection (None found) (thread 1)
However, I am receiving an intermittent error (see below) when the Orchestrator runs concurrently with the LoadController.
5:10:40,852 WARN [AbstractBatcher] exception clearing maxRows/queryTimeout java.sql.SQLException: Connection is not associated with a managed connection.org.jboss.resource.adapter.jdbc.jdk6.WrappedConnectionJDK6@1fcdb21
This error is thrown after the Orchestrator has completed its SQL query and as the LoadController attempts to execute its next SQL query.
I did some research and came to the conclusion that the EntityManager was being closed, so the LoadController was unable to use it.
Confused as to what exactly closed the connection, I did some basic object dumps of the EntityManager objects used by the Orchestrator and the LoadController each time the components are called, and I found that this happens just before I receive the above error:
2010-07-30 15:06:40,804 INFO [processManagement.LoadController] (pool-15-thread-2) org.jboss.seam.persistence.EntityManagerProxy@7e3da1
2010-07-30 15:10:40,758 INFO [processManagement.Orchestrator] (pool-15-thread-1) org.jboss.seam.persistence.EntityManagerProxy@7e3da1
It appears that during one of the Orchestrator execution intervals it obtains a reference to the same EntityManager that the LoadController is currently using. When the Orchestrator completes its SQL execution it closes the connection, and then the LoadController can no longer execute its updates.
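For anyone wanting to reproduce the object dumps, here is a minimal sketch of the kind of logging helper that could be used. InstanceDump and its methods are hypothetical names, not part of Seam or Hibernate. It uses System.identityHashCode rather than toString(), because a class may override hashCode() or toString() and make two different objects look alike.

```java
// Hypothetical helper for illustration; not part of Seam or Hibernate.
final class InstanceDump {

    // Builds "[thread-name] fully.qualified.Class@hexIdentityHash" for the calling thread.
    static String describe(Object target) {
        String identity = target.getClass().getName()
                + "@" + Integer.toHexString(System.identityHashCode(target));
        return "[" + Thread.currentThread().getName() + "] " + identity;
    }

    // True only when both references point at the very same object.
    static boolean sameInstance(Object a, Object b) {
        return a == b;
    }
}
```

Calling something like log.info(InstanceDump.describe(entityManager)) from both components would give lines comparable to the two above, with the thread name included in the message itself.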
So my question is: does anyone know of this happening, or have I got my threading all mucked up in this code?
From my understanding, when an EntityManager is injected, a new instance is obtained from the EntityManagerFactory and remains with that particular object until the object leaves scope (in this case they are stateless, so when the start() method ends). How could the same instance of an EntityManager be injected into two separate threads?
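To make the question concrete, here is a plain-Java analogy of what the logs suggest. It is only a sketch and does not model Seam's actual injection machinery: FakeResource, SharedComponent and SharedFieldDemo are hypothetical names. It shows that when several threads call into one shared object, a resource held in a field is the same instance for all of them, whereas a resource obtained per call is not.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for a non-thread-safe resource such as an EntityManager.
final class FakeResource {}

// Hypothetical stand-in for a single shared (application-wide) component.
final class SharedComponent {
    private final FakeResource heldInField = new FakeResource();      // assigned once, seen by every caller
    private final Supplier<FakeResource> factory = FakeResource::new; // creates a fresh resource per call

    FakeResource fromField()   { return heldInField; }
    FakeResource fromFactory() { return factory.get(); }
}

final class SharedFieldDemo {

    // Starts `threads` workers against ONE component and counts the distinct
    // resource instances they observe (compared by identity, not equals()).
    static int distinctResources(boolean freshPerCall, int threads) {
        SharedComponent component = new SharedComponent();
        Set<FakeResource> identitySet = Collections.newSetFromMap(new IdentityHashMap<>());
        Set<FakeResource> seen = Collections.synchronizedSet(identitySet);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() ->
                    seen.add(freshPerCall ? component.fromFactory() : component.fromField()));
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return seen.size();
    }
}
```

With the field-held resource every worker sees one instance; with the per-call factory each worker sees its own. Whether Seam behaves like the first case for these two components is exactly what I am trying to find out.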
Orchestrator.java
@Name("processOrchestrator")
@Scope(ScopeType.APPLICATION)
@AutoCreate
public class Orchestrator {
//___________________________________________________________
@Logger Log log;
@In EntityManager entityManager;
@In LoadController loadController;
@In WorkloadManager workloadManager;
//___________________________________________________________
private int fProcessInstanceCount = 0;
//___________________________________________________________
public Orchestrator() {}
//___________________________________________________________
synchronized private void incrementProcessInstanceCount() {
fProcessInstanceCount++;
}
//___________________________________________________________
synchronized private void decreaseProcessInstanceCount() {
fProcessInstanceCount--;
}
//___________________________________________________________
@Observer("controllerExceptionEvent")
synchronized public void controllerExceptionListiner(Process aProcess, Exception aException) {
decreaseProcessInstanceCount();
log.info(
"Controller " + String.valueOf(aProcess) +
" failed with the error [" + aException.getMessage() + "]"
);
Events.instance().raiseEvent(
Application.ApplicationEvent.applicationExceptionEvent.name(),
aException,
Orchestrator.class
);
}
//___________________________________________________________
@Observer("controllerCompleteEvent")
synchronized public void successfulControllerCompleteListiner(Process aProcess, long aWorkloadId) {
try {
MisWorkload completedWorklaod = entityManager.find(MisWorkload.class, aWorkloadId);
workloadManager.completeWorkload(completedWorklaod);
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
decreaseProcessInstanceCount();
log.info("Controller " + String.valueOf(aProcess) + " completed successfully");
}
//___________________________________________________________
@Asynchronous
public void startProcessingWorkloads(@IntervalDuration long interval) {
log.info("Polling for workloads.");
log.info(entityManager.toString());
try {
MisWorkload pendingWorkload = workloadManager.getNextPendingWorkload();
if (pendingWorkload != null) {
log.info(
"Pending Workload found (Workload_Id = " +
String.valueOf(pendingWorkload.getWorkloadId()) +
"), starting process controller."
);
Process aProcess = pendingWorkload.retriveProcessIdAsProcess();
ControllerIntf controller = createWorkloadController(aProcess);
if (controller != null) {
controller.start(aProcess, pendingWorkload.getWorkloadId());
workloadManager.setWorkloadProcessing(pendingWorkload);
}
}
} catch (Exception ex) {
Events.instance().raiseEvent(
Application.ApplicationEvent.applicationExceptionEvent.name(),
ex,
Orchestrator.class
);
}
log.info("Polling complete.");
}
//___________________________________________________________
private ControllerIntf createWorkloadController(Process aProcess) {
ControllerIntf newController = null;
switch(aProcess) {
case LOAD:
newController = loadController;
break;
default:
log.info(
"createWorkloadController() does not know the value (" +
aProcess.name() +
") no controller will be started."
);
}
// If a new controller is created then increase the
// count of started controllers so that we know how
// many are running.
if (newController != null) {
incrementProcessInstanceCount();
}
return newController;
}
//___________________________________________________________
}
LoadController.java
@Name("loadController")
@Scope(ScopeType.STATELESS)
@AutoCreate
public class LoadController implements ControllerIntf {
//__________________________________________________
@Logger private Log log;
@In private EntityManager entityManager;
//__________________________________________________
private String fFileName = "";
private String fNMDSFileName = "";
private String fAddtFileName = "";
//__________________________________________________
public LoadController(){ }
//__________________________________________________
@Asynchronous
synchronized public void start(Process aProcess, long aWorkloadId) {
log.info(
LoadController.class.getName() +
" process thread was started for WorkloadId [" +
String.valueOf(aWorkloadId) + "]."
);
log.info(entityManager.toString());
try {
Query aQuery = entityManager.createQuery(
"from MisLoad MIS_Load where Workload_Id = " + String.valueOf(aWorkloadId)
);
MisLoad misLoadRecord = (MisLoad)aQuery.getSingleResult();
fFileName =
misLoadRecord.getInitiatedBy().toUpperCase() + "_" +
misLoadRecord.getMdSourceSystem().getMdState().getShortName() + "_" +
DateUtils.now(DateUtils.FORMAT_FILE) + ".csv"
;
fNMDSFileName = "NMDS_" + fFileName;
fAddtFileName = "Addt_" + fFileName;
createDataFile(misLoadRecord.getFileContents());
ArrayList<String>sasCode = generateSASCode(
misLoadRecord.getLoadId(),
misLoadRecord.getMdSourceSystem().getPreloadFile()
);
//TODO: As the sas password will be encrypted in the database, we will
// need to decrypt it before passing to the below function
executeLoadSASCode(
sasCode,
misLoadRecord.getInitiatedBy(),
misLoadRecord.getSasPassword()
);
createWorkloadContentRecords(aWorkloadId, misLoadRecord.getLoadId());
//TODO: Needs to remove password from DB when complete
removeTempCSVFiles();
Events.instance().raiseEvent(
Application.ApplicationEvent.controllerCompleteEvent.name(),
aProcess,
aWorkloadId
);
log.info(LoadController.class.getName() + " process thread completed.");
} catch (Exception ex) {
Events.instance().raiseEvent(
Application.ApplicationEvent.controllerExceptionEvent.name(),
aProcess,
ex
);
}
}
//__________________________________________________
private void createDataFile(byte[] aFileContent) throws Exception {
File dataFile =
new File(ECEConfig.getConfiguration().sas_tempFileDir() + "\\" + fFileName);
FileUtils.writeBytesToFile(dataFile, aFileContent, true);
}
//__________________________________________________
private ArrayList<String> generateSASCode(long aLoadId, String aSourceSystemPreloadSasFile) {
String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
ArrayList<String> sasCode = new ArrayList<String>();
sasCode.add("%let sOracleUserId = " + ECEConfig.getConfiguration().oracle_username() + ";");
sasCode.add("%let sOraclePassword = " + ECEConfig.getConfiguration().oracle_password() + ";");
sasCode.add("%let sOracleSID = " + ECEConfig.getConfiguration().oracle_sid() + ";");
sasCode.add("%let sSchema = " + ECEConfig.getConfiguration().oracle_username() + ";");
sasCode.add("%let sECESASSourceDir = " + ECEConfig.getConfiguration().sas_sourceDir() + ";");
sasCode.add("libname lOracle ORACLE user=&sOracleUserId pw=&sOraclePassword path=&sOracleSID schema=&sSchema;");
sasCode.add("%let sCommaDelimiter = %str(" + ECEConfig.getConfiguration().dataload_csvRawDataFileDelimiter() + ");");
sasCode.add("%let sPipeDelimiter = %nrquote(" + ECEConfig.getConfiguration().dataload_csvNMDSDataFileDelimiter() + ");");
sasCode.add("%let sDataFileLocation = " + sasTempDir + "\\" + fFileName + ";");
sasCode.add("%let sNMDSOutputDataFileLoc = " + sasTempDir + "\\" + fNMDSFileName + ";");
sasCode.add("%let sAddtOutputDataFileLoc = " + sasTempDir + "\\" + fAddtFileName + ";");
sasCode.add("%let iLoadId = " + String.valueOf(aLoadId) + ";");
sasCode.add("%include \"&sECESASSourceDir\\ECE_UtilMacros.sas\";");
sasCode.add("%include \"&sECESASSourceDir\\" + aSourceSystemPreloadSasFile + "\";");
sasCode.add("%include \"&sECESASSourceDir\\ECE_NMDSLoad.sas\";");
sasCode.add("%preload(&sDataFileLocation, &sCommaDelimiter, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter);");
sasCode.add("%loadNMDS(lOracle, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter, &iLoadId);");
return sasCode;
}
//__________________________________________________
private void executeLoadSASCode(
ArrayList<String> aSasCode, String aUserName, String aPassword) throws Exception
{
SASExecutor aSASExecutor = new SASExecutor(
ECEConfig.getConfiguration().sas_server(),
ECEConfig.getConfiguration().sas_port(),
aUserName,
aPassword
);
aSASExecutor.execute(aSasCode);
log.info(aSASExecutor.getCompleteSasLog());
}
//__________________________________________________
/**
* Creates the MIS_UR_Workload_Contents records for
* the ECE Unit Record data that was just loaded
*
* @param aWorkloadId
* @param aMisLoadId
* @throws Exception
*/
private void createWorkloadContentRecords(long aWorkloadId, long aMisLoadId) throws Exception {
String selectionRule =
" from EceUnitRecord ECE_Unit_Record where ECE_Unit_Record.loadId = " +
String.valueOf(aMisLoadId)
;
MisWorkload misWorkload = entityManager.find(MisWorkload.class, aWorkloadId);
SeamManualTransaction manualTx = new SeamManualTransaction(
entityManager,
ECEConfig.getConfiguration().manualSeamTxTimeLimit()
);
manualTx.begin();
RecordPager oPager = new RecordPager(
entityManager,
selectionRule,
ECEConfig.getConfiguration().recordPagerDefaultPageSize()
);
Object nextRecord = null;
while ((nextRecord = oPager.getNextRecord()) != null) {
EceUnitRecord aEceUnitRecord = (EceUnitRecord)nextRecord;
MisUrWorkloadContents aContentsRecord = new MisUrWorkloadContents();
aContentsRecord.setEceUnitRecordId(aEceUnitRecord.getEceUnitRecordId());
aContentsRecord.setMisWorkload(misWorkload);
aContentsRecord.setProcessOutcome('C');
entityManager.persist(aContentsRecord);
}
manualTx.commit();
}
/**
* Removes the CSV temp files that are created for input
* into the SAS server and that are created as output.
*/
private void removeTempCSVFiles() {
String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
File dataInputCSV = new File(sasTempDir + "\\" + fFileName);
File nmdsOutputCSV = new File(sasTempDir + "\\" + fNMDSFileName);
File addtOutputCSV = new File(sasTempDir + "\\" + fAddtFileName);
if (dataInputCSV.exists()) {
dataInputCSV.delete();
}
if (nmdsOutputCSV.exists()) {
nmdsOutputCSV.delete();
}
if (addtOutputCSV.exists()) {
addtOutputCSV.delete();
}
}
}
SeamManualTransaction.java
public class SeamManualTransaction {
//___________________________________________________________
private boolean fObjectUsed = false;
private boolean fJoinExistingTransaction = true;
private int fTransactionTimeout = 60; // Default: 60 seconds
private UserTransaction fUserTx;
private EntityManager fEntityManager;
//___________________________________________________________
/**
* Sets the transaction timeout, converting the given minutes to the
* seconds expected by UserTransaction.setTransactionTimeout.
*
* @param aTimeoutInMins The number of minutes to keep the transaction active
*/
private void setTransactionTimeout(int aTimeoutInMins) {
// 60 * aTimeoutInMins = timeout in seconds
fTransactionTimeout = 60 * aTimeoutInMins;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
*/
public SeamManualTransaction(EntityManager aEntityManager) {
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
* @param aTimeoutInSecs
*/
public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs) {
setTransactionTimeout(aTimeoutInSecs);
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
* @param aTimeoutInSecs
* @param aJoinExistingTransaction
*/
public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs, boolean aJoinExistingTransaction) {
setTransactionTimeout(aTimeoutInSecs);
fJoinExistingTransaction = aJoinExistingTransaction;
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Starts the new transaction
*
* @throws Exception
*/
public void begin() throws Exception {
if (fObjectUsed) {
throw new Exception(
SeamManualTransaction.class.getCanonicalName() +
" has been used. Create new instance."
);
}
fUserTx =
(UserTransaction) org.jboss.seam.Component.getInstance("org.jboss.seam.transaction.transaction");
fUserTx.setTransactionTimeout(fTransactionTimeout);
fUserTx.begin();
/* If entity manager is created before the transaction
* is started (ie. via Injection) then it must join the
* transaction
*/
if (fJoinExistingTransaction) {
fEntityManager.joinTransaction();
}
}
//___________________________________________________________
/**
* Commit the transaction to the database
*
* @throws Exception
*/
public void commit() throws Exception {
fObjectUsed = true;
fUserTx.commit();
}
//_________________________________
/**
* Rolls the transaction back
*
* @throws Exception
*/
public void rollback() throws Exception {
fObjectUsed = true;
fUserTx.rollback();
}
//___________________________________________________________
}
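As a side note on the timeout handling in SeamManualTransaction: the value is multiplied by 60 before it is passed to UserTransaction.setTransactionTimeout, which takes seconds, so the input is effectively treated as minutes. A minimal sketch of making that conversion explicit follows. TimeoutUnits and minutesToSeconds are hypothetical names introduced for illustration, and the sketch assumes the configured value really is meant to be minutes.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the class and method names are assumptions for illustration.
final class TimeoutUnits {

    // Converts minutes to whole seconds, failing loudly on negative input or int overflow.
    static int minutesToSeconds(int minutes) {
        if (minutes < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return Math.toIntExact(TimeUnit.MINUTES.toSeconds(minutes));
    }
}
```

Naming the unit in the method keeps callers from having to guess whether a constructor argument is in seconds or minutes.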