Allow to use any query for POLL operation

master
Vitaliy Filippov 2015-12-21 00:07:03 +03:00
parent 4a592c717b
commit 133fc66358
1 changed files with 60 additions and 78 deletions

View File

@ -145,8 +145,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
private String mXAEnabled = null; private String mXAEnabled = null;
private DatabaseModel dbDataAccessObject = null;
private String mTableName = null; private String mTableName = null;
private String mDbName = null; private String mDbName = null;
@ -358,7 +356,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
getTransactionManager().begin(); getTransactionManager().begin();
} }
dbDataAccessObject = getDataAccessObject(meta); DatabaseModel dbDataAccessObject = getDataAccessObject(meta);
mSelectSQL = dbDataAccessObject.generateSelectQuery(mSelectSQL, mTableName); mSelectSQL = dbDataAccessObject.generateSelectQuery(mSelectSQL, mTableName);
epb.setTableName(mTableName); epb.setTableName(mTableName);
@ -366,66 +364,64 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
if (mDbName == null){ if (mDbName == null){
mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase(); mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase();
} }
if (isSelectStatement(mSelectSQL)) { if (epb.isClustered()) {
if (epb.isClustered()) { connection.setAutoCommit(false);
connection.setAutoCommit(false); // In cluster environment, adding a simple "FOR UPDATE" to the poll query is enough
// In cluster environment, adding a simple "FOR UPDATE" to the poll query is enough // to make different OpenESB instances always process different rows.
// to make different OpenESB instances always process different rows. //
// // Although, in VERY RARE cases - i.e. if your DBMS supports transactions and
// Although, in VERY RARE cases - i.e. if your DBMS supports transactions and // locking, but DOES NOT support FOR UPDATE and you can't use any hacks to
// locking, but DOES NOT support FOR UPDATE and you can't use any hacks to // emulate it as the part of poll query - you can use a separate SQL statement
// emulate it as the part of poll query - you can use a separate SQL statement // to obtain lock before selecting rows from the polled table.
// to obtain lock before selecting rows from the polled table. // However, note that in this case your BPEL process MUST NOT update the
// However, note that in this case your BPEL process MUST NOT update the // "MarkColumn" of the polled table, because there will be no guarantee that
// "MarkColumn" of the polled table, because there will be no guarantee that // Database Binding updates the MarkColumn BEFORE BPEL thread also updates it.
// Database Binding updates the MarkColumn BEFORE BPEL thread also updates it. //
// // TODO: Add a separate "Lock statement" property
// TODO: Add a separate "Lock statement" property String lockStatement = meta.getJDBCSql().getGeneratedKey();
String lockStatement = meta.getJDBCSql().getGeneratedKey(); if (lockStatement != null && !lockStatement.equals("")) {
if (lockStatement != null && !lockStatement.equals("")) { Statement st = connection.createStatement();
Statement st = connection.createStatement(); st.execute(lockStatement);
st.execute(lockStatement); st.close();
st.close(); }
} } else {
} else { connection.setAutoCommit(true);
connection.setAutoCommit(true); }
List tempList = null;
rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL);
if (rs != null) {
final JDBCNormalizer normalizer = new JDBCNormalizer();
Probe normalizationMeasurement = Probe.info(getClass(),
epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION);
normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords);
normalizer.setRecordsProcessedList(mProcessedList);
inMsg = normalizer.normalizeSelectInbound(rs, exchange, meta, epb, mPKName,mDbName);
rowCount = normalizer.mRowCount;
if(normalizationMeasurement != null){
normalizationMeasurement.end();
} }
List tempList = null; tempList = epb.getProcessList();
rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); if (!tempList.isEmpty())
{
// set JNDI name on NormalizedMessage for dynamic addressing
inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName);
exchange.setMessage(inMsg, "in");
if (rs != null) { mInboundExchanges.put(exchangeId, new ListenerMeta(
final JDBCNormalizer normalizer = new JDBCNormalizer(); System.currentTimeMillis(), this));
Probe normalizationMeasurement = Probe.info(getClass(),
epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION);
normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords); mChannel.sendSync(exchange);
normalizer.setRecordsProcessedList(mProcessedList); epb.getEndpointStatus().incrementSentRequests();
inMsg = normalizer.normalizeSelectInbound(rs, exchange, meta, epb, mPKName,mDbName);
rowCount = normalizer.mRowCount;
if(normalizationMeasurement != null){
normalizationMeasurement.end();
}
tempList = epb.getProcessList();
if (!tempList.isEmpty())
{
// set JNDI name on NormalizedMessage for dynamic addressing
inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName);
exchange.setMessage(inMsg, "in");
mInboundExchanges.put(exchangeId, new ListenerMeta(
System.currentTimeMillis(), this));
mChannel.sendSync(exchange);
epb.getEndpointStatus().incrementSentRequests();
}
} }
}
if (tempList != null && !tempList.isEmpty()) { if (tempList != null && !tempList.isEmpty()) {
doPostProcessing(connection, tempList); doPostProcessing(connection, tempList);
}
} }
} }
} catch (final Exception ex) { } catch (final Exception ex) {
@ -502,8 +498,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
String lSelectSQL) throws MessagingException { String lSelectSQL) throws MessagingException {
ResultSet rs = null; ResultSet rs = null;
try { try {
String jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
String where = ""; String where = "";
List<String> bind = new ArrayList<String>(); List<String> bind = new ArrayList<String>();
if (lSelectSQL.indexOf("$WHERE") >= 0) { if (lSelectSQL.indexOf("$WHERE") >= 0) {
@ -535,6 +529,13 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
"Reason: " + ex.getLocalizedMessage() + " SQLState: " + ex.getSQLState() + " ErrorCode: " + ex.getErrorCode(); "Reason: " + ex.getLocalizedMessage() + " SQLState: " + ex.getSQLState() + " ErrorCode: " + ex.getErrorCode();
throw new MessagingException(msg, ex); throw new MessagingException(msg, ex);
} catch (final Exception ex) { } catch (final Exception ex) {
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL, ex);
} else if (mLogger.isLoggable(Level.FINE)) {
mLogger.log(Level.FINE, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL);
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info(mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL"));
}
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL
+ ex.getLocalizedMessage(); + ex.getLocalizedMessage();
throw new MessagingException(msg, ex); throw new MessagingException(msg, ex);
@ -780,25 +781,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
return ((DataSource) getDataSourceFromContext(jndiName)).getConnection(); return ((DataSource) getDataSourceFromContext(jndiName)).getConnection();
} }
/**
* @param prepStmtSQLText
* @return
*/
private boolean isSelectStatement(final String prepStmtSQLText) {
prepStmtSQLText.trim();
final StringTokenizer tok = new StringTokenizer(prepStmtSQLText);
if (tok.hasMoreTokens()) {
final String firstTok = (String) tok.nextToken();
if (firstTok.equalsIgnoreCase("select")) {
return true;
}
}
return false;
}
protected void stopReceiving() { protected void stopReceiving() {
mLogger.log(Level.INFO, "DBBC_R00644.IMP_Inbound_stopped"); mLogger.log(Level.INFO, "DBBC_R00644.IMP_Inbound_stopped");