From 6a6f2a2b47d5db962f91cc1264d313560bde9faf Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Fri, 18 Dec 2015 00:33:56 +0300 Subject: [PATCH] Remove cluster manager --- .../databasebc/InboundMessageProcessor.java | 112 --- .../openesb/databasebc/InboundReceiver.java | 17 - .../databasebc/JDBCClusterManager.java | 718 ------------------ .../databasebc/JDBCHeartbeatManager.java | 236 ------ .../databasebc/messages/Bundle.properties | 18 - 5 files changed, 1101 deletions(-) delete mode 100644 ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCClusterManager.java delete mode 100644 ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCHeartbeatManager.java diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java index 6da5f25b5..b79c9fbd4 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java @@ -150,8 +150,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi private int mRowCount = 0; - Connection mClusterConnection = null; - private String mTableName = null; private String mDbName = null; @@ -162,7 +160,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi private int mPollMilliSeconds = 10000; private int mThrottleNumber = -1; - private JDBCClusterManager mJDBCClusterManager = null; // Settings for custom reliability header extensions public static final String CUSTOM_RELIABILITY_MESSAGE_ID_PROPERTY = "com.stc.jbi.messaging.messageid"; // NOI18N @@ -189,14 +186,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi mOperation = opname; mMonitor = new AtomicBoolean(false); final DocumentBuilderFactory docBuilderFact = DocumentBuilderFactory.newInstance(); - if(endpoint.isClustered()){ - try{ - mJDBCClusterManager = new JDBCClusterManager(context); - }catch(Exception e){ - //TODO - } - } - } /** @@ -376,39 +365,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi if (mDbName==null){ mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase(); } - String clusterJNDIName = mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME); - if(epb.isClustered()){ - try{ - if(jndiName.equalsIgnoreCase(clusterJNDIName)){ - mClusterConnection = connection; - mJDBCClusterManager.setJNDIName(clusterJNDIName); - String prdtName = DBMetaData.getDBType(mClusterConnection); - mJDBCClusterManager.setProductName(prdtName); - }else{ - mClusterConnection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); - mJDBCClusterManager.setJNDIName(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); - String prdtName = DBMetaData.getDBType(mClusterConnection); - mJDBCClusterManager.setProductName(prdtName); - } - }catch(Exception e){ - if(mClusterConnection == null){ - //TODO retry; - throw new Exception(mMessages.getString("DBBC_E11101.JCM_CONNECTON_EXCEPTION", - new Object[] {mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)} )); - } - } - } if (isSelectStatement(mSelectSQL)) { - if(epb.isClustered()){ - mJDBCClusterManager.setDataBaseConnection(mClusterConnection); - mJDBCClusterManager.setTableName(mTableName); - mJDBCClusterManager.setInstanceName(epb.getInstanceName()); - mJDBCClusterManager.setHeartbeatConfigInterval(mPollMilliSeconds); - mJDBCClusterManager.setPKName(mPKName); - mJDBCClusterManager.doClusterTasks(); - mClusterConnection.setAutoCommit(false); - } - rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL); if (rs != null) { @@ -428,11 +385,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi final List tempList = epb.getProcessList(); if (!(tempList.isEmpty())) { - if (epb.isClustered()) - { - mJDBCClusterManager.addInstances(tempList); - mClusterConnection.setAutoCommit(true); - } //set JNDI name on NormalizedMessage for dynamic addressing inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName); exchange.setMessage(inMsg, "in"); @@ -442,27 +394,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi mChannel.send(exchange); epb.getEndpointStatus().incrementSentRequests(); - if(epb.isClustered()){ - //Records already sent to NMR so update the status to "SENT" for owner table - try{ - int i[] = mJDBCClusterManager.updateStatus(tempList, "SENT"); - mLogger.log(Level.INFO, - "DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT", - new Object[] { tempList }); - }catch(Exception e){ - // TODO need to handled the exception - mLogger.log(Level.SEVERE, - "DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT", - new Object[] { tempList, e.getLocalizedMessage() }); - } - } - } else { - if (epb.isClustered()) - mClusterConnection.setAutoCommit(true); } } - else if (epb.isClustered()) - mClusterConnection.setAutoCommit(true); } } } catch (final Exception ex) { @@ -489,14 +422,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi }catch(SQLException se){ mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); } - try{ - if (mClusterConnection != null && mClusterConnection != connection){ - mClusterConnection.close(); - mClusterConnection = null; - } - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se); - } } } @@ -556,16 +481,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi throw new MessagingException(msg, new NamingException()); } } - if (epb.isClustered()) { - List pkList = mJDBCClusterManager.selectAllProcessed(); - if (pkList.size() > 0) { - StringBuilder sb = new StringBuilder(); - for (int i = 0, l = pkList.size(); i < l; i++) - sb.append(i < l-1 ? "?," : "?"); - where = (where.equals("") ? "" : where+" AND ")+mPKName+" NOT IN ("+sb.toString()+")"; - bind.addAll(pkList); - } - } lSelectSQL = lSelectSQL.replace("$WHERE", where.equals("") ? "1=1" : where); mLogger.log(Level.INFO, "Executing sql 1. " + lSelectSQL); PreparedStatement ps = connection.prepareStatement(lSelectSQL); @@ -800,33 +715,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName); connection = getDatabaseConnection(jndiName); doPostProcessing(connection, (List)mMapInboundExchangesProcessRecords.get(messageId)); - // for cluster environment - if(epb.isClustered()){ - Connection con = null; - try{ - List records = (List)mMapInboundExchangesProcessRecords.get(messageId); - if(jndiName.equalsIgnoreCase(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME))){ - con = connection; - }else{ - con = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); - } - mJDBCClusterManager.setDataBaseConnection(con); - mJDBCClusterManager.deleteInstances(records); - mLogger.log(Level.INFO, - "DBBC_R10907.IMP_UPDATED_STATUS_TO_DONE", - new Object[] { records }); - }catch(Exception e){ - mLogger.log(Level.SEVERE, "Unable to delete processed records", e.getLocalizedMessage()); - }finally { - try{ - if(con != null && con != connection){ - con.close(); - } - }catch(SQLException se){ - mLogger.log(Level.SEVERE, "Unable to close the connection", se.getLocalizedMessage()); - } - } - } } else { mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId }); // Any status other than 'DONE' is considered an error diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundReceiver.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundReceiver.java index 187cb7971..94c5a4f5f 100644 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundReceiver.java +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundReceiver.java @@ -86,7 +86,6 @@ class InboundReceiver { // This is removed since never used //private final Map mEndpoints; private final Map mActivatedInboundMsgProcs; - private final Map mActivatedJDBCHeartbeatManager; /** * @@ -135,7 +134,6 @@ class InboundReceiver { new LinkedBlockingQueue(), Executors.defaultThreadFactory()); mInboundPooledExecutor.prestartAllCoreThreads(); mActivatedInboundMsgProcs = Collections.synchronizedMap(new HashMap()); - mActivatedJDBCHeartbeatManager = Collections.synchronizedMap(new HashMap()); //mEndpoints = endpoints; } @@ -190,15 +188,6 @@ class InboundReceiver { if (!mActivatedInboundMsgProcs.containsKey(key)) { try { - if(endpoint.isClustered()){ - final JDBCHeartbeatManager proc = new JDBCHeartbeatManager( - endpoint, mContext, opname); - proc.setRuntimeConfig(mRuntimeConfig); - final Thread task = new Thread(proc); - task.start(); - // store the thread in map - mActivatedJDBCHeartbeatManager.put(key, proc); - } final InboundMessageProcessor proc = new InboundMessageProcessor(mChannel, endpoint, mContext, opname); proc.setRuntimeConfig(mRuntimeConfig); @@ -252,12 +241,6 @@ class InboundReceiver { // Remove the thread from the map mActivatedInboundMsgProcs.remove(key); } // if - if (mActivatedJDBCHeartbeatManager.containsKey(key) && endpoint.isClustered()) { - final JDBCHeartbeatManager proc = mActivatedJDBCHeartbeatManager.get(key); - // Stop the Heart beat message processor thread - proc.stopReceiving(); - mActivatedJDBCHeartbeatManager.remove(key); - } // if } // for if (mLogger.isLoggable(Level.FINEST)) { diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCClusterManager.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCClusterManager.java deleted file mode 100644 index d6bf56518..000000000 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCClusterManager.java +++ /dev/null @@ -1,718 +0,0 @@ -/* - * BEGIN_HEADER - DO NOT EDIT - * - * The contents of this file are subject to the terms - * of the Common Development and Distribution License - * (the "License"). You may not use this file except - * in compliance with the License. - * - * You can obtain a copy of the license at - * https://open-jbi-components.dev.java.net/public/CDDLv1.0.html. - * See the License for the specific language governing - * permissions and limitations under the License. - * - * When distributing Covered Code, include this CDDL - * HEADER in each file and include the License file at - * https://open-jbi-components.dev.java.net/public/CDDLv1.0.html. - * If applicable add the following below this CDDL HEADER, - * with the fields enclosed by brackets "[]" replaced with - * your own identifying information: Portions Copyright - * [year] [name of copyright owner] - */ - -/* - * @(#)JDBCClusterManager.java - * - * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved. - * - * END_HEADER - DO NOT EDIT - */ - -package org.glassfish.openesb.databasebc; - -import java.sql.Connection; -import java.sql.ParameterMetaData; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.Statement; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.sql.DataSource; -import javax.jbi.component.ComponentContext; -import com.sun.jbi.internationalization.Messages; - -public class JDBCClusterManager { - - private static final Messages mMessages = Messages.getMessages(JDBCClusterManager.class); - - private static final Logger mLogger = Messages.getLogger(JDBCClusterManager.class); - - private String mTableName; - - private ComponentContext mContext; - private String mprodName; - - private String mPKName; - - private String mInstanceName; - - private List mPKList = new ArrayList(); - - private String mPKValue; - - private int mPollingInterval; - - private long mHeartbeatUpdateConfigInterval; - - private Connection connection = null; - - private String mJNDIName = null; - - private String BASE_INSTANCESTATE_INSERT_STMT_STR; - - private String BASE_INSTANCESTATE_UPDATE_STMT_STR; - - private String BASE_OWNERTABLE_SELECTALL_STMT_STR; - - private String BASE_OWNERTABLE_INSERT_STMT_STR; - - private String BASE_OWNERTABLE_UPDATE_STMT_STR; - - private String BASE_OWNERTABLE_DELETE_STMT_STR; - - private String BASE_OWNERTABLE_DELETE_DANGLING_STMT_STR; - - private String BASE_OWNERTABLE_UPDATE_DANGLING_STMT_STR; - - private String BASE_OWNERTABLE_SELECT_DANGLING_STMT_STR; - private String BASE_ORACLE_OWNERTABLE_DELETE_DANGLING_STMT_STR; - - private Object objectLock = new Object(); - - private Object mUpdateHeartbeatLock = new Object(); - - private Object mRecoverDanglingInstanceLock = new Object(); - - private boolean mRecoverDanglingInstInProgress = false; - - /* - * Constructor - */ - public JDBCClusterManager(ComponentContext context) throws Exception { - mContext = context; - } - - /* - * polling table name @param val string - */ - public void setTableName(String val) { - this.mTableName = val; - } - - /* - * get the table name @return mTableName - */ - public String getTableName() { - return this.mTableName; - } - - /* - * primary key name of the polling table @return String mPKName - */ - public String getPKName() { - return mPKName; - } - - /* - * set the primary key name of the polling table @param string val - */ - - public void setPKName(String val) { - mPKName = val; - } - - /* - * set the primary key value @param string val - */ - - public void setPKValue(String val) { - mPKValue = val; - } - - /* - * get the primary key value @return string mPKValue - */ - - public String getPKValue() { - return mPKValue; - } - - /* - * set heartbeat Config Interval, i.e 200% of the polling interval @param int val - */ - - public void setHeartbeatConfigInterval(int val) { - mHeartbeatUpdateConfigInterval = val * 200 / 100; - } - - /* - * get heartbeat Config Interval, i.e 200% of the polling interval @return long - * mHeartbeatUpdateConfigInterval - */ - - public long getHeartbeatConfigInterval() { - return mHeartbeatUpdateConfigInterval; - } - - /* - * set instance Name @param string val - */ - - public void setInstanceName(String val) { - mInstanceName = val; - } - - /* - * get the instance Name @return string mInstance Name - */ - - public String getInstanceName() { - return mInstanceName; - } - - public void setProductName(String prodName){ - mprodName = prodName; - } - - /* - * do the clustering task, 1. to update the heartbeat, 2 update the dangling instanes with the - * current instance for ownership, 3 select dangling instances PK list(this list contains more - * pk values, while constructing the normalize message in JDBCNormalizer class, based on the - * numberOfRecords config, process only that many records per poll. If this list contains more - * than that configured number then process those records in the next poll.) - */ - - public void doClusterTasks() { - constructSQLQueries(); - // This not requried here againg to set timestamp. - // boolean heartBeatUpdated = upDateHeartBeat(); - // if(heartBeatUpdated){ - doInstanceFailover(); - // } - } - - - /** - * Check for the failed instance and acquire the dangling instances. - */ - - private void doInstanceFailover() { - synchronized (mRecoverDanglingInstanceLock) { - if (mRecoverDanglingInstInProgress) { - return; - } - mRecoverDanglingInstInProgress = true; - } - /* boolean danglingInstancesUpdated = updateDanglingInstances(); - if(danglingInstancesUpdated){ - this.mPKList = selectDanglingInstancesPKList(); - };*/ - boolean deleted = deleteDanglingInstances(); - mRecoverDanglingInstInProgress = false; - } - /* - * get the under line database connection @return connection - */ - - public Connection getDataBaseConnection() throws Exception { - if(this.connection != null){ - return this.connection; - } - try{ - this.connection = getDatabaseConnection(mJNDIName); - this.connection.setAutoCommit(true); - }catch (Exception e){ - throw new Exception(mMessages.getString("DBBC_E11101.JCM_CONNECTON_EXCEPTION", - new Object[] {this.mJNDIName} )); - } - return this.connection; - } - - - /* - * set the under line database connection @return connection - */ - - public void setDataBaseConnection(Connection con) throws Exception { - if(con != null) - this.connection = con; - } - - /* - * used to the Datasource @param String val - */ - - public void setJNDIName(String val){ - this.mJNDIName = val; - } - - /* - * used to update the instnace heatbeat per each poll to know instance is alive or not - */ - - /* - * private boolean upDateHeartBeat() { boolean retry; boolean updated; do { retry = false; - * updated = false; PreparedStatement ps = null; Connection con = null; try { con = - * getDataBaseConnection(); ps = con.prepareStatement(BASE_INSTANCESTATE_UPDATE_STMT_STR); - * ps.setString(1, getInstanceName()); int updateCount = ps.executeUpdate(); if(updateCount == - * 0){ // this indicates that no entry exists for this instance id, // insert new one. ps = - * con.prepareStatement(BASE_INSTANCESTATE_INSERT_STMT_STR); ps.setString(1, getInstanceName()); - * int inserted = ps.executeUpdate(); }else if(updateCount == 1){ System.out.println("updated - * the time stamp for the instnce "+getInstanceName()); mLogger.log(Level.INFO, - * mMessages.getString("DBBC_R10901.JCM_UPDATED_TIME_STAMP", new Object[] {getInstanceName()})); } - * updated = true; }catch(Exception e){ if(con != null){ try{ con.rollback(); }catch(Exception - * ex){ System.out.println("Exception while roll back the transaction "+ex); - * mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11001.JCM_EXCEPTION_WHILE_ROLLBACK"), - * ex); } }else{ // TODO retry for the connection } } finally{ if(ps != null){ try{ ps.close(); } - * catch(SQLException e){ mLogger.log(Level.SEVERE, - * mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), e); } } } } while (retry); - * return updated; } - */ - - /* - * get the dangling instances PK list. - */ - - public List getDanglingInstancePKList() { - return mPKList; - } - - /* - * Query for the dangling instances PK list for instance to process with current instance. - */ - - private List selectDanglingInstancesPKList(){ - String selectQuery = BASE_OWNERTABLE_SELECT_DANGLING_STMT_STR; - PreparedStatement ps = null; - ResultSet rs = null; - Connection con = null; - try { - con = getDataBaseConnection(); - ps = con.prepareStatement(selectQuery); - ps.setString(1, getInstanceName()); - rs = ps.executeQuery(); - while(rs.next()){ - String pkValue = rs.getString(getPKName()); - mPKList.add(pkValue); - } - } catch (SQLException ex) { - mLogger.log(Level.SEVERE, ex.getLocalizedMessage()); - }catch (Exception ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } finally{ - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11003.JCM_EXCEPTION_WHILE_CLOSING_RS"), e); - } - } - if (ps != null) { - try { - ps.close(); - } catch (SQLException e) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), e); - } - } - - } - return mPKList; - } - - /* - * get the instance ownership transfer. @ return boolean update if successed - */ - private boolean updateDanglingInstances() { - boolean updated = false; - String updateQuery = BASE_OWNERTABLE_UPDATE_DANGLING_STMT_STR; - PreparedStatement ps = null; - ParameterMetaData paramMetaData = null; - int parameters = 0; - Connection con = null; - try { - con = getDataBaseConnection(); - ps = con.prepareStatement(updateQuery); - paramMetaData = ps.getParameterMetaData(); - if (paramMetaData != null) { - parameters = paramMetaData.getParameterCount(); - } - } catch (SQLException ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } catch (Exception ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } - if (parameters != 0) { - // set default type. - int columnType = java.sql.Types.VARCHAR; - try { - columnType = paramMetaData.getParameterType(1); - ps.setObject(1, JDBCUtil.convert(getInstanceName(), columnType), columnType); - ps.setLong(2, getHeartbeatConfigInterval()); - columnType = paramMetaData.getParameterType(3); - ps.setObject(3, JDBCUtil.convert(getInstanceName(), columnType), columnType); - columnType = paramMetaData.getParameterType(4); - ps.setObject(4, JDBCUtil.convert(getTableName(), columnType), columnType); - int rowsUpdated = ps.executeUpdate(); - if(rowsUpdated > 0 ){ - mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED", - new Object[] { getInstanceName() })); - updated = true; - } - } catch (SQLException e) { - mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED", - new Object[] { getInstanceName() })); - updated = false; - }catch (Exception e) { - mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED", - new Object[] { getInstanceName() })); - updated = false; - } finally{ - if(ps != null){ - try{ - ps.close(); - } catch(SQLException e){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), - e); - } - } - } - } - return updated; - } - - - - private boolean deleteDanglingInstances() { - boolean deleted = false; - String deleteQuery = null; - if(mprodName != null && mprodName.equalsIgnoreCase("ORACLE")){ - deleteQuery = BASE_ORACLE_OWNERTABLE_DELETE_DANGLING_STMT_STR; - }else{ - deleteQuery = BASE_OWNERTABLE_DELETE_DANGLING_STMT_STR; - } - PreparedStatement ps = null; - ParameterMetaData paramMetaData = null; - int parameters = 0; - Connection con = null; - try { - con = getDataBaseConnection(); - ps = con.prepareStatement(deleteQuery); - paramMetaData = ps.getParameterMetaData(); - if (paramMetaData != null) { - parameters = paramMetaData.getParameterCount(); - } - } catch (SQLException ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } catch (Exception ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } - if (parameters != 0) { - - // set default type. - int columnType = java.sql.Types.VARCHAR; - try { - ps.setLong(1, getHeartbeatConfigInterval()); - //columnType = paramMetaData.getParameterType(2); - //ps.setObject(2, JDBCUtil.convert(getInstanceName(), columnType), columnType); - //columnType = paramMetaData.getParameterType(3); - //ps.setObject(3, JDBCUtil.convert(getTableName(), columnType), columnType); - ps.setString(2,getInstanceName()); - ps.setString(3,getTableName()); - int rowsDeleted = ps.executeUpdate(); - if(rowsDeleted > 0 ){ - mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED", - new Object[] { getInstanceName() })); - deleted = true; - } - } catch (SQLException e) { - mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED", - new Object[] { getInstanceName() })); - deleted = false; - }catch (Exception e) { - mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED", - new Object[] { getInstanceName() })); - deleted = false; - } finally{ - if(ps != null){ - try{ - ps.close(); - } catch(SQLException e){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), - e); - } - } - } - } - return deleted; - } - - /* - * Insert new rows with the "In Progress" state - */ - public int[] addInstances(List pkList) throws Exception { - PreparedStatement ps = null; - ParameterMetaData paramMetaData = null; - int parameters = 0; - Connection con = null; - int[] executedRows = null; - try { - con = getDataBaseConnection(); - ps = con.prepareStatement(BASE_OWNERTABLE_INSERT_STMT_STR); - paramMetaData = ps.getParameterMetaData(); - if (paramMetaData != null) { - parameters = paramMetaData.getParameterCount(); - } - } catch (SQLException ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } catch (Exception ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } - try { - if (parameters != 0) { - addBatch(pkList, paramMetaData, ps, "In Progress", true); - } - executedRows = ps.executeBatch(); - for (int i = 0; i < executedRows.length; i++) { - if (executedRows[i] == PreparedStatement.EXECUTE_FAILED) { - throw new SQLException( - "One of the Queries in the batch didn't update any rows, Should have updated atleast one row"); - } - mLogger.log(Level.INFO, "Inserted In Progress OWNER record "+pkList.get(i).toString()); - } - } catch(Exception e) { - if(ps != null){ - try{ - ps.clearBatch(); - ps.close(); - } catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), se); - } - } - throw e; - } - return executedRows; - } - - public void deleteInstances(List pkList) throws Exception { - if (pkList.size() <= 0) - return; - PreparedStatement ps = null; - String sql = BASE_OWNERTABLE_DELETE_STMT_STR+" ("; - for (int i = 0, l = pkList.size(); i < l; i++) - sql += (i < l-1 ? "?," : "?)"); - ParameterMetaData paramMetaData = null; - int parameters = 0; - Connection con = null; - int[] executedRows = null; - try { - con = getDataBaseConnection(); - ps = con.prepareStatement(sql); - ps.setString(1, getInstanceName()); - for (int i = 0, l = pkList.size(); i < l; i++) - ps.setString(i+2, (String)pkList.get(i)); - if (mLogger.isLoggable(Level.FINE)) - mLogger.log(Level.FINE, "Executing SQL: "+sql); //$NON-NLS-1$ - ps.execute(); - } catch (Exception e) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), e); - throw e; - } - } - - /* - * Update the status to "SENT" in the owner table after sending the PKList to NMR. This flag is - * used to not process the died instance records which are already sent to NMR, and making the staus - * to "DONE" after getting the done from NMR - */ - - public int[] updateStatus(List pkList, String status) throws Exception { - String updateQuery = BASE_OWNERTABLE_UPDATE_STMT_STR; - PreparedStatement ps = null; - ParameterMetaData paramMetaData = null; - int parameters = 0; - Connection con = null; - int[] executedRows = null; - try { - con = getDataBaseConnection(); - ps = con.prepareStatement(updateQuery); - paramMetaData = ps.getParameterMetaData(); - if (paramMetaData != null) { - parameters = paramMetaData.getParameterCount(); - } - } catch (SQLException ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } catch (Exception ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex); - } - try { - if (parameters != 0) { - addBatch(pkList, paramMetaData, ps, status, false); - } - executedRows = ps.executeBatch(); - for (int i = 0; i < executedRows.length; i++) { - if (executedRows[i] == PreparedStatement.EXECUTE_FAILED) { - throw new SQLException( - "One of the Queries in the batch didn't update any rows, Should have updated atleast one row"); - } - ; - } - } catch(Exception e) { - if(ps != null){ - try{ - ps.clearBatch(); - ps.close(); - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), - se); - } - } - throw e; - } - - return executedRows; - } - - private void addBatch(List pkList, ParameterMetaData parameterMeta, PreparedStatement ps, String status, boolean forIns) throws Exception { - if (!pkList.isEmpty()) { - for (final Iterator it = pkList.iterator(); it.hasNext();) { - String pkValue = (String) it.next(); - if ((pkValue != null) && !pkValue.trim().equals("")) { - // set default type. - int columnType = java.sql.Types.VARCHAR; - try { - if (forIns) - { - try { columnType = parameterMeta.getParameterType(2); } catch(Exception e) {} - ps.setObject(2, JDBCUtil.convert(getInstanceName(), columnType), columnType); - } - try { columnType = parameterMeta.getParameterType(forIns ? 3 : 1); } catch(Exception e) {} - ps.setObject(forIns ? 3 : 1, JDBCUtil.convert(status, columnType), columnType); - try { columnType = parameterMeta.getParameterType(forIns ? 1 : 2); } catch(Exception e) {} - ps.setObject(forIns ? 1 : 2, JDBCUtil.convert(pkValue, columnType), columnType); - ps.addBatch(); - } catch (Exception e) { - mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11005.JCM_EXCEPTION_WHILE_ADDING_BATCH_TO_PS"), e); - if(ps != null){ - try{ - ps.clearBatch(); - ps.close(); - }catch(SQLException se){ - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), - e); - } - } - throw e; - } - } - } - } - } - - public List selectAllProcessed() throws Exception - { - Connection con = getDataBaseConnection(); - List pkList = new ArrayList(); - Statement st = con.createStatement(); - st.execute(BASE_OWNERTABLE_SELECTALL_STMT_STR); - ResultSet rs = st.getResultSet(); - while (rs.next()) - pkList.add(rs.getString(1)); - return pkList; - } - - /* - * Construct SQL queries for the INSTANCESTATE Table and for OWNER_bussiner_table. to avoid - * duplication and to support failover - */ - private void constructSQLQueries(){ - if(BASE_INSTANCESTATE_INSERT_STMT_STR == null){ - BASE_INSTANCESTATE_INSERT_STMT_STR = "INSERT INTO INSTANCESTATE"+ //$NON-NLS-1$ - " VALUES(?, ?, CURRENT_TIMESTAMP)"; //$NON-NLS-1$ - } - if(BASE_INSTANCESTATE_UPDATE_STMT_STR == null){ - BASE_INSTANCESTATE_UPDATE_STMT_STR = "UPDATE INSTANCESTATE" + //$NON-NLS-1$ - " SET lastupdatetime = CURRENT_TIMESTAMP " + "WHERE INSTANCEID = ? and TABLENAME = ?"; //$NON-NLS-1$ - } - if(BASE_OWNERTABLE_SELECTALL_STMT_STR == null){ - BASE_OWNERTABLE_SELECTALL_STMT_STR = "SELECT "+getPKName()+" FROM OWNER_"+getTableName()+ - " FOR UPDATE"; - } - if(BASE_OWNERTABLE_INSERT_STMT_STR == null){ - BASE_OWNERTABLE_INSERT_STMT_STR = "INSERT INTO OWNER_" + getTableName() + //$NON-NLS-1$ - " VALUES(?, ?, ?)"; //$NON-NLS-1$ - } - if(BASE_OWNERTABLE_DELETE_STMT_STR == null){ - BASE_OWNERTABLE_DELETE_STMT_STR = "DELETE FROM OWNER_" + getTableName() + //$NON-NLS-1$ - " WHERE instance_name=? AND "+getPKName()+" IN "; //$NON-NLS-1$ - } - if(BASE_OWNERTABLE_UPDATE_STMT_STR == null){ - BASE_OWNERTABLE_UPDATE_STMT_STR = "UPDATE OWNER_" + getTableName() + //$NON-NLS-1$ - " SET status = ? " + " WHERE " + getPKName() + " = ?"; //$NON-NLS-1$ - } - if(BASE_OWNERTABLE_DELETE_DANGLING_STMT_STR == null){ - BASE_OWNERTABLE_DELETE_DANGLING_STMT_STR = "delete from OWNER_"+getTableName()//$NON-NLS-1$ //$NON-NLS-1$ - + " where (status ='In Progress' or status ='SENT') and instance_name IN (select instanceid from INSTANCESTATE" - + " where {fn TIMESTAMPDIFF(SQL_TSI_SECOND, timestamp(lastupdatetime), CURRENT_TIMESTAMP)} > ?/1000 " - + " and instanceid != ? and tablename = ?) "; - } - if(BASE_ORACLE_OWNERTABLE_DELETE_DANGLING_STMT_STR == null){ - BASE_ORACLE_OWNERTABLE_DELETE_DANGLING_STMT_STR = "delete from OWNER_"+getTableName()//$NON-NLS-1$ //$NON-NLS-1$ - + " where (status ='In Progress' or status ='SENT') and instance_name IN (select instanceid from INSTANCESTATE" - + " where (trim(to_number(substr((sysdate-lastupdatetime)*24*60*60,2,(LENGTH((sysdate-lastupdatetime)*24*60*60)-19))))) > (? / 1000) " - + " and instanceid != ? and tablename = ?) "; - } - if(BASE_OWNERTABLE_UPDATE_DANGLING_STMT_STR == null){ - BASE_OWNERTABLE_UPDATE_DANGLING_STMT_STR = "update OWNER_"+getTableName()+" set instance_name = ? " //$NON-NLS-1$ //$NON-NLS-1$ - + " where (status ='In Progress') and instance_name IN (select instanceid from INSTANCESTATE" - + " where {fn TIMESTAMPDIFF(SQL_TSI_SECOND, timestamp(lastupdatetime), CURRENT_TIMESTAMP)} > ?/1000 " - + " and instanceid != ? and tablename = ?) "; - } - if(BASE_OWNERTABLE_SELECT_DANGLING_STMT_STR == null){ - BASE_OWNERTABLE_SELECT_DANGLING_STMT_STR = "select "+getPKName()+" from OWNER_"+getTableName()+ " where (status ='In Progress')"+ - " and instance_name = ?"; - - } - - - } - - /** - * @param jndiName - * @return - * @throws javax.naming.NamingException - */ - private Object getDataSourceFromContext(final String jndiName) throws javax.naming.NamingException { - final Context c = mContext.getNamingContext(); - - return c.lookup(jndiName); - } - - /** - * @param jndiName - * @return - * @throws Exception - */ - private Connection getDatabaseConnection(final String jndiName) throws SQLException, NamingException { - return ((DataSource) getDataSourceFromContext(jndiName)).getConnection(); - } - -} diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCHeartbeatManager.java b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCHeartbeatManager.java deleted file mode 100644 index 33c4a83dc..000000000 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/JDBCHeartbeatManager.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * BEGIN_HEADER - DO NOT EDIT - * - * The contents of this file are subject to the terms - * of the Common Development and Distribution License - * (the "License"). You may not use this file except - * in compliance with the License. - * - * You can obtain a copy of the license at - * https://open-jbi-components.dev.java.net/public/CDDLv1.0.html. - * See the License for the specific language governing - * permissions and limitations under the License. - * - * When distributing Covered Code, include this CDDL - * HEADER in each file and include the License file at - * https://open-jbi-components.dev.java.net/public/CDDLv1.0.html. - * If applicable add the following below this CDDL HEADER, - * with the fields enclosed by brackets "[]" replaced with - * your own identifying information: Portions Copyright - * [year] [name of copyright owner] - */ - -/* - * @(#)JDBCHeartbeatManager.java - * - * Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved. - * - * END_HEADER - DO NOT EDIT - */ - -package org.glassfish.openesb.databasebc; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.sun.jbi.internationalization.Messages; - -import javax.jbi.component.ComponentContext; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.sql.DataSource; -import javax.xml.namespace.QName; - -/** - * author : Venkat P Process requests received from the External Database - */ -public class JDBCHeartbeatManager implements Runnable { - - EndpointBean epb; - - private ComponentContext mContext; - - private RuntimeConfiguration mRuntimeConfig; - - private AtomicBoolean mMonitor; - - PreparedStatement ps = null; - - Connection connection = null; - - private String mTableName = null; - - private int mPollMilliSeconds = 10000; - - private QName mOperation; - - private static final Messages mMessages = Messages.getMessages(JDBCHeartbeatManager.class); - - private static final Logger mLogger = Messages.getLogger(JDBCHeartbeatManager.class); - - /* - * Constructor - */ - public JDBCHeartbeatManager(final EndpointBean endpoint, final ComponentContext context, final QName opname) - throws Exception { - epb = endpoint; - mContext = context; - mMonitor = new AtomicBoolean(false); - mOperation = opname; - } - - // @Override - public void run() { - do { - try { - execute(); - } catch (final Exception ex) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11102.JHM_ERROR_WHILE_EXECUTING_SQL"), ex); - } - try { - Thread.sleep(mPollMilliSeconds); - } catch (final Exception e) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11103.JHM_THREAD_SLEEP_ABRUPTED"), e); - } - } while (mMonitor.get() != Boolean.TRUE); - } - - /** - * @throws Exception - */ - public void execute() throws Exception { - try { - Map operationNameToMetaData = (Map) epb.getValueObj(EndpointBean.OPERATION_NAME_TO_META_DATA); - OperationMetaData meta = (OperationMetaData) operationNameToMetaData.get(mOperation.getLocalPart()); - mPollMilliSeconds = meta.getJDBCSql().getPollMilliSeconds(); - mTableName = meta.getJDBCSql().getTableName(); - if (epb.isClustered()) { - try { - if (this.connection == null) { - connection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty( - RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)); - connection.setAutoCommit(true); - } - } catch (NamingException ne) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11104.JHM_ERROR_IN_LOOKUP", - new Object[] { new Object[] { mRuntimeConfig.getProperties().getProperty( - RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME) } }), ne); - throw ne; - } catch (SQLException se) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11105.JHM_ERROR_WHILE_GETTING_CONNECTION"), se); - throw se; - } - upDateHeartBeat(); - } - } catch (final Exception e) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11102.JHM_ERROR_WHILE_EXECUTING_SQL"), e); - throw e; - } - } - - /** - * @param jndiName - * @return - * @throws javax.naming.NamingException - */ - private Object getDataSourceFromContext(final String jndiName) throws javax.naming.NamingException { - final Context c = mContext.getNamingContext(); - return c.lookup(jndiName); - } - - /** - * @param jndiName - * @return - * @throws Exception - */ - private Connection getDatabaseConnection(final String jndiName) throws SQLException, NamingException { - return ((DataSource) getDataSourceFromContext(jndiName)).getConnection(); - } - - /* - * stop the thread to update the Hearbeat. - */ - - protected void stopReceiving() { - mMonitor.set(Boolean.TRUE); - try { - if (connection != null) { - connection.close(); - } - } catch (Exception e) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11106.JHM_ERROR_WHILE_CLOSING_CONNECTION"), e); - } - } - - /* - * Runtime Config object to cluster JNDI name - */ - public void setRuntimeConfig(RuntimeConfiguration runtimeConfg) { - mRuntimeConfig = runtimeConfg; - } - - /* - * used to update the instance heartbeat per each poll to know instance is alive or not - */ - - private boolean upDateHeartBeat() { - boolean retry; - boolean updated; - do { - retry = false; - updated = false; - PreparedStatement ps = null; - Connection con = null; - String BASE_INSTANCESTATE_UPDATE_STMT_STR = "UPDATE INSTANCESTATE" + //$NON-NLS-1$ - " SET lastupdatetime = CURRENT_TIMESTAMP " + "WHERE INSTANCEID = ? and TABLENAME = ?"; //$NON-NLS-1$ - - String BASE_INSTANCESTATE_INSERT_STMT_STR = "INSERT INTO INSTANCESTATE" + //$NON-NLS-1$ - " VALUES(?, CURRENT_TIMESTAMP, ?)"; //$NON-NLS-1$ - - try { - con = connection; - ps = con.prepareStatement(BASE_INSTANCESTATE_UPDATE_STMT_STR); - ps.setString(1, epb.getInstanceName()); - ps.setString(2, mTableName); - int updateCount = ps.executeUpdate(); - if (updateCount == 0) { - // this indicates that no entry exists for this instance id, - // insert new one. - ps = con.prepareStatement(BASE_INSTANCESTATE_INSERT_STMT_STR); - ps.setString(1, epb.getInstanceName()); - ps.setString(2, mTableName); - int inserted = ps.executeUpdate(); - } - mLogger.log(Level.INFO, mMessages.getString("DBBC_R10902.JHM_UPDATED_TIME_STAMP", - new Object[] { new Object[] { epb.getInstanceName() } })); - updated = true; - } catch (SQLException e) { - if (con != null) { - try { - con.rollback(); - } catch (SQLException ex) { - mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11001.JCM_EXCEPTION_WHILE_ROLLBACK"), ex); - } - } else { - // TODO retry for the connection - } - } finally { - if (ps != null) { - try { - ps.close(); - } catch (SQLException e) { - mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), e); - } - } - } - } while (retry); - return updated; - } - -} diff --git a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/messages/Bundle.properties b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/messages/Bundle.properties index 13ad4da86..6938e5b97 100755 --- a/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/messages/Bundle.properties +++ b/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/messages/Bundle.properties @@ -277,21 +277,3 @@ DBBC-E01135.Invalid_Server_Url=DBBC-E01135.{0} is an invalid URL: It should be i ############################ resource bundles for LobHandler ################ DBBC_L00000.CLOB_Null=The Clob from obtained resultset is null DBBC_L00001.BLOB_Null=The Blob from obtained resultset is null -############################ resource bundles for JDBCClusterManager ################ -DBBC_R10901.JCM_UPDATED_TIME_STAMP=DBBC_R10901.Updated the timestamp for instance {0}. -DBBC_W10901.JCM_EXCEPTION_WHILE_ROLLBACK=DBBC_R10901.Exception while roll back the transaction.{0} -DBBC_E11101.JCM_CONNECTON_EXCEPTION=DBBC_E11101.No DataSource is configured with the JNDI name.{0} -DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS=DBBC_W11002.Exception while closing the PS -DBBC_W11003.JCM_EXCEPTION_WHILE_CLOSING_RS=DBBC_W11003.Exception while closing the RS -DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS=DBBC_W11004.Exception while while getting the prepared statement. -DBBC-R10903.JCM_RECORD_LOCKED=DBBC-R10903.Record is already processed by another instance. -DBBC_W11005.JCM_EXCEPTION_WHILE_ADDING_BATCH_TO_PS=DBBC_W11005.Exception while adding batch to the prepared statement. - - -############################ resource bundles for JDBCHeartbeatManager ################ -DBBC_E11102.JHM_ERROR_WHILE_EXECUTING_SQL=DBBC_E11102.Error while executing the SQL. -DBBC_E11103.JHM_THREAD_SLEEP_ABRUPTED=DBBC_E11103.Unexpected exception Occured In Inbound message processor thread during sleep. -DBBC_E11104.JHM_ERROR_IN_LOOKUP=DBBC_E11104.Error while looking up the datasource with jndi name.{0} -DBBC_E11105.JHM_ERROR_WHILE_GETTING_CONNECTION=DBBC_E11105.Exception while getting the connection. -DBBC_E11106.JHM_ERROR_WHILE_CLOSING_CONNECTION=DBBC_E11106.Exception thrown when closing DB cursors -DBBC_R10902.JHM_UPDATED_TIME_STAMP=DBBC_R10902.Updated the timestamp for instance {0}.