diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java index b51408706..4a952e259 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java @@ -32,6 +32,7 @@ package org.glassfish.openesb.databasebc; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ParameterMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collections; @@ -200,12 +201,12 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi // TODO, need to change it later public static final String DEFAULT_CLUSTER_JNDI_NAME = "jdbc/__defaultDS"; - /** + /** * JBI message exchange properties for message grouping and sequencing (new CRMP) */ public static final String CRMP_GROUP_ID = "com.sun.jbi.messaging.groupid"; public static final String CRMP_MESSAGE_ID = "com.sun.jbi.messaging.messageid"; - + ReplyListener replyListener; public InboundMessageProcessor(final MessagingChannel chnl, final EndpointBean endpoint, @@ -220,12 +221,11 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi dbConnectionInfo = new DBConnectionInfo(); final DocumentBuilderFactory docBuilderFact = DocumentBuilderFactory.newInstance(); mDocBuilder = docBuilderFact.newDocumentBuilder(); - //mTxManager = (TransactionManager) context.getTransactionManager(); if(endpoint.isClustered()){ try{ mJDBCClusterManager = new JDBCClusterManager(context); }catch(Exception e){ - //TODO + //TODO } } @@ -310,10 +310,9 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi // Adding re-delivery/re-try support // we are sending instead of the client context to the ReplyListener // the EndpointBean - MessageExchangeSupport.addReplyListener(mExchange.getExchangeId(), replyListener, epb); - MessageExchangeSupport.addRedeliveryListener(mExchange.getExchangeId(), this, epb); - Redelivery.setUniqueId(mExchange, exchangeId); - + MessageExchangeSupport.addReplyListener(mExchange.getExchangeId(), replyListener, epb); + MessageExchangeSupport.addRedeliveryListener(mExchange.getExchangeId(), this, epb); + Redelivery.setUniqueId(mExchange, exchangeId); final String status = epb.getValue(EndpointBean.STATUS); if (! status.equalsIgnoreCase(EndpointBean.STATUS_RUNNING)) { @@ -325,7 +324,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } else if (mLogger.isLoggable(Level.INFO)) { mLogger.info("DBBC_W00667.IMP_EP_NOT_RUNNING"); } - } else { switch (ExchangePattern.valueOf(mExchange)) { @@ -379,18 +377,11 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi if (meta == null) { throw new MessagingException(InboundMessageProcessor.mMessages.getString("DBBC_E00634.IMP_Invalid_Operation", - new Object[] { exchange.getOperation() })); + new Object[] { exchange.getOperation() })); } mPollMilliSeconds = meta.getJDBCSql().getPollMilliSeconds(); mSelectSQL = meta.getJDBCSql().getSql(); - /*mPKName = Qualified(meta.getJDBCSql().getPKName()); - mMarkColumnName = Qualified(meta.getJDBCSql().getMarkColumnName()); - mMarkColumnValue = meta.getJDBCSql().getMarkColumnValue(); - mTableName = Qualified(meta.getJDBCSql().getTableName()); - mPollingPostProcessing = meta.getJDBCSql().getPollingPostProcessing(); - mMoveRowToTableName = Qualified(meta.getJDBCSql().getMoveRowToTableName()); - */ mPKName = meta.getJDBCSql().getPKName(); mMarkColumnName = meta.getJDBCSql().getMarkColumnName(); mMarkColumnValue = meta.getJDBCSql().getMarkColumnValue(); @@ -400,26 +391,21 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi mXAEnabled = meta.getJDBCSql().getTransaction(); jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME); - // createNewDataSource(mXAEnabled,jndiName,epb); - // Throttle Check - if(throttleConfigurationCheck()){ + if(throttleConfigurationCheck()) { + if (mXAEnabled.equalsIgnoreCase("XATransaction")) { + getTransactionManager().begin(); + } - - if (mXAEnabled.equalsIgnoreCase("XATransaction")) { - // mtxFlag = startTrasaction(); - getTransactionManager().begin(); - } - - dbDataAccessObject = getDataAccessObject(meta); - mSelectSQL = dbDataAccessObject.generateSelectQuery(mSelectSQL, mTableName); - epb.setTableName(mTableName); - - connection = getDatabaseConnection(jndiName); - if (mDbName==null){ - mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase(); - } - String clusterJNDIName = mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME); + dbDataAccessObject = getDataAccessObject(meta); + mSelectSQL = dbDataAccessObject.generateSelectQuery(mSelectSQL, mTableName); + epb.setTableName(mTableName); + + connection = getDatabaseConnection(jndiName); + if (mDbName==null){ + mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase(); + } + String clusterJNDIName = mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME); if(epb.isClustered()){ try{ if(jndiName.equalsIgnoreCase(clusterJNDIName)){ @@ -428,9 +414,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi String prdtName = DBMetaData.getDBType(mClusterConnection); mJDBCClusterManager.setProductName(prdtName); }else{ - mClusterConnection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); - mClusterConnection.setAutoCommit(true); - mJDBCClusterManager.setJNDIName(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); + mClusterConnection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); + mJDBCClusterManager.setJNDIName(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); String prdtName = DBMetaData.getDBType(mClusterConnection); mJDBCClusterManager.setProductName(prdtName); } @@ -438,100 +423,103 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi if(mClusterConnection == null){ //TODO retry; throw new Exception(mMessages.getString("DBBC_E11101.JCM_CONNECTON_EXCEPTION", - new Object[] {mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)} )); + new Object[] {mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)} )); } } } - Transaction tx = getTransactionManager().getTransaction(); - if (isSelectStatement(mSelectSQL)) { - rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); - - if (rs != null) { - final JDBCNormalizer normalizer = new JDBCNormalizer(); - Probe normalizationMeasurement = Probe.info(getClass(), - epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION); + Transaction tx = getTransactionManager().getTransaction(); + if (isSelectStatement(mSelectSQL)) { + if(epb.isClustered()){ + mJDBCClusterManager.setDataBaseConnection(mClusterConnection); + mJDBCClusterManager.setTableName(mTableName); + mJDBCClusterManager.setInstanceName(epb.getInstanceName()); + mJDBCClusterManager.setHeartbeatConfigInterval(mPollMilliSeconds); + mJDBCClusterManager.setPKName(mPKName); + mJDBCClusterManager.doClusterTasks(); + mClusterConnection.setAutoCommit(false); + } - //if(epb.isClustered() && mClusterConnection != null){ - // normalizer.setConnection(mClusterConnection); - // } - if(epb.isClustered()){ - mJDBCClusterManager.setDataBaseConnection(mClusterConnection); - mJDBCClusterManager.setTableName(mTableName); - mJDBCClusterManager.setInstanceName(epb.getInstanceName()); - mJDBCClusterManager.setHeartbeatConfigInterval(mPollMilliSeconds); - mJDBCClusterManager.setPKName(mPKName); - mJDBCClusterManager.doClusterTasks(); - } - normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords); + rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); + + if (rs != null) { + final JDBCNormalizer normalizer = new JDBCNormalizer(); + Probe normalizationMeasurement = Probe.info(getClass(), + epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION); + + normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords); normalizer.setRecordsProcessedList(mProcessedList); - normalizer.setJDBCClusterManager(mJDBCClusterManager); inMsg = normalizer.normalizeSelectInbound(rs, exchange, meta, epb, mPKName,mDbName); mRowCount = normalizer.mRowCount; if(normalizationMeasurement != null){ - normalizationMeasurement.end(); - } + normalizationMeasurement.end(); + } - - final List tempList = epb.getProcessList(); - if (!(tempList.isEmpty())) { - // mTxHelper.handleInbound(exchange); - //set JNDI name on NormalizedMessage for dynamic addressing - inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName); - exchange.setMessage(inMsg, "in"); - - if (tx != null) { - mExchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, tx); - getTransactionManager().suspend(); - } - - mInboundExchanges.put(exchangeId, new ListenerMeta( - System.currentTimeMillis(), this)); + final List tempList = epb.getProcessList(); + if (!(tempList.isEmpty())) + { + if (epb.isClustered()) + { + mJDBCClusterManager.addInstances(tempList); + mClusterConnection.setAutoCommit(true); + } + //set JNDI name on NormalizedMessage for dynamic addressing + inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName); + exchange.setMessage(inMsg, "in"); + if (tx != null) { + mExchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, tx); + getTransactionManager().suspend(); + } - mChannel.send(exchange); - epb.getEndpointStatus().incrementSentRequests(); - // mTableExistsFlag = new Object(); + mInboundExchanges.put(exchangeId, new ListenerMeta( + System.currentTimeMillis(), this)); + + mChannel.send(exchange); + epb.getEndpointStatus().incrementSentRequests(); if(epb.isClustered()){ //Records already sent to NMR so update the status to "SENT" for owner table try{ int i[] = mJDBCClusterManager.updateStatus(tempList, "SENT"); mLogger.log(Level.INFO, - "DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT", - new Object[] { tempList }); + "DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT", + new Object[] { tempList }); }catch(Exception e){ // TODO need to handled the exception mLogger.log(Level.SEVERE, - "DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT", - new Object[] { tempList, e.getLocalizedMessage() }); + "DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT", + new Object[] { tempList, e.getLocalizedMessage() }); } } - } else { - if (tx != null) { - try { - tx.commit(); - } catch (Exception ex) { - mLogger.log(Level.SEVERE, - "DBBC_E00656.IMP_XA_TX_COMMIT_FAILED", - new Object[] { "commit", ex }); - throw ex; - } - } else { - if (mXAEnabled.equalsIgnoreCase("XATransaction")) { - mLogger.log(Level.WARNING, - "DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE", - new Object[] { exchange.getExchangeId() }); - } - } - } - // mTableExistsFlag = new Object(); - } - } + } else { + if (epb.isClustered()) + mClusterConnection.setAutoCommit(true); + if (tx != null) { + try { + tx.commit(); + } catch (Exception ex) { + mLogger.log(Level.SEVERE, + "DBBC_E00656.IMP_XA_TX_COMMIT_FAILED", + new Object[] { "commit", ex }); + throw ex; + } + } else { + if (mXAEnabled.equalsIgnoreCase("XATransaction")) { + mLogger.log(Level.WARNING, + "DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE", + new Object[] { exchange.getExchangeId() }); + } + } + } + } + else if (epb.isClustered()) + mClusterConnection.setAutoCommit(true); + } } } catch (final Exception ex) { mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00663.IMP_ERROR_WHILE_PROCESSING_MEP"), ex); Transaction tx = getTransactionManager().getTransaction(); - if(tx != null ) { + if (tx != null) { tx.rollback(); } } finally { @@ -539,115 +527,112 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi if (rs != null) { rs.close(); } - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS"), se); - } - try{ + } catch(SQLException se){ + mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS"), se); + } + try{ if (ps != null) { ps.close(); } - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS"), se); - } - try{ + }catch(SQLException se){ + mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS"), se); + } + try{ if (connection != null) { connection.close(); - } - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); } - try{ - if(epb.isClustered() && mClusterConnection != null){ - mClusterConnection.close(); - mClusterConnection = null; - } - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); + }catch(SQLException se){ + mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); + } + try{ + if (mClusterConnection != null && mClusterConnection != connection){ + mClusterConnection.close(); + mClusterConnection = null; + } + }catch(SQLException se){ + mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); } } } - - /** Checks if the Throttling configuration is defined on the endpoint, + + /** Checks if the Throttling configuration is defined on the endpoint, * if yes then checks if the messages in the system are within the throttle limit * @param * @return boolean */ public boolean throttleConfigurationCheck() { - - synchronized(mInboundExchanges) { - int pendingMsgs = mInboundExchanges.size(); - mThrottleNumber = epb.getMaxConcurrencyLimit(); - if (mThrottleNumber > 0 ) { - if(pendingMsgs > mThrottleNumber){ + + synchronized(mInboundExchanges) { + int pendingMsgs = mInboundExchanges.size(); + mThrottleNumber = epb.getMaxConcurrencyLimit(); + if (mThrottleNumber > 0 ) { + if(pendingMsgs > mThrottleNumber){ if (mLogger.isLoggable(Level.FINEST)) { - mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00664.IMP_THROTTLE_LIMIT_REACHED", - new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) })); + mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00664.IMP_THROTTLE_LIMIT_REACHED", + new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) })); } else if (mLogger.isLoggable(Level.INFO)) { mLogger.info(mMessages.getString("DBBC_R00668.IMP_THROTTLE_LIMIT_REACHED", new Object[] { Integer.toString(mThrottleNumber) })); } - return false; - } else { + return false; + } else { if (mLogger.isLoggable(Level.FINEST)) { - mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00665.IMP_THROTTLE_LIMIT_NOT_REACHED", - new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) })); + mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00665.IMP_THROTTLE_LIMIT_NOT_REACHED", + new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) })); } else if (mLogger.isLoggable(Level.INFO)) { mLogger.log(Level.INFO, mMessages.getString("DBBC_R00669.IMP_THROTTLE_LIMIT_NOT_REACHED"), new Object[] { Integer.toString(mThrottleNumber) }); } - return true; - } - } - mLogger.log(Level.INFO, mMessages.getString("DBBC_R00666.IMP_THROTTLE_NOT_DEFINED")); - return true; - } + return true; + } + } + mLogger.log(Level.INFO, mMessages.getString("DBBC_R00666.IMP_THROTTLE_NOT_DEFINED")); + return true; + } } - public ResultSet executeInboundSQLSelect(final EndpointBean eBean, + public ResultSet executeInboundSQLSelect(final EndpointBean epb, final OperationMetaData opMetaData, Connection connection, final String mTableName, String lSelectSQL) throws MessagingException { try { - - String jndiName = eBean.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME); + String jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME); mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); - - if ((mMarkColumnName == null) || (mMarkColumnName.equals(""))) { - // do nothing - } else { - if(mFlagColumnType != null){ - String whereClause = " where "; - if((lSelectSQL.toUpperCase().contains(whereClause.toUpperCase()))) { - if (mFlagColumnType.equalsIgnoreCase("LONGVARCHAR") || mFlagColumnType.equalsIgnoreCase("CHAR") - || mFlagColumnType.equalsIgnoreCase("VARCHAR")) { - lSelectSQL = lSelectSQL.concat(" and (" + mMarkColumnName + " != " + "'" - + mMarkColumnValue + "'" + " or " + mMarkColumnName + " is NULL )"); - } else { - lSelectSQL = lSelectSQL.concat(" and (" + mMarkColumnName + " != " - + mMarkColumnValue + " or " + mMarkColumnName + " is NULL )"); - } - }else { - if (mFlagColumnType.equalsIgnoreCase("LONGVARCHAR") || mFlagColumnType.equalsIgnoreCase("CHAR") - || mFlagColumnType.equalsIgnoreCase("VARCHAR")) { - lSelectSQL = lSelectSQL.concat(" where (" + mMarkColumnName + " != " + "'" - + mMarkColumnValue + "'" + " or " + mMarkColumnName + " is NULL )"); - } else { - lSelectSQL = lSelectSQL.concat(" where (" + mMarkColumnName + " != " - + mMarkColumnValue + " or " + mMarkColumnName + " is NULL )"); - } - } - } else{ - final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00638.IMP_Error_IVALID_ColumnName") + mMarkColumnName; + String where = ""; + List bind = new ArrayList(); + if (mMarkColumnName != null && !mMarkColumnName.equals("")) { + if (mFlagColumnType != null) { + where = "("+mMarkColumnName+" != ? OR "+mMarkColumnName+" IS NULL)"; + bind.add(mMarkColumnValue); + } else { + final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00638.IMP_Error_IVALID_ColumnName") + mMarkColumnName; throw new MessagingException(msg, new NamingException()); + } } - } + if (epb.isClustered()) { + List pkList = mJDBCClusterManager.selectAllProcessed(); + if (pkList.size() > 0) { + StringBuilder sb = new StringBuilder(); + for (int i = 0, l = pkList.size(); i < l; i++) + sb.append(i < l-1 ? "?," : "?"); + where = (where.equals("") ? "" : where+" AND ")+mPKName+" NOT IN ("+sb.toString()+")"; + bind.addAll(pkList); + } + } + lSelectSQL = lSelectSQL.replace("$WHERE", where.equals("") ? "1=1" : where); mLogger.log(Level.INFO, "Executing sql 1. " + lSelectSQL); ps = connection.prepareStatement(lSelectSQL); + ParameterMetaData paramMetaData = ps.getParameterMetaData(); + for (int i = 0, l = bind.size(); i < l; i++) + { + int columnType = java.sql.Types.VARCHAR; + try { columnType = paramMetaData.getParameterType(i+1); } catch(Exception e) {} + ps.setObject(i+1, JDBCUtil.convert(bind.get(i), columnType), columnType); + } rs = ps.executeQuery(); - } + } catch (final SQLException ex) { - if (mLogger.isLoggable(Level.FINEST)) { mLogger.log(Level.FINEST, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL, ex); } else if (mLogger.isLoggable(Level.FINE)) { @@ -662,7 +647,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL + ex.getLocalizedMessage(); throw new MessagingException(msg, ex); - } + } return rs; } @@ -684,14 +669,10 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi prdtName = DBMetaData.getDBType(connection); rs = connection.getMetaData().getColumns(catalog, mSchemaName, mTableName, "%"); - + int noofColCounter = -1; - // if(rs==null){ - // final String msg = InboundMessageProcessor.mMessages.getString("IMP_Table_NotExist"); - // throw new MessagingException(msg, new NamingException()); - // } while (rs.next()) { - noofColCounter++; + noofColCounter++; final String colName = rs.getString("COLUMN_NAME"); if (colName.equalsIgnoreCase(meta.getJDBCSql().getPKName())) { final String defaultValue = rs.getString("COLUMN_DEF"); @@ -707,11 +688,11 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } } if(noofColCounter < 0 ){ - final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00636.IMP_Table_NotExist"); + final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00636.IMP_Table_NotExist"); throw new MessagingException(msg, new NamingException()); } if (mPKType == null) { - final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00637.IMP_PrimaryKey_Error"); + final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00637.IMP_PrimaryKey_Error"); throw new MessagingException(msg, new NamingException()); } if (prdtName.equalsIgnoreCase(InboundMessageProcessor.DERBY_PROD_NAME)) { @@ -855,7 +836,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi mLogger.info(mMessages.getString("IMP_POST_PROCESS_FAILED")); } mLogger.log(Level.SEVERE, "IMP_POST_PROCESS_FAILED", - new Object[] { ex.getLocalizedMessage() }); + new Object[] { ex.getLocalizedMessage() }); } } connection = getDatabaseConnection(jndiName); @@ -1009,30 +990,29 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), exception); } } - } // for cluster environment if(epb.isClustered()){ + Connection con = null; try{ - Connection con = null; List records = (List)mMapInboundExchangesProcessRecords.get(messageId); if(jndiName.equalsIgnoreCase(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME))){ con = connection; }else{ - con = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); + con = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); } - mJDBCClusterManager.updateStatusToDone(records, "DONE", con); + mJDBCClusterManager.setDataBaseConnection(con); + mJDBCClusterManager.deleteInstances(records); mLogger.log(Level.INFO, - "DBBC_R10906.IMP_UPDATED_STATUS_TO_DONE", - new Object[] { records }); + "DBBC_R10907.IMP_UPDATED_STATUS_TO_DONE", + new Object[] { records }); }catch(Exception e){ - mLogger.log(Level.SEVERE, "Unable to set the status to DOne", e.getLocalizedMessage()); + mLogger.log(Level.SEVERE, "Unable to delete processed records", e.getLocalizedMessage()); }finally { try{ - if(con != null){ + if(con != null && con != connection){ con.close(); - con = null; - } + } }catch(SQLException se){ mLogger.log(Level.SEVERE, "Unable to close the connection", se.getLocalizedMessage()); } @@ -1044,8 +1024,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi String pkNameRet = (String) it.next(); mProcessedList.remove(pkNameRet); } - mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), - messageId }); + mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId }); if (isTransacted && exchange instanceof InOnly) { try { // As we are the initiator for tx we have to rollback @@ -1079,9 +1058,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } try{ if(connection != null) { - connection.close(); + connection.close(); } - }catch(SQLException se){ mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); } @@ -1098,7 +1076,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi if (mLogger.isLoggable(Level.FINER)) { mLogger.log(Level.FINER, " resuing txn ", new Object[] { tx.toString() }); } - } } @@ -1186,10 +1163,10 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } private TransactionManager getTransactionManager() { - return (TransactionManager)mContext.getTransactionManager(); + return (TransactionManager)mContext.getTransactionManager(); } - public void setMessageExchangeId(String messageExchangeId, Object retryMetaData) { + public void setMessageExchangeId(String messageExchangeId, Object retryMetaData) { exchangeIDToMeta.put(messageExchangeId, retryMetaData); } @@ -1201,7 +1178,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi // remove the listener associated with the exchange ID MessageExchangeSupport.removeRedeliveryListener(exchange.getExchangeId()); - mInboundExchanges.remove(exchange.getExchangeId()); + mInboundExchanges.remove(exchange.getExchangeId()); try{ switch (ExchangePattern.valueOf(exchange)) { case IN_OUT: @@ -1230,14 +1207,14 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi } else if (mLogger.isLoggable(Level.INFO)) { mLogger.log(Level.INFO, "Resending the InOnly exchange"); } - + inMsg = ((InOnly)exchange).getInMessage(); InOnly inonly = mMsgExchangeFactory.createInOnlyExchange(); // make sure that the message id has is the same - inonly.setProperty(CRMP_GROUP_ID, groupId); - inonly.setProperty(CRMP_MESSAGE_ID, messageId); - //processInOnly(inonly, inMsg, operationMetaData); - + inonly.setProperty(CRMP_GROUP_ID, groupId); + inonly.setProperty(CRMP_MESSAGE_ID, messageId); + //processInOnly(inonly, inMsg, operationMetaData); + if (mServiceEndpoint == null) { mServiceEndpoint = locateServiceEndpoint(); epb.setValueObj(EndpointBean.ENDPOINT_REFERENCE, mServiceEndpoint); @@ -1256,9 +1233,9 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi String pkNameRet = (String) it.next(); mProcessedList.remove(pkNameRet); } - - // Removing the records from the Map - mMapInboundExchangesProcessRecords.remove(exchange.getExchangeId()); + + // Removing the records from the Map + mMapInboundExchangesProcessRecords.remove(exchange.getExchangeId()); processInOnly(inonly,operationMetaData); break; default: @@ -1274,9 +1251,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi throw new MessagingException(e); } } - /* - * Runtime Config object to cluster JNDI name - * + + /* Runtime Config object to cluster JNDI name */ public void setRuntimeConfig(RuntimeConfiguration runtimeConfg){ mRuntimeConfig = runtimeConfg; diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCClusterManager.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCClusterManager.java index ea4381eaf..d6bf56518 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCClusterManager.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCClusterManager.java @@ -81,6 +81,8 @@ public class JDBCClusterManager { private String BASE_INSTANCESTATE_UPDATE_STMT_STR; + private String BASE_OWNERTABLE_SELECTALL_STMT_STR; + private String BASE_OWNERTABLE_INSERT_STMT_STR; private String BASE_OWNERTABLE_UPDATE_STMT_STR; @@ -464,20 +466,18 @@ public class JDBCClusterManager { } /* - * Check if the record is already processed by another instance. @return boolean recordInserted - * if not inserted, insert the record with current instance name and status to "In Progress" + * Insert new rows with the "In Progress" state */ - public boolean isRecordInsertedByCurrentInstance() { - boolean recordInserted = false; - String insertQuery = BASE_OWNERTABLE_INSERT_STMT_STR; + public int[] addInstances(List pkList) throws Exception { PreparedStatement ps = null; ParameterMetaData paramMetaData = null; int parameters = 0; Connection con = null; + int[] executedRows = null; try { con = getDataBaseConnection(); - ps = con.prepareStatement(insertQuery); - paramMetaData = ps.getParameterMetaData(); + ps = con.prepareStatement(BASE_OWNERTABLE_INSERT_STMT_STR); + paramMetaData = ps.getParameterMetaData(); if (paramMetaData != null) { parameters = paramMetaData.getParameterCount(); } @@ -486,45 +486,56 @@ public class JDBCClusterManager { } catch (Exception ex) { mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); } - if (parameters != 0) { - if ((getPKValue() != null) && !getPKValue().trim().equals("")) { - // set default type. - int columnType = java.sql.Types.VARCHAR; - try { - try{ - columnType = paramMetaData.getParameterType(1); - }catch(Exception e){ - } - ps.setObject(1, JDBCUtil.convert(getPKValue(), columnType), columnType); - try{ - columnType = paramMetaData.getParameterType(2); - }catch(Exception e){ - } - ps.setObject(2, JDBCUtil.convert(getInstanceName(), columnType), columnType); - try{ - columnType = paramMetaData.getParameterType(3); - }catch(Exception e){ - } - ps.setObject(3, JDBCUtil.convert("In Progress", columnType), columnType); - int rowsUpdated = ps.executeUpdate(); - recordInserted = true; - } catch (final Exception e) { - mLogger.log(Level.WARNING, e.getLocalizedMessage()); - mLogger.log(Level.INFO, mMessages.getString("DBBC-R10903.JCM_RECORD_LOCKED", new Object[]{getInstanceName()})); - recordInserted = false; - }finally{ - if(ps != null){ - try{ - ps.close(); - } catch(SQLException e){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), - e); - } - } - } + try { + if (parameters != 0) { + addBatch(pkList, paramMetaData, ps, "In Progress", true); + } + executedRows = ps.executeBatch(); + for (int i = 0; i < executedRows.length; i++) { + if (executedRows[i] == PreparedStatement.EXECUTE_FAILED) { + throw new SQLException( + "One of the Queries in the batch didn't update any rows, Should have updated atleast one row"); + } + mLogger.log(Level.INFO, "Inserted In Progress OWNER record "+pkList.get(i).toString()); + } + } catch(Exception e) { + if(ps != null){ + try{ + ps.clearBatch(); + ps.close(); + } catch(SQLException se){ + mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), se); } } - return recordInserted; + throw e; + } + return executedRows; + } + + public void deleteInstances(List pkList) throws Exception { + if (pkList.size() <= 0) + return; + PreparedStatement ps = null; + String sql = BASE_OWNERTABLE_DELETE_STMT_STR+" ("; + for (int i = 0, l = pkList.size(); i < l; i++) + sql += (i < l-1 ? "?," : "?)"); + ParameterMetaData paramMetaData = null; + int parameters = 0; + Connection con = null; + int[] executedRows = null; + try { + con = getDataBaseConnection(); + ps = con.prepareStatement(sql); + ps.setString(1, getInstanceName()); + for (int i = 0, l = pkList.size(); i < l; i++) + ps.setString(i+2, (String)pkList.get(i)); + if (mLogger.isLoggable(Level.FINE)) + mLogger.log(Level.FINE, "Executing SQL: "+sql); //$NON-NLS-1$ + ps.execute(); + } catch (Exception e) { + mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), e); + throw e; + } } /* @@ -552,9 +563,9 @@ public class JDBCClusterManager { } catch (Exception ex) { mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); } - try{ + try { if (parameters != 0) { - addBatch(pkList, paramMetaData, ps, status); + addBatch(pkList, paramMetaData, ps, status, false); } executedRows = ps.executeBatch(); for (int i = 0; i < executedRows.length; i++) { @@ -564,18 +575,7 @@ public class JDBCClusterManager { } ; } - }catch(SQLException e){ - if(ps != null){ - try{ - ps.clearBatch(); - ps.close(); - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), - e); - } - } - throw e; - }finally { + } catch(Exception e) { if(ps != null){ try{ ps.clearBatch(); @@ -585,12 +585,13 @@ public class JDBCClusterManager { se); } } + throw e; } return executedRows; } - private void addBatch(List pkList, ParameterMetaData parameterMeta, PreparedStatement ps, String status) throws SQLException, Exception { + private void addBatch(List pkList, ParameterMetaData parameterMeta, PreparedStatement ps, String status, boolean forIns) throws Exception { if (!pkList.isEmpty()) { for (final Iterator it = pkList.iterator(); it.hasNext();) { String pkValue = (String) it.next(); @@ -598,19 +599,18 @@ public class JDBCClusterManager { // set default type. int columnType = java.sql.Types.VARCHAR; try { - try{ - columnType = parameterMeta.getParameterType(1); - }catch(Exception e){ - } - ps.setObject(1, JDBCUtil.convert(status, columnType), columnType); - try{ - columnType = parameterMeta.getParameterType(2); - }catch(Exception e){ + if (forIns) + { + try { columnType = parameterMeta.getParameterType(2); } catch(Exception e) {} + ps.setObject(2, JDBCUtil.convert(getInstanceName(), columnType), columnType); } - ps.setObject(2, JDBCUtil.convert(pkValue, columnType), columnType); + try { columnType = parameterMeta.getParameterType(forIns ? 3 : 1); } catch(Exception e) {} + ps.setObject(forIns ? 3 : 1, JDBCUtil.convert(status, columnType), columnType); + try { columnType = parameterMeta.getParameterType(forIns ? 1 : 2); } catch(Exception e) {} + ps.setObject(forIns ? 1 : 2, JDBCUtil.convert(pkValue, columnType), columnType); ps.addBatch(); - } catch (SQLException e) { - mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_ADDING_BATCH_TO_PS"), e); + } catch (Exception e) { + mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11005.JCM_EXCEPTION_WHILE_ADDING_BATCH_TO_PS"), e); if(ps != null){ try{ ps.clearBatch(); @@ -621,68 +621,22 @@ public class JDBCClusterManager { } } throw e; - }catch(Exception ex){ - throw ex; } } } } } - - public int[] updateStatusToDone(List pkList, String status, Connection conn) throws Exception { - String updateQuery = BASE_OWNERTABLE_UPDATE_STMT_STR; - PreparedStatement ps = null; - ParameterMetaData paramMetaData = null; - int parameters = 0; - Connection con = conn; - int[] executedRows = null; - try { - ps = con.prepareStatement(updateQuery); - paramMetaData = ps.getParameterMetaData(); - if (paramMetaData != null) { - parameters = paramMetaData.getParameterCount(); - } - } catch (SQLException ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } catch (Exception ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } - try{ - if (parameters != 0) { - addBatch(pkList, paramMetaData, ps, status); - } - executedRows = ps.executeBatch(); - for (int i = 0; i < executedRows.length; i++) { - if (executedRows[i] == PreparedStatement.EXECUTE_FAILED) { - throw new SQLException( - "One of the Queries in the batch didn't update any rows, Should have updated atleast one row"); - } - ; - } - }catch(SQLException e){ - if(ps != null){ - try{ - ps.clearBatch(); - ps.close(); - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), - e); - } - } - throw e; - }finally { - if(ps != null){ - try{ - ps.clearBatch(); - ps.close(); - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), - se); - } - } - } - - return executedRows; + + public List selectAllProcessed() throws Exception + { + Connection con = getDataBaseConnection(); + List pkList = new ArrayList(); + Statement st = con.createStatement(); + st.execute(BASE_OWNERTABLE_SELECTALL_STMT_STR); + ResultSet rs = st.getResultSet(); + while (rs.next()) + pkList.add(rs.getString(1)); + return pkList; } /* @@ -698,13 +652,17 @@ public class JDBCClusterManager { BASE_INSTANCESTATE_UPDATE_STMT_STR = "UPDATE INSTANCESTATE" + //$NON-NLS-1$ " SET lastupdatetime = CURRENT_TIMESTAMP " + "WHERE INSTANCEID = ? and TABLENAME = ?"; //$NON-NLS-1$ } + if(BASE_OWNERTABLE_SELECTALL_STMT_STR == null){ + BASE_OWNERTABLE_SELECTALL_STMT_STR = "SELECT "+getPKName()+" FROM OWNER_"+getTableName()+ + " FOR UPDATE"; + } if(BASE_OWNERTABLE_INSERT_STMT_STR == null){ BASE_OWNERTABLE_INSERT_STMT_STR = "INSERT INTO OWNER_" + getTableName() + //$NON-NLS-1$ " VALUES(?, ?, ?)"; //$NON-NLS-1$ } if(BASE_OWNERTABLE_DELETE_STMT_STR == null){ BASE_OWNERTABLE_DELETE_STMT_STR = "DELETE FROM OWNER_" + getTableName() + //$NON-NLS-1$ - "WHERE INSTANCEID = ? and status= ?"; //$NON-NLS-1$ + " WHERE instance_name=? AND "+getPKName()+" IN "; //$NON-NLS-1$ } if(BASE_OWNERTABLE_UPDATE_STMT_STR == null){ BASE_OWNERTABLE_UPDATE_STMT_STR = "UPDATE OWNER_" + getTableName() + //$NON-NLS-1$ diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCNormalizer.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCNormalizer.java index c7925ef23..50131912c 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCNormalizer.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCNormalizer.java @@ -96,7 +96,6 @@ public class JDBCNormalizer { public ArrayList mProcessedList = new ArrayList(); public Map mInboundExchangeProcessRecordsMap = new HashMap(); public int mRowCount = 0; - private JDBCClusterManager mJDBCClusterManager; /** Creates a new instance of SoapNormalizer * @throws javax.jbi.messaging.MessagingException @@ -372,29 +371,16 @@ public class JDBCNormalizer { pkName) || ("\"" + colName + "\""). equalsIgnoreCase( pkName)) - if (epb.isClustered()) { - boolean inserted = - false; - mJDBCClusterManager.setPKValue( + { + boolean processed = + isRecordProcessed( + colValue); + if (!processed) + pKeyList.add( colValue); - inserted = - mJDBCClusterManager. - isRecordInsertedByCurrentInstance(); - if (!inserted) - record = null; - else - pKeyList.add( - colValue); - } else { - boolean processed = - isRecordProcessed( - colValue); - if (!processed) - pKeyList.add( - colValue); - else - record = null; - } + else + record = null; + } if (record != null) { final Element e = NS != null ? normalDoc.createElementNS( @@ -1274,10 +1260,6 @@ public class JDBCNormalizer { this.mProcessedList = list; } - public void setJDBCClusterManager(JDBCClusterManager jdbcClusterManager) { - this.mJDBCClusterManager = jdbcClusterManager; - } - private boolean isRecordProcessed(String colValue) { boolean recordProcessed = true; if (mProcessedList.isEmpty() || !mProcessedList.contains(colValue)) { diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/messages/Bundle.properties b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/messages/Bundle.properties index a37813764..13ad4da86 100755 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/messages/Bundle.properties +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/messages/Bundle.properties @@ -113,12 +113,12 @@ DBBC_R00666.IMP_THROTTLE_NOT_DEFINED=Throttling configuration is not defined on DBBC_W00667.IMP_EP_NOT_RUNNING=EndPoint \[{0}\] is not in state RUNNING. Ignoring received message. DBBC_R00668.IMP_THROTTLE_LIMIT_REACHED=The number of messages exceed the throttle limit {0} DBBC_R00669.IMP_THROTTLE_LIMIT_NOT_REACHED=The number of messages are within the throttle limit {0} -DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT=DBBC_R10906.Update the status to SENT, records sent to BPEL {0}. -DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT=DBBC_E11108.Unable to update the status to SENT {0}, exception is {1} -DBBC_R10907.IMP_UPDATED_STATUS_TO_DONE=DBBC_R10907.Updated the status to DONE, records processed {0} -DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS=DBBC_E11109.Unable to close the result set, exception is {0} -DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS=DBBC_E11110.Unable to close the statement, exception is {0} -DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION=DBBC_E11111.Unable to close the connection, exception is {0} +DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT=Update the status to SENT, records sent to BPEL {0}. +DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT=Unable to update the status to SENT {0}, exception is {1} +DBBC_R10907.IMP_UPDATED_STATUS_TO_DONE=Finished processing records {0}, OWNER_ rows deleted. +DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS=Unable to close the result set, exception is {0} +DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS=Unable to close the statement, exception is {0} +DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION=Unable to close the connection, exception is {0} ############################ resource bundles for OutboundMessageProcessor ################ DBBC_R00606.OMP_Accept_msg=Accepted message with exchange ID {0} in DBBC outbound message processor.