Do post-processing just after sending the record to NMR, use SELECT MIN(id) to serialize concurrent selects

master
Vitaliy Filippov 2015-12-18 01:54:12 +03:00
parent 6a6f2a2b47
commit 7d583f1659
1 changed files with 21 additions and 21 deletions

View File

@ -31,6 +31,7 @@
package org.glassfish.openesb.databasebc; package org.glassfish.openesb.databasebc;
import java.sql.Connection; import java.sql.Connection;
import java.sql.Statement;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ParameterMetaData; import java.sql.ParameterMetaData;
import java.sql.ResultSet; import java.sql.ResultSet;
@ -362,10 +363,19 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
epb.setTableName(mTableName); epb.setTableName(mTableName);
connection = getDatabaseConnection(jndiName); connection = getDatabaseConnection(jndiName);
if (mDbName==null){ if (mDbName == null){
mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase(); mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase();
} }
if (isSelectStatement(mSelectSQL)) { if (isSelectStatement(mSelectSQL)) {
connection.setAutoCommit(false);
if (epb.isClustered()) {
Statement st = connection.createStatement();
// Exclusively lock table (serialize concurrent SELECTs)
st.execute("SELECT MIN("+mPKName+") FROM "+mTableName+" FOR UPDATE");
st.close();
}
List tempList = null;
rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL);
if (rs != null) { if (rs != null) {
@ -382,20 +392,24 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
normalizationMeasurement.end(); normalizationMeasurement.end();
} }
final List tempList = epb.getProcessList(); tempList = epb.getProcessList();
if (!(tempList.isEmpty())) if (!tempList.isEmpty())
{ {
//set JNDI name on NormalizedMessage for dynamic addressing // set JNDI name on NormalizedMessage for dynamic addressing
inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName); inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName);
exchange.setMessage(inMsg, "in"); exchange.setMessage(inMsg, "in");
mInboundExchanges.put(exchangeId, new ListenerMeta( mInboundExchanges.put(exchangeId, new ListenerMeta(
System.currentTimeMillis(), this)); System.currentTimeMillis(), this));
mChannel.send(exchange); mChannel.sendSync(exchange);
epb.getEndpointStatus().incrementSentRequests(); epb.getEndpointStatus().incrementSentRequests();
} }
} }
if (tempList != null && !tempList.isEmpty()) {
doPostProcessing(connection, tempList);
}
} }
} }
} catch (final Exception ex) { } catch (final Exception ex) {
@ -417,6 +431,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
} }
try{ try{
if (connection != null) { if (connection != null) {
connection.commit();
connection.close(); connection.close();
} }
}catch(SQLException se){ }catch(SQLException se){
@ -698,9 +713,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
*/ */
//@Override //@Override
public synchronized void processReplyMessage(final MessageExchange exchange) throws Exception { public synchronized void processReplyMessage(final MessageExchange exchange) throws Exception {
String jndiName = null;
Connection connection = null;
if (!(exchange instanceof InOnly) && !(exchange instanceof InOut)) { if (!(exchange instanceof InOnly) && !(exchange instanceof InOut)) {
mLogger.log(Level.SEVERE, "DBBC_E00647.IMP_Unsupported_exchange_pattern", mLogger.log(Level.SEVERE, "DBBC_E00647.IMP_Unsupported_exchange_pattern",
exchange.getPattern().toString()); exchange.getPattern().toString());
@ -710,12 +722,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
final String messageId = exchange.getExchangeId(); final String messageId = exchange.getExchangeId();
try { try {
if (InboundMessageProcessor.mInboundExchanges.containsKey(messageId)) { if (InboundMessageProcessor.mInboundExchanges.containsKey(messageId)) {
if (exchange.getStatus() == ExchangeStatus.DONE) { if (exchange.getStatus() != ExchangeStatus.DONE) {
jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
connection = getDatabaseConnection(jndiName);
doPostProcessing(connection, (List)mMapInboundExchangesProcessRecords.get(messageId));
} else {
mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId }); mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId });
// Any status other than 'DONE' is considered an error // Any status other than 'DONE' is considered an error
final String msgError = "Error occured while getting DONE Response "; final String msgError = "Error occured while getting DONE Response ";
@ -735,13 +742,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
} finally { } finally {
InboundMessageProcessor.mInboundExchanges.remove(messageId); InboundMessageProcessor.mInboundExchanges.remove(messageId);
mMapInboundExchangesProcessRecords.remove(messageId); mMapInboundExchangesProcessRecords.remove(messageId);
try{
if(connection != null) {
connection.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
}
} }
} }