diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java index b79c9fbd4..01c724a64 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java @@ -31,6 +31,7 @@ package org.glassfish.openesb.databasebc; import java.sql.Connection; +import java.sql.Statement; import java.sql.PreparedStatement; import java.sql.ParameterMetaData; import java.sql.ResultSet; @@ -362,10 +363,19 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi epb.setTableName(mTableName); connection = getDatabaseConnection(jndiName); - if (mDbName==null){ + if (mDbName == null){ mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase(); } if (isSelectStatement(mSelectSQL)) { + connection.setAutoCommit(false); + if (epb.isClustered()) { + Statement st = connection.createStatement(); + // Exclusively lock table (serialize concurrent SELECTs) + st.execute("SELECT MIN("+mPKName+") FROM "+mTableName+" FOR UPDATE"); + st.close(); + } + + List tempList = null; rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); if (rs != null) { @@ -382,20 +392,24 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi normalizationMeasurement.end(); } - final List tempList = epb.getProcessList(); - if (!(tempList.isEmpty())) + tempList = epb.getProcessList(); + if (!tempList.isEmpty()) { - //set JNDI name on NormalizedMessage for dynamic addressing + // set JNDI name on NormalizedMessage for dynamic addressing inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName); exchange.setMessage(inMsg, "in"); mInboundExchanges.put(exchangeId, new ListenerMeta( System.currentTimeMillis(), this)); - mChannel.send(exchange); + mChannel.sendSync(exchange); epb.getEndpointStatus().incrementSentRequests(); } } + + if (tempList != null && !tempList.isEmpty()) { + doPostProcessing(connection, tempList); + } } } } catch (final Exception ex) { @@ -417,6 +431,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } try{ if (connection != null) { + connection.commit(); connection.close(); } }catch(SQLException se){ @@ -698,9 +713,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi */ //@Override public synchronized void processReplyMessage(final MessageExchange exchange) throws Exception { - String jndiName = null; - Connection connection = null; - if (!(exchange instanceof InOnly) && !(exchange instanceof InOut)) { mLogger.log(Level.SEVERE, "DBBC_E00647.IMP_Unsupported_exchange_pattern", exchange.getPattern().toString()); @@ -710,12 +722,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi final String messageId = exchange.getExchangeId(); try { if (InboundMessageProcessor.mInboundExchanges.containsKey(messageId)) { - if (exchange.getStatus() == ExchangeStatus.DONE) { - jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME); - mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); - connection = getDatabaseConnection(jndiName); - doPostProcessing(connection, (List)mMapInboundExchangesProcessRecords.get(messageId)); - } else { + if (exchange.getStatus() != ExchangeStatus.DONE) { mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId }); // Any status other than 'DONE' is considered an error final String msgError = "Error occured while getting DONE Response "; @@ -735,13 +742,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } finally { InboundMessageProcessor.mInboundExchanges.remove(messageId); mMapInboundExchangesProcessRecords.remove(messageId); - try{ - if(connection != null) { - connection.close(); - } - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); - } } }