Implement parallel active-active cluster poll processing support

master
Vitaliy Filippov 2015-12-15 02:07:53 +03:00
parent a7841074c5
commit e924b7e033
4 changed files with 293 additions and 377 deletions

View File

@ -32,6 +32,7 @@ package org.glassfish.openesb.databasebc;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ParameterMetaData;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Collections; import java.util.Collections;
@ -200,12 +201,12 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
// TODO, need to change it later // TODO, need to change it later
public static final String DEFAULT_CLUSTER_JNDI_NAME = "jdbc/__defaultDS"; public static final String DEFAULT_CLUSTER_JNDI_NAME = "jdbc/__defaultDS";
/** /**
* JBI message exchange properties for message grouping and sequencing (new CRMP) * JBI message exchange properties for message grouping and sequencing (new CRMP)
*/ */
public static final String CRMP_GROUP_ID = "com.sun.jbi.messaging.groupid"; public static final String CRMP_GROUP_ID = "com.sun.jbi.messaging.groupid";
public static final String CRMP_MESSAGE_ID = "com.sun.jbi.messaging.messageid"; public static final String CRMP_MESSAGE_ID = "com.sun.jbi.messaging.messageid";
ReplyListener replyListener; ReplyListener replyListener;
public InboundMessageProcessor(final MessagingChannel chnl, final EndpointBean endpoint, public InboundMessageProcessor(final MessagingChannel chnl, final EndpointBean endpoint,
@ -220,12 +221,11 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
dbConnectionInfo = new DBConnectionInfo(); dbConnectionInfo = new DBConnectionInfo();
final DocumentBuilderFactory docBuilderFact = DocumentBuilderFactory.newInstance(); final DocumentBuilderFactory docBuilderFact = DocumentBuilderFactory.newInstance();
mDocBuilder = docBuilderFact.newDocumentBuilder(); mDocBuilder = docBuilderFact.newDocumentBuilder();
//mTxManager = (TransactionManager) context.getTransactionManager();
if(endpoint.isClustered()){ if(endpoint.isClustered()){
try{ try{
mJDBCClusterManager = new JDBCClusterManager(context); mJDBCClusterManager = new JDBCClusterManager(context);
}catch(Exception e){ }catch(Exception e){
//TODO //TODO
} }
} }
@ -310,10 +310,9 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
// Adding re-delivery/re-try support // Adding re-delivery/re-try support
// we are sending instead of the client context to the ReplyListener // we are sending instead of the client context to the ReplyListener
// the EndpointBean // the EndpointBean
MessageExchangeSupport.addReplyListener(mExchange.getExchangeId(), replyListener, epb); MessageExchangeSupport.addReplyListener(mExchange.getExchangeId(), replyListener, epb);
MessageExchangeSupport.addRedeliveryListener(mExchange.getExchangeId(), this, epb); MessageExchangeSupport.addRedeliveryListener(mExchange.getExchangeId(), this, epb);
Redelivery.setUniqueId(mExchange, exchangeId); Redelivery.setUniqueId(mExchange, exchangeId);
final String status = epb.getValue(EndpointBean.STATUS); final String status = epb.getValue(EndpointBean.STATUS);
if (! status.equalsIgnoreCase(EndpointBean.STATUS_RUNNING)) { if (! status.equalsIgnoreCase(EndpointBean.STATUS_RUNNING)) {
@ -325,7 +324,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
} else if (mLogger.isLoggable(Level.INFO)) { } else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info("DBBC_W00667.IMP_EP_NOT_RUNNING"); mLogger.info("DBBC_W00667.IMP_EP_NOT_RUNNING");
} }
} else { } else {
switch (ExchangePattern.valueOf(mExchange)) { switch (ExchangePattern.valueOf(mExchange)) {
@ -379,18 +377,11 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
if (meta == null) { if (meta == null) {
throw new MessagingException(InboundMessageProcessor.mMessages.getString("DBBC_E00634.IMP_Invalid_Operation", throw new MessagingException(InboundMessageProcessor.mMessages.getString("DBBC_E00634.IMP_Invalid_Operation",
new Object[] { exchange.getOperation() })); new Object[] { exchange.getOperation() }));
} }
mPollMilliSeconds = meta.getJDBCSql().getPollMilliSeconds(); mPollMilliSeconds = meta.getJDBCSql().getPollMilliSeconds();
mSelectSQL = meta.getJDBCSql().getSql(); mSelectSQL = meta.getJDBCSql().getSql();
/*mPKName = Qualified(meta.getJDBCSql().getPKName());
mMarkColumnName = Qualified(meta.getJDBCSql().getMarkColumnName());
mMarkColumnValue = meta.getJDBCSql().getMarkColumnValue();
mTableName = Qualified(meta.getJDBCSql().getTableName());
mPollingPostProcessing = meta.getJDBCSql().getPollingPostProcessing();
mMoveRowToTableName = Qualified(meta.getJDBCSql().getMoveRowToTableName());
*/
mPKName = meta.getJDBCSql().getPKName(); mPKName = meta.getJDBCSql().getPKName();
mMarkColumnName = meta.getJDBCSql().getMarkColumnName(); mMarkColumnName = meta.getJDBCSql().getMarkColumnName();
mMarkColumnValue = meta.getJDBCSql().getMarkColumnValue(); mMarkColumnValue = meta.getJDBCSql().getMarkColumnValue();
@ -400,26 +391,21 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mXAEnabled = meta.getJDBCSql().getTransaction(); mXAEnabled = meta.getJDBCSql().getTransaction();
jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME); jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
// createNewDataSource(mXAEnabled,jndiName,epb);
// Throttle Check // Throttle Check
if(throttleConfigurationCheck()){ if(throttleConfigurationCheck()) {
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
getTransactionManager().begin();
}
dbDataAccessObject = getDataAccessObject(meta);
if (mXAEnabled.equalsIgnoreCase("XATransaction")) { mSelectSQL = dbDataAccessObject.generateSelectQuery(mSelectSQL, mTableName);
// mtxFlag = startTrasaction(); epb.setTableName(mTableName);
getTransactionManager().begin();
} connection = getDatabaseConnection(jndiName);
if (mDbName==null){
dbDataAccessObject = getDataAccessObject(meta); mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase();
mSelectSQL = dbDataAccessObject.generateSelectQuery(mSelectSQL, mTableName); }
epb.setTableName(mTableName); String clusterJNDIName = mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME);
connection = getDatabaseConnection(jndiName);
if (mDbName==null){
mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase();
}
String clusterJNDIName = mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME);
if(epb.isClustered()){ if(epb.isClustered()){
try{ try{
if(jndiName.equalsIgnoreCase(clusterJNDIName)){ if(jndiName.equalsIgnoreCase(clusterJNDIName)){
@ -428,9 +414,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
String prdtName = DBMetaData.getDBType(mClusterConnection); String prdtName = DBMetaData.getDBType(mClusterConnection);
mJDBCClusterManager.setProductName(prdtName); mJDBCClusterManager.setProductName(prdtName);
}else{ }else{
mClusterConnection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); mClusterConnection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
mClusterConnection.setAutoCommit(true); mJDBCClusterManager.setJNDIName(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
mJDBCClusterManager.setJNDIName(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
String prdtName = DBMetaData.getDBType(mClusterConnection); String prdtName = DBMetaData.getDBType(mClusterConnection);
mJDBCClusterManager.setProductName(prdtName); mJDBCClusterManager.setProductName(prdtName);
} }
@ -438,100 +423,103 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
if(mClusterConnection == null){ if(mClusterConnection == null){
//TODO retry; //TODO retry;
throw new Exception(mMessages.getString("DBBC_E11101.JCM_CONNECTON_EXCEPTION", throw new Exception(mMessages.getString("DBBC_E11101.JCM_CONNECTON_EXCEPTION",
new Object[] {mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)} )); new Object[] {mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)} ));
} }
} }
} }
Transaction tx = getTransactionManager().getTransaction(); Transaction tx = getTransactionManager().getTransaction();
if (isSelectStatement(mSelectSQL)) { if (isSelectStatement(mSelectSQL)) {
rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); if(epb.isClustered()){
mJDBCClusterManager.setDataBaseConnection(mClusterConnection);
if (rs != null) { mJDBCClusterManager.setTableName(mTableName);
final JDBCNormalizer normalizer = new JDBCNormalizer(); mJDBCClusterManager.setInstanceName(epb.getInstanceName());
Probe normalizationMeasurement = Probe.info(getClass(), mJDBCClusterManager.setHeartbeatConfigInterval(mPollMilliSeconds);
epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION); mJDBCClusterManager.setPKName(mPKName);
mJDBCClusterManager.doClusterTasks();
mClusterConnection.setAutoCommit(false);
}
//if(epb.isClustered() && mClusterConnection != null){ rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL);
// normalizer.setConnection(mClusterConnection);
// } if (rs != null) {
if(epb.isClustered()){ final JDBCNormalizer normalizer = new JDBCNormalizer();
mJDBCClusterManager.setDataBaseConnection(mClusterConnection); Probe normalizationMeasurement = Probe.info(getClass(),
mJDBCClusterManager.setTableName(mTableName); epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION);
mJDBCClusterManager.setInstanceName(epb.getInstanceName());
mJDBCClusterManager.setHeartbeatConfigInterval(mPollMilliSeconds); normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords);
mJDBCClusterManager.setPKName(mPKName);
mJDBCClusterManager.doClusterTasks();
}
normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords);
normalizer.setRecordsProcessedList(mProcessedList); normalizer.setRecordsProcessedList(mProcessedList);
normalizer.setJDBCClusterManager(mJDBCClusterManager);
inMsg = normalizer.normalizeSelectInbound(rs, exchange, meta, epb, mPKName,mDbName); inMsg = normalizer.normalizeSelectInbound(rs, exchange, meta, epb, mPKName,mDbName);
mRowCount = normalizer.mRowCount; mRowCount = normalizer.mRowCount;
if(normalizationMeasurement != null){ if(normalizationMeasurement != null){
normalizationMeasurement.end(); normalizationMeasurement.end();
} }
final List tempList = epb.getProcessList();
final List tempList = epb.getProcessList(); if (!(tempList.isEmpty()))
if (!(tempList.isEmpty())) { {
// mTxHelper.handleInbound(exchange); if (epb.isClustered())
//set JNDI name on NormalizedMessage for dynamic addressing {
inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName); mJDBCClusterManager.addInstances(tempList);
exchange.setMessage(inMsg, "in"); mClusterConnection.setAutoCommit(true);
}
if (tx != null) { //set JNDI name on NormalizedMessage for dynamic addressing
mExchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, tx); inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName);
getTransactionManager().suspend(); exchange.setMessage(inMsg, "in");
}
mInboundExchanges.put(exchangeId, new ListenerMeta(
System.currentTimeMillis(), this));
if (tx != null) {
mExchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, tx);
getTransactionManager().suspend();
}
mChannel.send(exchange); mInboundExchanges.put(exchangeId, new ListenerMeta(
epb.getEndpointStatus().incrementSentRequests(); System.currentTimeMillis(), this));
// mTableExistsFlag = new Object();
mChannel.send(exchange);
epb.getEndpointStatus().incrementSentRequests();
if(epb.isClustered()){ if(epb.isClustered()){
//Records already sent to NMR so update the status to "SENT" for owner table //Records already sent to NMR so update the status to "SENT" for owner table
try{ try{
int i[] = mJDBCClusterManager.updateStatus(tempList, "SENT"); int i[] = mJDBCClusterManager.updateStatus(tempList, "SENT");
mLogger.log(Level.INFO, mLogger.log(Level.INFO,
"DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT", "DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT",
new Object[] { tempList }); new Object[] { tempList });
}catch(Exception e){ }catch(Exception e){
// TODO need to handled the exception // TODO need to handled the exception
mLogger.log(Level.SEVERE, mLogger.log(Level.SEVERE,
"DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT", "DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT",
new Object[] { tempList, e.getLocalizedMessage() }); new Object[] { tempList, e.getLocalizedMessage() });
} }
} }
} else { } else {
if (tx != null) { if (epb.isClustered())
try { mClusterConnection.setAutoCommit(true);
tx.commit(); if (tx != null) {
} catch (Exception ex) { try {
mLogger.log(Level.SEVERE, tx.commit();
"DBBC_E00656.IMP_XA_TX_COMMIT_FAILED", } catch (Exception ex) {
new Object[] { "commit", ex }); mLogger.log(Level.SEVERE,
throw ex; "DBBC_E00656.IMP_XA_TX_COMMIT_FAILED",
} new Object[] { "commit", ex });
} else { throw ex;
if (mXAEnabled.equalsIgnoreCase("XATransaction")) { }
mLogger.log(Level.WARNING, } else {
"DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE", if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
new Object[] { exchange.getExchangeId() }); mLogger.log(Level.WARNING,
} "DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE",
} new Object[] { exchange.getExchangeId() });
} }
// mTableExistsFlag = new Object(); }
} }
} }
else if (epb.isClustered())
mClusterConnection.setAutoCommit(true);
}
} }
} catch (final Exception ex) { } catch (final Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00663.IMP_ERROR_WHILE_PROCESSING_MEP"), ex); mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00663.IMP_ERROR_WHILE_PROCESSING_MEP"), ex);
Transaction tx = getTransactionManager().getTransaction(); Transaction tx = getTransactionManager().getTransaction();
if(tx != null ) { if (tx != null) {
tx.rollback(); tx.rollback();
} }
} finally { } finally {
@ -539,115 +527,112 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
if (rs != null) { if (rs != null) {
rs.close(); rs.close();
} }
}catch(SQLException se){ } catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS"), se); mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS"), se);
} }
try{ try{
if (ps != null) { if (ps != null) {
ps.close(); ps.close();
} }
}catch(SQLException se){ }catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS"), se); mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS"), se);
} }
try{ try{
if (connection != null) { if (connection != null) {
connection.close(); connection.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
} }
try{ }catch(SQLException se){
if(epb.isClustered() && mClusterConnection != null){ mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
mClusterConnection.close(); }
mClusterConnection = null; try{
} if (mClusterConnection != null && mClusterConnection != connection){
}catch(SQLException se){ mClusterConnection.close();
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); mClusterConnection = null;
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
} }
} }
} }
/** Checks if the Throttling configuration is defined on the endpoint, /** Checks if the Throttling configuration is defined on the endpoint,
* if yes then checks if the messages in the system are within the throttle limit * if yes then checks if the messages in the system are within the throttle limit
* @param * @param
* @return boolean * @return boolean
*/ */
public boolean throttleConfigurationCheck() { public boolean throttleConfigurationCheck() {
synchronized(mInboundExchanges) { synchronized(mInboundExchanges) {
int pendingMsgs = mInboundExchanges.size(); int pendingMsgs = mInboundExchanges.size();
mThrottleNumber = epb.getMaxConcurrencyLimit(); mThrottleNumber = epb.getMaxConcurrencyLimit();
if (mThrottleNumber > 0 ) { if (mThrottleNumber > 0 ) {
if(pendingMsgs > mThrottleNumber){ if(pendingMsgs > mThrottleNumber){
if (mLogger.isLoggable(Level.FINEST)) { if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00664.IMP_THROTTLE_LIMIT_REACHED", mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00664.IMP_THROTTLE_LIMIT_REACHED",
new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) })); new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) }));
} else if (mLogger.isLoggable(Level.INFO)) { } else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info(mMessages.getString("DBBC_R00668.IMP_THROTTLE_LIMIT_REACHED", mLogger.info(mMessages.getString("DBBC_R00668.IMP_THROTTLE_LIMIT_REACHED",
new Object[] { Integer.toString(mThrottleNumber) })); new Object[] { Integer.toString(mThrottleNumber) }));
} }
return false; return false;
} else { } else {
if (mLogger.isLoggable(Level.FINEST)) { if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00665.IMP_THROTTLE_LIMIT_NOT_REACHED", mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00665.IMP_THROTTLE_LIMIT_NOT_REACHED",
new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) })); new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) }));
} else if (mLogger.isLoggable(Level.INFO)) { } else if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, mMessages.getString("DBBC_R00669.IMP_THROTTLE_LIMIT_NOT_REACHED"), mLogger.log(Level.INFO, mMessages.getString("DBBC_R00669.IMP_THROTTLE_LIMIT_NOT_REACHED"),
new Object[] { Integer.toString(mThrottleNumber) }); new Object[] { Integer.toString(mThrottleNumber) });
} }
return true; return true;
} }
} }
mLogger.log(Level.INFO, mMessages.getString("DBBC_R00666.IMP_THROTTLE_NOT_DEFINED")); mLogger.log(Level.INFO, mMessages.getString("DBBC_R00666.IMP_THROTTLE_NOT_DEFINED"));
return true; return true;
} }
} }
public ResultSet executeInboundSQLSelect(final EndpointBean eBean, public ResultSet executeInboundSQLSelect(final EndpointBean epb,
final OperationMetaData opMetaData, final OperationMetaData opMetaData,
Connection connection, Connection connection,
final String mTableName, final String mTableName,
String lSelectSQL) throws MessagingException { String lSelectSQL) throws MessagingException {
try { try {
String jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
String jndiName = eBean.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
String where = "";
if ((mMarkColumnName == null) || (mMarkColumnName.equals(""))) { List<String> bind = new ArrayList<String>();
// do nothing if (mMarkColumnName != null && !mMarkColumnName.equals("")) {
} else { if (mFlagColumnType != null) {
if(mFlagColumnType != null){ where = "("+mMarkColumnName+" != ? OR "+mMarkColumnName+" IS NULL)";
String whereClause = " where "; bind.add(mMarkColumnValue);
if((lSelectSQL.toUpperCase().contains(whereClause.toUpperCase()))) { } else {
if (mFlagColumnType.equalsIgnoreCase("LONGVARCHAR") || mFlagColumnType.equalsIgnoreCase("CHAR") final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00638.IMP_Error_IVALID_ColumnName") + mMarkColumnName;
|| mFlagColumnType.equalsIgnoreCase("VARCHAR")) {
lSelectSQL = lSelectSQL.concat(" and (" + mMarkColumnName + " != " + "'"
+ mMarkColumnValue + "'" + " or " + mMarkColumnName + " is NULL )");
} else {
lSelectSQL = lSelectSQL.concat(" and (" + mMarkColumnName + " != "
+ mMarkColumnValue + " or " + mMarkColumnName + " is NULL )");
}
}else {
if (mFlagColumnType.equalsIgnoreCase("LONGVARCHAR") || mFlagColumnType.equalsIgnoreCase("CHAR")
|| mFlagColumnType.equalsIgnoreCase("VARCHAR")) {
lSelectSQL = lSelectSQL.concat(" where (" + mMarkColumnName + " != " + "'"
+ mMarkColumnValue + "'" + " or " + mMarkColumnName + " is NULL )");
} else {
lSelectSQL = lSelectSQL.concat(" where (" + mMarkColumnName + " != "
+ mMarkColumnValue + " or " + mMarkColumnName + " is NULL )");
}
}
} else{
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00638.IMP_Error_IVALID_ColumnName") + mMarkColumnName;
throw new MessagingException(msg, new NamingException()); throw new MessagingException(msg, new NamingException());
}
} }
} if (epb.isClustered()) {
List<String> pkList = mJDBCClusterManager.selectAllProcessed();
if (pkList.size() > 0) {
StringBuilder sb = new StringBuilder();
for (int i = 0, l = pkList.size(); i < l; i++)
sb.append(i < l-1 ? "?," : "?");
where = (where.equals("") ? "" : where+" AND ")+mPKName+" NOT IN ("+sb.toString()+")";
bind.addAll(pkList);
}
}
lSelectSQL = lSelectSQL.replace("$WHERE", where.equals("") ? "1=1" : where);
mLogger.log(Level.INFO, "Executing sql 1. " + lSelectSQL); mLogger.log(Level.INFO, "Executing sql 1. " + lSelectSQL);
ps = connection.prepareStatement(lSelectSQL); ps = connection.prepareStatement(lSelectSQL);
ParameterMetaData paramMetaData = ps.getParameterMetaData();
for (int i = 0, l = bind.size(); i < l; i++)
{
int columnType = java.sql.Types.VARCHAR;
try { columnType = paramMetaData.getParameterType(i+1); } catch(Exception e) {}
ps.setObject(i+1, JDBCUtil.convert(bind.get(i), columnType), columnType);
}
rs = ps.executeQuery(); rs = ps.executeQuery();
} }
catch (final SQLException ex) { catch (final SQLException ex) {
if (mLogger.isLoggable(Level.FINEST)) { if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL, ex); mLogger.log(Level.FINEST, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL, ex);
} else if (mLogger.isLoggable(Level.FINE)) { } else if (mLogger.isLoggable(Level.FINE)) {
@ -662,7 +647,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL
+ ex.getLocalizedMessage(); + ex.getLocalizedMessage();
throw new MessagingException(msg, ex); throw new MessagingException(msg, ex);
} }
return rs; return rs;
} }
@ -684,14 +669,10 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
prdtName = DBMetaData.getDBType(connection); prdtName = DBMetaData.getDBType(connection);
rs = connection.getMetaData().getColumns(catalog, mSchemaName, mTableName, "%"); rs = connection.getMetaData().getColumns(catalog, mSchemaName, mTableName, "%");
int noofColCounter = -1; int noofColCounter = -1;
// if(rs==null){
// final String msg = InboundMessageProcessor.mMessages.getString("IMP_Table_NotExist");
// throw new MessagingException(msg, new NamingException());
// }
while (rs.next()) { while (rs.next()) {
noofColCounter++; noofColCounter++;
final String colName = rs.getString("COLUMN_NAME"); final String colName = rs.getString("COLUMN_NAME");
if (colName.equalsIgnoreCase(meta.getJDBCSql().getPKName())) { if (colName.equalsIgnoreCase(meta.getJDBCSql().getPKName())) {
final String defaultValue = rs.getString("COLUMN_DEF"); final String defaultValue = rs.getString("COLUMN_DEF");
@ -707,11 +688,11 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
} }
} }
if(noofColCounter < 0 ){ if(noofColCounter < 0 ){
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00636.IMP_Table_NotExist"); final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00636.IMP_Table_NotExist");
throw new MessagingException(msg, new NamingException()); throw new MessagingException(msg, new NamingException());
} }
if (mPKType == null) { if (mPKType == null) {
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00637.IMP_PrimaryKey_Error"); final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00637.IMP_PrimaryKey_Error");
throw new MessagingException(msg, new NamingException()); throw new MessagingException(msg, new NamingException());
} }
if (prdtName.equalsIgnoreCase(InboundMessageProcessor.DERBY_PROD_NAME)) { if (prdtName.equalsIgnoreCase(InboundMessageProcessor.DERBY_PROD_NAME)) {
@ -855,7 +836,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mLogger.info(mMessages.getString("IMP_POST_PROCESS_FAILED")); mLogger.info(mMessages.getString("IMP_POST_PROCESS_FAILED"));
} }
mLogger.log(Level.SEVERE, "IMP_POST_PROCESS_FAILED", mLogger.log(Level.SEVERE, "IMP_POST_PROCESS_FAILED",
new Object[] { ex.getLocalizedMessage() }); new Object[] { ex.getLocalizedMessage() });
} }
} }
connection = getDatabaseConnection(jndiName); connection = getDatabaseConnection(jndiName);
@ -1009,30 +990,29 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), exception); mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), exception);
} }
} }
} }
// for cluster environment // for cluster environment
if(epb.isClustered()){ if(epb.isClustered()){
Connection con = null;
try{ try{
Connection con = null;
List records = (List)mMapInboundExchangesProcessRecords.get(messageId); List records = (List)mMapInboundExchangesProcessRecords.get(messageId);
if(jndiName.equalsIgnoreCase(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME))){ if(jndiName.equalsIgnoreCase(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME))){
con = connection; con = connection;
}else{ }else{
con = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); con = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
} }
mJDBCClusterManager.updateStatusToDone(records, "DONE", con); mJDBCClusterManager.setDataBaseConnection(con);
mJDBCClusterManager.deleteInstances(records);
mLogger.log(Level.INFO, mLogger.log(Level.INFO,
"DBBC_R10906.IMP_UPDATED_STATUS_TO_DONE", "DBBC_R10907.IMP_UPDATED_STATUS_TO_DONE",
new Object[] { records }); new Object[] { records });
}catch(Exception e){ }catch(Exception e){
mLogger.log(Level.SEVERE, "Unable to set the status to DOne", e.getLocalizedMessage()); mLogger.log(Level.SEVERE, "Unable to delete processed records", e.getLocalizedMessage());
}finally { }finally {
try{ try{
if(con != null){ if(con != null && con != connection){
con.close(); con.close();
con = null; }
}
}catch(SQLException se){ }catch(SQLException se){
mLogger.log(Level.SEVERE, "Unable to close the connection", se.getLocalizedMessage()); mLogger.log(Level.SEVERE, "Unable to close the connection", se.getLocalizedMessage());
} }
@ -1044,8 +1024,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
String pkNameRet = (String) it.next(); String pkNameRet = (String) it.next();
mProcessedList.remove(pkNameRet); mProcessedList.remove(pkNameRet);
} }
mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId });
messageId });
if (isTransacted && exchange instanceof InOnly) { if (isTransacted && exchange instanceof InOnly) {
try { try {
// As we are the initiator for tx we have to rollback // As we are the initiator for tx we have to rollback
@ -1079,9 +1058,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
} }
try{ try{
if(connection != null) { if(connection != null) {
connection.close(); connection.close();
} }
}catch(SQLException se){ }catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
} }
@ -1098,7 +1076,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
if (mLogger.isLoggable(Level.FINER)) { if (mLogger.isLoggable(Level.FINER)) {
mLogger.log(Level.FINER, " resuing txn ", new Object[] { tx.toString() }); mLogger.log(Level.FINER, " resuing txn ", new Object[] { tx.toString() });
} }
} }
} }
@ -1186,10 +1163,10 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
} }
private TransactionManager getTransactionManager() { private TransactionManager getTransactionManager() {
return (TransactionManager)mContext.getTransactionManager(); return (TransactionManager)mContext.getTransactionManager();
} }
public void setMessageExchangeId(String messageExchangeId, Object retryMetaData) { public void setMessageExchangeId(String messageExchangeId, Object retryMetaData) {
exchangeIDToMeta.put(messageExchangeId, retryMetaData); exchangeIDToMeta.put(messageExchangeId, retryMetaData);
} }
@ -1201,7 +1178,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
// remove the listener associated with the exchange ID // remove the listener associated with the exchange ID
MessageExchangeSupport.removeRedeliveryListener(exchange.getExchangeId()); MessageExchangeSupport.removeRedeliveryListener(exchange.getExchangeId());
mInboundExchanges.remove(exchange.getExchangeId()); mInboundExchanges.remove(exchange.getExchangeId());
try{ try{
switch (ExchangePattern.valueOf(exchange)) { switch (ExchangePattern.valueOf(exchange)) {
case IN_OUT: case IN_OUT:
@ -1230,14 +1207,14 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
} else if (mLogger.isLoggable(Level.INFO)) { } else if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, "Resending the InOnly exchange"); mLogger.log(Level.INFO, "Resending the InOnly exchange");
} }
inMsg = ((InOnly)exchange).getInMessage(); inMsg = ((InOnly)exchange).getInMessage();
InOnly inonly = mMsgExchangeFactory.createInOnlyExchange(); InOnly inonly = mMsgExchangeFactory.createInOnlyExchange();
// make sure that the message id has is the same // make sure that the message id has is the same
inonly.setProperty(CRMP_GROUP_ID, groupId); inonly.setProperty(CRMP_GROUP_ID, groupId);
inonly.setProperty(CRMP_MESSAGE_ID, messageId); inonly.setProperty(CRMP_MESSAGE_ID, messageId);
//processInOnly(inonly, inMsg, operationMetaData); //processInOnly(inonly, inMsg, operationMetaData);
if (mServiceEndpoint == null) { if (mServiceEndpoint == null) {
mServiceEndpoint = locateServiceEndpoint(); mServiceEndpoint = locateServiceEndpoint();
epb.setValueObj(EndpointBean.ENDPOINT_REFERENCE, mServiceEndpoint); epb.setValueObj(EndpointBean.ENDPOINT_REFERENCE, mServiceEndpoint);
@ -1256,9 +1233,9 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
String pkNameRet = (String) it.next(); String pkNameRet = (String) it.next();
mProcessedList.remove(pkNameRet); mProcessedList.remove(pkNameRet);
} }
// Removing the records from the Map // Removing the records from the Map
mMapInboundExchangesProcessRecords.remove(exchange.getExchangeId()); mMapInboundExchangesProcessRecords.remove(exchange.getExchangeId());
processInOnly(inonly,operationMetaData); processInOnly(inonly,operationMetaData);
break; break;
default: default:
@ -1274,9 +1251,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
throw new MessagingException(e); throw new MessagingException(e);
} }
} }
/*
* Runtime Config object to cluster JNDI name /* Runtime Config object to cluster JNDI name
*
*/ */
public void setRuntimeConfig(RuntimeConfiguration runtimeConfg){ public void setRuntimeConfig(RuntimeConfiguration runtimeConfg){
mRuntimeConfig = runtimeConfg; mRuntimeConfig = runtimeConfg;

View File

@ -81,6 +81,8 @@ public class JDBCClusterManager {
private String BASE_INSTANCESTATE_UPDATE_STMT_STR; private String BASE_INSTANCESTATE_UPDATE_STMT_STR;
private String BASE_OWNERTABLE_SELECTALL_STMT_STR;
private String BASE_OWNERTABLE_INSERT_STMT_STR; private String BASE_OWNERTABLE_INSERT_STMT_STR;
private String BASE_OWNERTABLE_UPDATE_STMT_STR; private String BASE_OWNERTABLE_UPDATE_STMT_STR;
@ -464,20 +466,18 @@ public class JDBCClusterManager {
} }
/* /*
* Check if the record is already processed by another instance. @return boolean recordInserted * Insert new rows with the "In Progress" state
* if not inserted, insert the record with current instance name and status to "In Progress"
*/ */
public boolean isRecordInsertedByCurrentInstance() { public int[] addInstances(List pkList) throws Exception {
boolean recordInserted = false;
String insertQuery = BASE_OWNERTABLE_INSERT_STMT_STR;
PreparedStatement ps = null; PreparedStatement ps = null;
ParameterMetaData paramMetaData = null; ParameterMetaData paramMetaData = null;
int parameters = 0; int parameters = 0;
Connection con = null; Connection con = null;
int[] executedRows = null;
try { try {
con = getDataBaseConnection(); con = getDataBaseConnection();
ps = con.prepareStatement(insertQuery); ps = con.prepareStatement(BASE_OWNERTABLE_INSERT_STMT_STR);
paramMetaData = ps.getParameterMetaData(); paramMetaData = ps.getParameterMetaData();
if (paramMetaData != null) { if (paramMetaData != null) {
parameters = paramMetaData.getParameterCount(); parameters = paramMetaData.getParameterCount();
} }
@ -486,45 +486,56 @@ public class JDBCClusterManager {
} catch (Exception ex) { } catch (Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
} }
if (parameters != 0) { try {
if ((getPKValue() != null) && !getPKValue().trim().equals("")) { if (parameters != 0) {
// set default type. addBatch(pkList, paramMetaData, ps, "In Progress", true);
int columnType = java.sql.Types.VARCHAR; }
try { executedRows = ps.executeBatch();
try{ for (int i = 0; i < executedRows.length; i++) {
columnType = paramMetaData.getParameterType(1); if (executedRows[i] == PreparedStatement.EXECUTE_FAILED) {
}catch(Exception e){ throw new SQLException(
} "One of the Queries in the batch didn't update any rows, Should have updated atleast one row");
ps.setObject(1, JDBCUtil.convert(getPKValue(), columnType), columnType); }
try{ mLogger.log(Level.INFO, "Inserted In Progress OWNER record "+pkList.get(i).toString());
columnType = paramMetaData.getParameterType(2); }
}catch(Exception e){ } catch(Exception e) {
} if(ps != null){
ps.setObject(2, JDBCUtil.convert(getInstanceName(), columnType), columnType); try{
try{ ps.clearBatch();
columnType = paramMetaData.getParameterType(3); ps.close();
}catch(Exception e){ } catch(SQLException se){
} mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), se);
ps.setObject(3, JDBCUtil.convert("In Progress", columnType), columnType);
int rowsUpdated = ps.executeUpdate();
recordInserted = true;
} catch (final Exception e) {
mLogger.log(Level.WARNING, e.getLocalizedMessage());
mLogger.log(Level.INFO, mMessages.getString("DBBC-R10903.JCM_RECORD_LOCKED", new Object[]{getInstanceName()}));
recordInserted = false;
}finally{
if(ps != null){
try{
ps.close();
} catch(SQLException e){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"),
e);
}
}
}
} }
} }
return recordInserted; throw e;
}
return executedRows;
}
public void deleteInstances(List pkList) throws Exception {
if (pkList.size() <= 0)
return;
PreparedStatement ps = null;
String sql = BASE_OWNERTABLE_DELETE_STMT_STR+" (";
for (int i = 0, l = pkList.size(); i < l; i++)
sql += (i < l-1 ? "?," : "?)");
ParameterMetaData paramMetaData = null;
int parameters = 0;
Connection con = null;
int[] executedRows = null;
try {
con = getDataBaseConnection();
ps = con.prepareStatement(sql);
ps.setString(1, getInstanceName());
for (int i = 0, l = pkList.size(); i < l; i++)
ps.setString(i+2, (String)pkList.get(i));
if (mLogger.isLoggable(Level.FINE))
mLogger.log(Level.FINE, "Executing SQL: "+sql); //$NON-NLS-1$
ps.execute();
} catch (Exception e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), e);
throw e;
}
} }
/* /*
@ -552,9 +563,9 @@ public class JDBCClusterManager {
} catch (Exception ex) { } catch (Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
} }
try{ try {
if (parameters != 0) { if (parameters != 0) {
addBatch(pkList, paramMetaData, ps, status); addBatch(pkList, paramMetaData, ps, status, false);
} }
executedRows = ps.executeBatch(); executedRows = ps.executeBatch();
for (int i = 0; i < executedRows.length; i++) { for (int i = 0; i < executedRows.length; i++) {
@ -564,18 +575,7 @@ public class JDBCClusterManager {
} }
; ;
} }
}catch(SQLException e){ } catch(Exception e) {
if(ps != null){
try{
ps.clearBatch();
ps.close();
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"),
e);
}
}
throw e;
}finally {
if(ps != null){ if(ps != null){
try{ try{
ps.clearBatch(); ps.clearBatch();
@ -585,12 +585,13 @@ public class JDBCClusterManager {
se); se);
} }
} }
throw e;
} }
return executedRows; return executedRows;
} }
private void addBatch(List pkList, ParameterMetaData parameterMeta, PreparedStatement ps, String status) throws SQLException, Exception { private void addBatch(List pkList, ParameterMetaData parameterMeta, PreparedStatement ps, String status, boolean forIns) throws Exception {
if (!pkList.isEmpty()) { if (!pkList.isEmpty()) {
for (final Iterator it = pkList.iterator(); it.hasNext();) { for (final Iterator it = pkList.iterator(); it.hasNext();) {
String pkValue = (String) it.next(); String pkValue = (String) it.next();
@ -598,19 +599,18 @@ public class JDBCClusterManager {
// set default type. // set default type.
int columnType = java.sql.Types.VARCHAR; int columnType = java.sql.Types.VARCHAR;
try { try {
try{ if (forIns)
columnType = parameterMeta.getParameterType(1); {
}catch(Exception e){ try { columnType = parameterMeta.getParameterType(2); } catch(Exception e) {}
} ps.setObject(2, JDBCUtil.convert(getInstanceName(), columnType), columnType);
ps.setObject(1, JDBCUtil.convert(status, columnType), columnType);
try{
columnType = parameterMeta.getParameterType(2);
}catch(Exception e){
} }
ps.setObject(2, JDBCUtil.convert(pkValue, columnType), columnType); try { columnType = parameterMeta.getParameterType(forIns ? 3 : 1); } catch(Exception e) {}
ps.setObject(forIns ? 3 : 1, JDBCUtil.convert(status, columnType), columnType);
try { columnType = parameterMeta.getParameterType(forIns ? 1 : 2); } catch(Exception e) {}
ps.setObject(forIns ? 1 : 2, JDBCUtil.convert(pkValue, columnType), columnType);
ps.addBatch(); ps.addBatch();
} catch (SQLException e) { } catch (Exception e) {
mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_ADDING_BATCH_TO_PS"), e); mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11005.JCM_EXCEPTION_WHILE_ADDING_BATCH_TO_PS"), e);
if(ps != null){ if(ps != null){
try{ try{
ps.clearBatch(); ps.clearBatch();
@ -621,68 +621,22 @@ public class JDBCClusterManager {
} }
} }
throw e; throw e;
}catch(Exception ex){
throw ex;
} }
} }
} }
} }
} }
public int[] updateStatusToDone(List pkList, String status, Connection conn) throws Exception { public List selectAllProcessed() throws Exception
String updateQuery = BASE_OWNERTABLE_UPDATE_STMT_STR; {
PreparedStatement ps = null; Connection con = getDataBaseConnection();
ParameterMetaData paramMetaData = null; List<String> pkList = new ArrayList<String>();
int parameters = 0; Statement st = con.createStatement();
Connection con = conn; st.execute(BASE_OWNERTABLE_SELECTALL_STMT_STR);
int[] executedRows = null; ResultSet rs = st.getResultSet();
try { while (rs.next())
ps = con.prepareStatement(updateQuery); pkList.add(rs.getString(1));
paramMetaData = ps.getParameterMetaData(); return pkList;
if (paramMetaData != null) {
parameters = paramMetaData.getParameterCount();
}
} catch (SQLException ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
} catch (Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
}
try{
if (parameters != 0) {
addBatch(pkList, paramMetaData, ps, status);
}
executedRows = ps.executeBatch();
for (int i = 0; i < executedRows.length; i++) {
if (executedRows[i] == PreparedStatement.EXECUTE_FAILED) {
throw new SQLException(
"One of the Queries in the batch didn't update any rows, Should have updated atleast one row");
}
;
}
}catch(SQLException e){
if(ps != null){
try{
ps.clearBatch();
ps.close();
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"),
e);
}
}
throw e;
}finally {
if(ps != null){
try{
ps.clearBatch();
ps.close();
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"),
se);
}
}
}
return executedRows;
} }
/* /*
@ -698,13 +652,17 @@ public class JDBCClusterManager {
BASE_INSTANCESTATE_UPDATE_STMT_STR = "UPDATE INSTANCESTATE" + //$NON-NLS-1$ BASE_INSTANCESTATE_UPDATE_STMT_STR = "UPDATE INSTANCESTATE" + //$NON-NLS-1$
" SET lastupdatetime = CURRENT_TIMESTAMP " + "WHERE INSTANCEID = ? and TABLENAME = ?"; //$NON-NLS-1$ " SET lastupdatetime = CURRENT_TIMESTAMP " + "WHERE INSTANCEID = ? and TABLENAME = ?"; //$NON-NLS-1$
} }
if(BASE_OWNERTABLE_SELECTALL_STMT_STR == null){
BASE_OWNERTABLE_SELECTALL_STMT_STR = "SELECT "+getPKName()+" FROM OWNER_"+getTableName()+
" FOR UPDATE";
}
if(BASE_OWNERTABLE_INSERT_STMT_STR == null){ if(BASE_OWNERTABLE_INSERT_STMT_STR == null){
BASE_OWNERTABLE_INSERT_STMT_STR = "INSERT INTO OWNER_" + getTableName() + //$NON-NLS-1$ BASE_OWNERTABLE_INSERT_STMT_STR = "INSERT INTO OWNER_" + getTableName() + //$NON-NLS-1$
" VALUES(?, ?, ?)"; //$NON-NLS-1$ " VALUES(?, ?, ?)"; //$NON-NLS-1$
} }
if(BASE_OWNERTABLE_DELETE_STMT_STR == null){ if(BASE_OWNERTABLE_DELETE_STMT_STR == null){
BASE_OWNERTABLE_DELETE_STMT_STR = "DELETE FROM OWNER_" + getTableName() + //$NON-NLS-1$ BASE_OWNERTABLE_DELETE_STMT_STR = "DELETE FROM OWNER_" + getTableName() + //$NON-NLS-1$
"WHERE INSTANCEID = ? and status= ?"; //$NON-NLS-1$ " WHERE instance_name=? AND "+getPKName()+" IN "; //$NON-NLS-1$
} }
if(BASE_OWNERTABLE_UPDATE_STMT_STR == null){ if(BASE_OWNERTABLE_UPDATE_STMT_STR == null){
BASE_OWNERTABLE_UPDATE_STMT_STR = "UPDATE OWNER_" + getTableName() + //$NON-NLS-1$ BASE_OWNERTABLE_UPDATE_STMT_STR = "UPDATE OWNER_" + getTableName() + //$NON-NLS-1$

View File

@ -96,7 +96,6 @@ public class JDBCNormalizer {
public ArrayList mProcessedList = new ArrayList(); public ArrayList mProcessedList = new ArrayList();
public Map mInboundExchangeProcessRecordsMap = new HashMap(); public Map mInboundExchangeProcessRecordsMap = new HashMap();
public int mRowCount = 0; public int mRowCount = 0;
private JDBCClusterManager mJDBCClusterManager;
/** Creates a new instance of SoapNormalizer /** Creates a new instance of SoapNormalizer
* @throws javax.jbi.messaging.MessagingException * @throws javax.jbi.messaging.MessagingException
@ -372,29 +371,16 @@ public class JDBCNormalizer {
pkName) || ("\"" + colName + "\""). pkName) || ("\"" + colName + "\"").
equalsIgnoreCase( equalsIgnoreCase(
pkName)) pkName))
if (epb.isClustered()) { {
boolean inserted = boolean processed =
false; isRecordProcessed(
mJDBCClusterManager.setPKValue( colValue);
if (!processed)
pKeyList.add(
colValue); colValue);
inserted = else
mJDBCClusterManager. record = null;
isRecordInsertedByCurrentInstance(); }
if (!inserted)
record = null;
else
pKeyList.add(
colValue);
} else {
boolean processed =
isRecordProcessed(
colValue);
if (!processed)
pKeyList.add(
colValue);
else
record = null;
}
if (record != null) { if (record != null) {
final Element e = NS != null final Element e = NS != null
? normalDoc.createElementNS( ? normalDoc.createElementNS(
@ -1274,10 +1260,6 @@ public class JDBCNormalizer {
this.mProcessedList = list; this.mProcessedList = list;
} }
public void setJDBCClusterManager(JDBCClusterManager jdbcClusterManager) {
this.mJDBCClusterManager = jdbcClusterManager;
}
private boolean isRecordProcessed(String colValue) { private boolean isRecordProcessed(String colValue) {
boolean recordProcessed = true; boolean recordProcessed = true;
if (mProcessedList.isEmpty() || !mProcessedList.contains(colValue)) { if (mProcessedList.isEmpty() || !mProcessedList.contains(colValue)) {

View File

@ -113,12 +113,12 @@ DBBC_R00666.IMP_THROTTLE_NOT_DEFINED=Throttling configuration is not defined on
DBBC_W00667.IMP_EP_NOT_RUNNING=EndPoint \[{0}\] is not in state RUNNING. Ignoring received message. DBBC_W00667.IMP_EP_NOT_RUNNING=EndPoint \[{0}\] is not in state RUNNING. Ignoring received message.
DBBC_R00668.IMP_THROTTLE_LIMIT_REACHED=The number of messages exceed the throttle limit {0} DBBC_R00668.IMP_THROTTLE_LIMIT_REACHED=The number of messages exceed the throttle limit {0}
DBBC_R00669.IMP_THROTTLE_LIMIT_NOT_REACHED=The number of messages are within the throttle limit {0} DBBC_R00669.IMP_THROTTLE_LIMIT_NOT_REACHED=The number of messages are within the throttle limit {0}
DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT=DBBC_R10906.Update the status to SENT, records sent to BPEL {0}. DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT=Update the status to SENT, records sent to BPEL {0}.
DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT=DBBC_E11108.Unable to update the status to SENT {0}, exception is {1} DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT=Unable to update the status to SENT {0}, exception is {1}
DBBC_R10907.IMP_UPDATED_STATUS_TO_DONE=DBBC_R10907.Updated the status to DONE, records processed {0} DBBC_R10907.IMP_UPDATED_STATUS_TO_DONE=Finished processing records {0}, OWNER_ rows deleted.
DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS=DBBC_E11109.Unable to close the result set, exception is {0} DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS=Unable to close the result set, exception is {0}
DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS=DBBC_E11110.Unable to close the statement, exception is {0} DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS=Unable to close the statement, exception is {0}
DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION=DBBC_E11111.Unable to close the connection, exception is {0} DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION=Unable to close the connection, exception is {0}
############################ resource bundles for OutboundMessageProcessor ################ ############################ resource bundles for OutboundMessageProcessor ################
DBBC_R00606.OMP_Accept_msg=Accepted message with exchange ID {0} in DBBC outbound message processor. DBBC_R00606.OMP_Accept_msg=Accepted message with exchange ID {0} in DBBC outbound message processor.