diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java index 2b8fcddc1..1b9490100 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java @@ -145,8 +145,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi private String mXAEnabled = null; - private DatabaseModel dbDataAccessObject = null; - private String mTableName = null; private String mDbName = null; @@ -358,7 +356,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi getTransactionManager().begin(); } - dbDataAccessObject = getDataAccessObject(meta); + DatabaseModel dbDataAccessObject = getDataAccessObject(meta); mSelectSQL = dbDataAccessObject.generateSelectQuery(mSelectSQL, mTableName); epb.setTableName(mTableName); @@ -366,66 +364,64 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi if (mDbName == null){ mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase(); } - if (isSelectStatement(mSelectSQL)) { - if (epb.isClustered()) { - connection.setAutoCommit(false); - // In cluster environment, adding a simple "FOR UPDATE" to the poll query is enough - // to make different OpenESB instances always process different rows. - // - // Although, in VERY RARE cases - i.e. if your DBMS supports transactions and - // locking, but DOES NOT support FOR UPDATE and you can't use any hacks to - // emulate it as the part of poll query - you can use a separate SQL statement - // to obtain lock before selecting rows from the polled table. - // However, note that in this case your BPEL process MUST NOT update the - // "MarkColumn" of the polled table, because there will be no guarantee that - // Database Binding updates the MarkColumn BEFORE BPEL thread also updates it. - // - // TODO: Add a separate "Lock statement" property - String lockStatement = meta.getJDBCSql().getGeneratedKey(); - if (lockStatement != null && !lockStatement.equals("")) { - Statement st = connection.createStatement(); - st.execute(lockStatement); - st.close(); - } - } else { - connection.setAutoCommit(true); + if (epb.isClustered()) { + connection.setAutoCommit(false); + // In cluster environment, adding a simple "FOR UPDATE" to the poll query is enough + // to make different OpenESB instances always process different rows. + // + // Although, in VERY RARE cases - i.e. if your DBMS supports transactions and + // locking, but DOES NOT support FOR UPDATE and you can't use any hacks to + // emulate it as the part of poll query - you can use a separate SQL statement + // to obtain lock before selecting rows from the polled table. + // However, note that in this case your BPEL process MUST NOT update the + // "MarkColumn" of the polled table, because there will be no guarantee that + // Database Binding updates the MarkColumn BEFORE BPEL thread also updates it. + // + // TODO: Add a separate "Lock statement" property + String lockStatement = meta.getJDBCSql().getGeneratedKey(); + if (lockStatement != null && !lockStatement.equals("")) { + Statement st = connection.createStatement(); + st.execute(lockStatement); + st.close(); + } + } else { + connection.setAutoCommit(true); + } + + List tempList = null; + rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); + + if (rs != null) { + final JDBCNormalizer normalizer = new JDBCNormalizer(); + Probe normalizationMeasurement = Probe.info(getClass(), + epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION); + + normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords); + normalizer.setRecordsProcessedList(mProcessedList); + inMsg = normalizer.normalizeSelectInbound(rs, exchange, meta, epb, mPKName,mDbName); + rowCount = normalizer.mRowCount; + + if(normalizationMeasurement != null){ + normalizationMeasurement.end(); } - List tempList = null; - rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); + tempList = epb.getProcessList(); + if (!tempList.isEmpty()) + { + // set JNDI name on NormalizedMessage for dynamic addressing + inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName); + exchange.setMessage(inMsg, "in"); - if (rs != null) { - final JDBCNormalizer normalizer = new JDBCNormalizer(); - Probe normalizationMeasurement = Probe.info(getClass(), - epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION); + mInboundExchanges.put(exchangeId, new ListenerMeta( + System.currentTimeMillis(), this)); - normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords); - normalizer.setRecordsProcessedList(mProcessedList); - inMsg = normalizer.normalizeSelectInbound(rs, exchange, meta, epb, mPKName,mDbName); - rowCount = normalizer.mRowCount; - - if(normalizationMeasurement != null){ - normalizationMeasurement.end(); - } - - tempList = epb.getProcessList(); - if (!tempList.isEmpty()) - { - // set JNDI name on NormalizedMessage for dynamic addressing - inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName); - exchange.setMessage(inMsg, "in"); - - mInboundExchanges.put(exchangeId, new ListenerMeta( - System.currentTimeMillis(), this)); - - mChannel.sendSync(exchange); - epb.getEndpointStatus().incrementSentRequests(); - } + mChannel.sendSync(exchange); + epb.getEndpointStatus().incrementSentRequests(); } + } - if (tempList != null && !tempList.isEmpty()) { - doPostProcessing(connection, tempList); - } + if (tempList != null && !tempList.isEmpty()) { + doPostProcessing(connection, tempList); } } } catch (final Exception ex) { @@ -502,8 +498,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi String lSelectSQL) throws MessagingException { ResultSet rs = null; try { - String jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME); - mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); String where = ""; List bind = new ArrayList(); if (lSelectSQL.indexOf("$WHERE") >= 0) { @@ -535,6 +529,13 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi "Reason: " + ex.getLocalizedMessage() + " SQLState: " + ex.getSQLState() + " ErrorCode: " + ex.getErrorCode(); throw new MessagingException(msg, ex); } catch (final Exception ex) { + if (mLogger.isLoggable(Level.FINEST)) { + mLogger.log(Level.FINEST, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL, ex); + } else if (mLogger.isLoggable(Level.FINE)) { + mLogger.log(Level.FINE, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL); + } else if (mLogger.isLoggable(Level.INFO)) { + mLogger.info(mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL")); + } final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL + ex.getLocalizedMessage(); throw new MessagingException(msg, ex); @@ -780,25 +781,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); return ((DataSource) getDataSourceFromContext(jndiName)).getConnection(); } - /** - * @param prepStmtSQLText - * @return - */ - private boolean isSelectStatement(final String prepStmtSQLText) { - prepStmtSQLText.trim(); - - final StringTokenizer tok = new StringTokenizer(prepStmtSQLText); - - if (tok.hasMoreTokens()) { - final String firstTok = (String) tok.nextToken(); - - if (firstTok.equalsIgnoreCase("select")) { - return true; - } - } - - return false; - } protected void stopReceiving() { mLogger.log(Level.INFO, "DBBC_R00644.IMP_Inbound_stopped");