Remove cluster manager

master
Vitaliy Filippov 2015-12-18 00:33:56 +03:00
parent 4e8e052920
commit 6a6f2a2b47
5 changed files with 0 additions and 1101 deletions

View File

@ -150,8 +150,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
private int mRowCount = 0;
Connection mClusterConnection = null;
private String mTableName = null;
private String mDbName = null;
@ -162,7 +160,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
private int mPollMilliSeconds = 10000;
private int mThrottleNumber = -1;
private JDBCClusterManager mJDBCClusterManager = null;
// Settings for custom reliability header extensions
public static final String CUSTOM_RELIABILITY_MESSAGE_ID_PROPERTY = "com.stc.jbi.messaging.messageid"; // NOI18N
@ -189,14 +186,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mOperation = opname;
mMonitor = new AtomicBoolean(false);
final DocumentBuilderFactory docBuilderFact = DocumentBuilderFactory.newInstance();
if(endpoint.isClustered()){
try{
mJDBCClusterManager = new JDBCClusterManager(context);
}catch(Exception e){
//TODO
}
}
}
/**
@ -376,39 +365,7 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
if (mDbName==null){
mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase();
}
String clusterJNDIName = mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME);
if(epb.isClustered()){
try{
if(jndiName.equalsIgnoreCase(clusterJNDIName)){
mClusterConnection = connection;
mJDBCClusterManager.setJNDIName(clusterJNDIName);
String prdtName = DBMetaData.getDBType(mClusterConnection);
mJDBCClusterManager.setProductName(prdtName);
}else{
mClusterConnection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
mJDBCClusterManager.setJNDIName(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
String prdtName = DBMetaData.getDBType(mClusterConnection);
mJDBCClusterManager.setProductName(prdtName);
}
}catch(Exception e){
if(mClusterConnection == null){
//TODO retry;
throw new Exception(mMessages.getString("DBBC_E11101.JCM_CONNECTON_EXCEPTION",
new Object[] {mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)} ));
}
}
}
if (isSelectStatement(mSelectSQL)) {
if(epb.isClustered()){
mJDBCClusterManager.setDataBaseConnection(mClusterConnection);
mJDBCClusterManager.setTableName(mTableName);
mJDBCClusterManager.setInstanceName(epb.getInstanceName());
mJDBCClusterManager.setHeartbeatConfigInterval(mPollMilliSeconds);
mJDBCClusterManager.setPKName(mPKName);
mJDBCClusterManager.doClusterTasks();
mClusterConnection.setAutoCommit(false);
}
rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL);
if (rs != null) {
@ -428,11 +385,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
final List tempList = epb.getProcessList();
if (!(tempList.isEmpty()))
{
if (epb.isClustered())
{
mJDBCClusterManager.addInstances(tempList);
mClusterConnection.setAutoCommit(true);
}
//set JNDI name on NormalizedMessage for dynamic addressing
inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName);
exchange.setMessage(inMsg, "in");
@ -442,27 +394,8 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mChannel.send(exchange);
epb.getEndpointStatus().incrementSentRequests();
if(epb.isClustered()){
//Records already sent to NMR so update the status to "SENT" for owner table
try{
int i[] = mJDBCClusterManager.updateStatus(tempList, "SENT");
mLogger.log(Level.INFO,
"DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT",
new Object[] { tempList });
}catch(Exception e){
// TODO need to handled the exception
mLogger.log(Level.SEVERE,
"DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT",
new Object[] { tempList, e.getLocalizedMessage() });
}
}
} else {
if (epb.isClustered())
mClusterConnection.setAutoCommit(true);
}
}
else if (epb.isClustered())
mClusterConnection.setAutoCommit(true);
}
}
} catch (final Exception ex) {
@ -489,14 +422,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
}
try{
if (mClusterConnection != null && mClusterConnection != connection){
mClusterConnection.close();
mClusterConnection = null;
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
}
}
}
@ -556,16 +481,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
throw new MessagingException(msg, new NamingException());
}
}
if (epb.isClustered()) {
List<String> pkList = mJDBCClusterManager.selectAllProcessed();
if (pkList.size() > 0) {
StringBuilder sb = new StringBuilder();
for (int i = 0, l = pkList.size(); i < l; i++)
sb.append(i < l-1 ? "?," : "?");
where = (where.equals("") ? "" : where+" AND ")+mPKName+" NOT IN ("+sb.toString()+")";
bind.addAll(pkList);
}
}
lSelectSQL = lSelectSQL.replace("$WHERE", where.equals("") ? "1=1" : where);
mLogger.log(Level.INFO, "Executing sql 1. " + lSelectSQL);
PreparedStatement ps = connection.prepareStatement(lSelectSQL);
@ -800,33 +715,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
connection = getDatabaseConnection(jndiName);
doPostProcessing(connection, (List)mMapInboundExchangesProcessRecords.get(messageId));
// for cluster environment
if(epb.isClustered()){
Connection con = null;
try{
List records = (List)mMapInboundExchangesProcessRecords.get(messageId);
if(jndiName.equalsIgnoreCase(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME))){
con = connection;
}else{
con = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
}
mJDBCClusterManager.setDataBaseConnection(con);
mJDBCClusterManager.deleteInstances(records);
mLogger.log(Level.INFO,
"DBBC_R10907.IMP_UPDATED_STATUS_TO_DONE",
new Object[] { records });
}catch(Exception e){
mLogger.log(Level.SEVERE, "Unable to delete processed records", e.getLocalizedMessage());
}finally {
try{
if(con != null && con != connection){
con.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, "Unable to close the connection", se.getLocalizedMessage());
}
}
}
} else {
mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId });
// Any status other than 'DONE' is considered an error

View File

@ -86,7 +86,6 @@ class InboundReceiver {
// This is removed since never used
//private final Map mEndpoints;
private final Map<String,InboundMessageProcessor> mActivatedInboundMsgProcs;
private final Map<String,JDBCHeartbeatManager> mActivatedJDBCHeartbeatManager;
/**
*
@ -135,7 +134,6 @@ class InboundReceiver {
new LinkedBlockingQueue(), Executors.defaultThreadFactory());
mInboundPooledExecutor.prestartAllCoreThreads();
mActivatedInboundMsgProcs = Collections.synchronizedMap(new HashMap<String,InboundMessageProcessor>());
mActivatedJDBCHeartbeatManager = Collections.synchronizedMap(new HashMap<String,JDBCHeartbeatManager>());
//mEndpoints = endpoints;
}
@ -190,15 +188,6 @@ class InboundReceiver {
if (!mActivatedInboundMsgProcs.containsKey(key)) {
try {
if(endpoint.isClustered()){
final JDBCHeartbeatManager proc = new JDBCHeartbeatManager(
endpoint, mContext, opname);
proc.setRuntimeConfig(mRuntimeConfig);
final Thread task = new Thread(proc);
task.start();
// store the thread in map
mActivatedJDBCHeartbeatManager.put(key, proc);
}
final InboundMessageProcessor proc = new InboundMessageProcessor(mChannel,
endpoint, mContext, opname);
proc.setRuntimeConfig(mRuntimeConfig);
@ -252,12 +241,6 @@ class InboundReceiver {
// Remove the thread from the map
mActivatedInboundMsgProcs.remove(key);
} // if
if (mActivatedJDBCHeartbeatManager.containsKey(key) && endpoint.isClustered()) {
final JDBCHeartbeatManager proc = mActivatedJDBCHeartbeatManager.get(key);
// Stop the Heart beat message processor thread
proc.stopReceiving();
mActivatedJDBCHeartbeatManager.remove(key);
} // if
} // for
if (mLogger.isLoggable(Level.FINEST)) {

View File

@ -1,718 +0,0 @@
/*
* BEGIN_HEADER - DO NOT EDIT
*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the "License"). You may not use this file except
* in compliance with the License.
*
* You can obtain a copy of the license at
* https://open-jbi-components.dev.java.net/public/CDDLv1.0.html.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* HEADER in each file and include the License file at
* https://open-jbi-components.dev.java.net/public/CDDLv1.0.html.
* If applicable add the following below this CDDL HEADER,
* with the fields enclosed by brackets "[]" replaced with
* your own identifying information: Portions Copyright
* [year] [name of copyright owner]
*/
/*
* @(#)JDBCClusterManager.java
*
* Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
*
* END_HEADER - DO NOT EDIT
*/
package org.glassfish.openesb.databasebc;
import java.sql.Connection;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import javax.jbi.component.ComponentContext;
import com.sun.jbi.internationalization.Messages;
public class JDBCClusterManager {
private static final Messages mMessages = Messages.getMessages(JDBCClusterManager.class);
private static final Logger mLogger = Messages.getLogger(JDBCClusterManager.class);
private String mTableName;
private ComponentContext mContext;
private String mprodName;
private String mPKName;
private String mInstanceName;
private List<String> mPKList = new ArrayList<String>();
private String mPKValue;
private int mPollingInterval;
private long mHeartbeatUpdateConfigInterval;
private Connection connection = null;
private String mJNDIName = null;
private String BASE_INSTANCESTATE_INSERT_STMT_STR;
private String BASE_INSTANCESTATE_UPDATE_STMT_STR;
private String BASE_OWNERTABLE_SELECTALL_STMT_STR;
private String BASE_OWNERTABLE_INSERT_STMT_STR;
private String BASE_OWNERTABLE_UPDATE_STMT_STR;
private String BASE_OWNERTABLE_DELETE_STMT_STR;
private String BASE_OWNERTABLE_DELETE_DANGLING_STMT_STR;
private String BASE_OWNERTABLE_UPDATE_DANGLING_STMT_STR;
private String BASE_OWNERTABLE_SELECT_DANGLING_STMT_STR;
private String BASE_ORACLE_OWNERTABLE_DELETE_DANGLING_STMT_STR;
private Object objectLock = new Object();
private Object mUpdateHeartbeatLock = new Object();
private Object mRecoverDanglingInstanceLock = new Object();
private boolean mRecoverDanglingInstInProgress = false;
/*
* Constructor
*/
public JDBCClusterManager(ComponentContext context) throws Exception {
mContext = context;
}
/*
* polling table name @param val string
*/
public void setTableName(String val) {
this.mTableName = val;
}
/*
* get the table name @return mTableName
*/
public String getTableName() {
return this.mTableName;
}
/*
* primary key name of the polling table @return String mPKName
*/
public String getPKName() {
return mPKName;
}
/*
* set the primary key name of the polling table @param string val
*/
public void setPKName(String val) {
mPKName = val;
}
/*
* set the primary key value @param string val
*/
public void setPKValue(String val) {
mPKValue = val;
}
/*
* get the primary key value @return string mPKValue
*/
public String getPKValue() {
return mPKValue;
}
/*
* set heartbeat Config Interval, i.e 200% of the polling interval @param int val
*/
public void setHeartbeatConfigInterval(int val) {
mHeartbeatUpdateConfigInterval = val * 200 / 100;
}
/*
* get heartbeat Config Interval, i.e 200% of the polling interval @return long
* mHeartbeatUpdateConfigInterval
*/
public long getHeartbeatConfigInterval() {
return mHeartbeatUpdateConfigInterval;
}
/*
* set instance Name @param string val
*/
public void setInstanceName(String val) {
mInstanceName = val;
}
/*
* get the instance Name @return string mInstance Name
*/
public String getInstanceName() {
return mInstanceName;
}
public void setProductName(String prodName){
mprodName = prodName;
}
/*
* do the clustering task, 1. to update the heartbeat, 2 update the dangling instanes with the
* current instance for ownership, 3 select dangling instances PK list(this list contains more
* pk values, while constructing the normalize message in JDBCNormalizer class, based on the
* numberOfRecords config, process only that many records per poll. If this list contains more
* than that configured number then process those records in the next poll.)
*/
public void doClusterTasks() {
constructSQLQueries();
// This not requried here againg to set timestamp.
// boolean heartBeatUpdated = upDateHeartBeat();
// if(heartBeatUpdated){
doInstanceFailover();
// }
}
/**
* Check for the failed instance and acquire the dangling instances.
*/
private void doInstanceFailover() {
synchronized (mRecoverDanglingInstanceLock) {
if (mRecoverDanglingInstInProgress) {
return;
}
mRecoverDanglingInstInProgress = true;
}
/* boolean danglingInstancesUpdated = updateDanglingInstances();
if(danglingInstancesUpdated){
this.mPKList = selectDanglingInstancesPKList();
};*/
boolean deleted = deleteDanglingInstances();
mRecoverDanglingInstInProgress = false;
}
/*
* get the under line database connection @return connection
*/
public Connection getDataBaseConnection() throws Exception {
if(this.connection != null){
return this.connection;
}
try{
this.connection = getDatabaseConnection(mJNDIName);
this.connection.setAutoCommit(true);
}catch (Exception e){
throw new Exception(mMessages.getString("DBBC_E11101.JCM_CONNECTON_EXCEPTION",
new Object[] {this.mJNDIName} ));
}
return this.connection;
}
/*
* set the under line database connection @return connection
*/
public void setDataBaseConnection(Connection con) throws Exception {
if(con != null)
this.connection = con;
}
/*
* used to the Datasource @param String val
*/
public void setJNDIName(String val){
this.mJNDIName = val;
}
/*
* used to update the instnace heatbeat per each poll to know instance is alive or not
*/
/*
* private boolean upDateHeartBeat() { boolean retry; boolean updated; do { retry = false;
* updated = false; PreparedStatement ps = null; Connection con = null; try { con =
* getDataBaseConnection(); ps = con.prepareStatement(BASE_INSTANCESTATE_UPDATE_STMT_STR);
* ps.setString(1, getInstanceName()); int updateCount = ps.executeUpdate(); if(updateCount ==
* 0){ // this indicates that no entry exists for this instance id, // insert new one. ps =
* con.prepareStatement(BASE_INSTANCESTATE_INSERT_STMT_STR); ps.setString(1, getInstanceName());
* int inserted = ps.executeUpdate(); }else if(updateCount == 1){ System.out.println("updated
* the time stamp for the instnce "+getInstanceName()); mLogger.log(Level.INFO,
* mMessages.getString("DBBC_R10901.JCM_UPDATED_TIME_STAMP", new Object[] {getInstanceName()})); }
* updated = true; }catch(Exception e){ if(con != null){ try{ con.rollback(); }catch(Exception
* ex){ System.out.println("Exception while roll back the transaction "+ex);
* mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11001.JCM_EXCEPTION_WHILE_ROLLBACK"),
* ex); } }else{ // TODO retry for the connection } } finally{ if(ps != null){ try{ ps.close(); }
* catch(SQLException e){ mLogger.log(Level.SEVERE,
* mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), e); } } } } while (retry);
* return updated; }
*/
/*
* get the dangling instances PK list.
*/
public List<String> getDanglingInstancePKList() {
return mPKList;
}
/*
* Query for the dangling instances PK list for instance to process with current instance.
*/
private List selectDanglingInstancesPKList(){
String selectQuery = BASE_OWNERTABLE_SELECT_DANGLING_STMT_STR;
PreparedStatement ps = null;
ResultSet rs = null;
Connection con = null;
try {
con = getDataBaseConnection();
ps = con.prepareStatement(selectQuery);
ps.setString(1, getInstanceName());
rs = ps.executeQuery();
while(rs.next()){
String pkValue = rs.getString(getPKName());
mPKList.add(pkValue);
}
} catch (SQLException ex) {
mLogger.log(Level.SEVERE, ex.getLocalizedMessage());
}catch (Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
} finally{
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11003.JCM_EXCEPTION_WHILE_CLOSING_RS"), e);
}
}
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), e);
}
}
}
return mPKList;
}
/*
* get the instance ownership transfer. @ return boolean update if successed
*/
private boolean updateDanglingInstances() {
boolean updated = false;
String updateQuery = BASE_OWNERTABLE_UPDATE_DANGLING_STMT_STR;
PreparedStatement ps = null;
ParameterMetaData paramMetaData = null;
int parameters = 0;
Connection con = null;
try {
con = getDataBaseConnection();
ps = con.prepareStatement(updateQuery);
paramMetaData = ps.getParameterMetaData();
if (paramMetaData != null) {
parameters = paramMetaData.getParameterCount();
}
} catch (SQLException ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
} catch (Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
}
if (parameters != 0) {
// set default type.
int columnType = java.sql.Types.VARCHAR;
try {
columnType = paramMetaData.getParameterType(1);
ps.setObject(1, JDBCUtil.convert(getInstanceName(), columnType), columnType);
ps.setLong(2, getHeartbeatConfigInterval());
columnType = paramMetaData.getParameterType(3);
ps.setObject(3, JDBCUtil.convert(getInstanceName(), columnType), columnType);
columnType = paramMetaData.getParameterType(4);
ps.setObject(4, JDBCUtil.convert(getTableName(), columnType), columnType);
int rowsUpdated = ps.executeUpdate();
if(rowsUpdated > 0 ){
mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED",
new Object[] { getInstanceName() }));
updated = true;
}
} catch (SQLException e) {
mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED",
new Object[] { getInstanceName() }));
updated = false;
}catch (Exception e) {
mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED",
new Object[] { getInstanceName() }));
updated = false;
} finally{
if(ps != null){
try{
ps.close();
} catch(SQLException e){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"),
e);
}
}
}
}
return updated;
}
private boolean deleteDanglingInstances() {
boolean deleted = false;
String deleteQuery = null;
if(mprodName != null && mprodName.equalsIgnoreCase("ORACLE")){
deleteQuery = BASE_ORACLE_OWNERTABLE_DELETE_DANGLING_STMT_STR;
}else{
deleteQuery = BASE_OWNERTABLE_DELETE_DANGLING_STMT_STR;
}
PreparedStatement ps = null;
ParameterMetaData paramMetaData = null;
int parameters = 0;
Connection con = null;
try {
con = getDataBaseConnection();
ps = con.prepareStatement(deleteQuery);
paramMetaData = ps.getParameterMetaData();
if (paramMetaData != null) {
parameters = paramMetaData.getParameterCount();
}
} catch (SQLException ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
} catch (Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
}
if (parameters != 0) {
// set default type.
int columnType = java.sql.Types.VARCHAR;
try {
ps.setLong(1, getHeartbeatConfigInterval());
//columnType = paramMetaData.getParameterType(2);
//ps.setObject(2, JDBCUtil.convert(getInstanceName(), columnType), columnType);
//columnType = paramMetaData.getParameterType(3);
//ps.setObject(3, JDBCUtil.convert(getTableName(), columnType), columnType);
ps.setString(2,getInstanceName());
ps.setString(3,getTableName());
int rowsDeleted = ps.executeUpdate();
if(rowsDeleted > 0 ){
mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED",
new Object[] { getInstanceName() }));
deleted = true;
}
} catch (SQLException e) {
mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED",
new Object[] { getInstanceName() }));
deleted = false;
}catch (Exception e) {
mLogger.log(Level.INFO, mMessages.getString("DBBC-R01127.JDBCN_RECORD_LOCKED",
new Object[] { getInstanceName() }));
deleted = false;
} finally{
if(ps != null){
try{
ps.close();
} catch(SQLException e){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"),
e);
}
}
}
}
return deleted;
}
/*
* Insert new rows with the "In Progress" state
*/
public int[] addInstances(List pkList) throws Exception {
PreparedStatement ps = null;
ParameterMetaData paramMetaData = null;
int parameters = 0;
Connection con = null;
int[] executedRows = null;
try {
con = getDataBaseConnection();
ps = con.prepareStatement(BASE_OWNERTABLE_INSERT_STMT_STR);
paramMetaData = ps.getParameterMetaData();
if (paramMetaData != null) {
parameters = paramMetaData.getParameterCount();
}
} catch (SQLException ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
} catch (Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
}
try {
if (parameters != 0) {
addBatch(pkList, paramMetaData, ps, "In Progress", true);
}
executedRows = ps.executeBatch();
for (int i = 0; i < executedRows.length; i++) {
if (executedRows[i] == PreparedStatement.EXECUTE_FAILED) {
throw new SQLException(
"One of the Queries in the batch didn't update any rows, Should have updated atleast one row");
}
mLogger.log(Level.INFO, "Inserted In Progress OWNER record "+pkList.get(i).toString());
}
} catch(Exception e) {
if(ps != null){
try{
ps.clearBatch();
ps.close();
} catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), se);
}
}
throw e;
}
return executedRows;
}
public void deleteInstances(List pkList) throws Exception {
if (pkList.size() <= 0)
return;
PreparedStatement ps = null;
String sql = BASE_OWNERTABLE_DELETE_STMT_STR+" (";
for (int i = 0, l = pkList.size(); i < l; i++)
sql += (i < l-1 ? "?," : "?)");
ParameterMetaData paramMetaData = null;
int parameters = 0;
Connection con = null;
int[] executedRows = null;
try {
con = getDataBaseConnection();
ps = con.prepareStatement(sql);
ps.setString(1, getInstanceName());
for (int i = 0, l = pkList.size(); i < l; i++)
ps.setString(i+2, (String)pkList.get(i));
if (mLogger.isLoggable(Level.FINE))
mLogger.log(Level.FINE, "Executing SQL: "+sql); //$NON-NLS-1$
ps.execute();
} catch (Exception e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), e);
throw e;
}
}
/*
* Update the status to "SENT" in the owner table after sending the PKList to NMR. This flag is
* used to not process the died instance records which are already sent to NMR, and making the staus
* to "DONE" after getting the done from NMR
*/
public int[] updateStatus(List pkList, String status) throws Exception {
String updateQuery = BASE_OWNERTABLE_UPDATE_STMT_STR;
PreparedStatement ps = null;
ParameterMetaData paramMetaData = null;
int parameters = 0;
Connection con = null;
int[] executedRows = null;
try {
con = getDataBaseConnection();
ps = con.prepareStatement(updateQuery);
paramMetaData = ps.getParameterMetaData();
if (paramMetaData != null) {
parameters = paramMetaData.getParameterCount();
}
} catch (SQLException ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
} catch (Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS"), ex);
}
try {
if (parameters != 0) {
addBatch(pkList, paramMetaData, ps, status, false);
}
executedRows = ps.executeBatch();
for (int i = 0; i < executedRows.length; i++) {
if (executedRows[i] == PreparedStatement.EXECUTE_FAILED) {
throw new SQLException(
"One of the Queries in the batch didn't update any rows, Should have updated atleast one row");
}
;
}
} catch(Exception e) {
if(ps != null){
try{
ps.clearBatch();
ps.close();
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"),
se);
}
}
throw e;
}
return executedRows;
}
private void addBatch(List pkList, ParameterMetaData parameterMeta, PreparedStatement ps, String status, boolean forIns) throws Exception {
if (!pkList.isEmpty()) {
for (final Iterator it = pkList.iterator(); it.hasNext();) {
String pkValue = (String) it.next();
if ((pkValue != null) && !pkValue.trim().equals("")) {
// set default type.
int columnType = java.sql.Types.VARCHAR;
try {
if (forIns)
{
try { columnType = parameterMeta.getParameterType(2); } catch(Exception e) {}
ps.setObject(2, JDBCUtil.convert(getInstanceName(), columnType), columnType);
}
try { columnType = parameterMeta.getParameterType(forIns ? 3 : 1); } catch(Exception e) {}
ps.setObject(forIns ? 3 : 1, JDBCUtil.convert(status, columnType), columnType);
try { columnType = parameterMeta.getParameterType(forIns ? 1 : 2); } catch(Exception e) {}
ps.setObject(forIns ? 1 : 2, JDBCUtil.convert(pkValue, columnType), columnType);
ps.addBatch();
} catch (Exception e) {
mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11005.JCM_EXCEPTION_WHILE_ADDING_BATCH_TO_PS"), e);
if(ps != null){
try{
ps.clearBatch();
ps.close();
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"),
e);
}
}
throw e;
}
}
}
}
}
public List selectAllProcessed() throws Exception
{
Connection con = getDataBaseConnection();
List<String> pkList = new ArrayList<String>();
Statement st = con.createStatement();
st.execute(BASE_OWNERTABLE_SELECTALL_STMT_STR);
ResultSet rs = st.getResultSet();
while (rs.next())
pkList.add(rs.getString(1));
return pkList;
}
/*
* Construct SQL queries for the INSTANCESTATE Table and for OWNER_bussiner_table. to avoid
* duplication and to support failover
*/
private void constructSQLQueries(){
if(BASE_INSTANCESTATE_INSERT_STMT_STR == null){
BASE_INSTANCESTATE_INSERT_STMT_STR = "INSERT INTO INSTANCESTATE"+ //$NON-NLS-1$
" VALUES(?, ?, CURRENT_TIMESTAMP)"; //$NON-NLS-1$
}
if(BASE_INSTANCESTATE_UPDATE_STMT_STR == null){
BASE_INSTANCESTATE_UPDATE_STMT_STR = "UPDATE INSTANCESTATE" + //$NON-NLS-1$
" SET lastupdatetime = CURRENT_TIMESTAMP " + "WHERE INSTANCEID = ? and TABLENAME = ?"; //$NON-NLS-1$
}
if(BASE_OWNERTABLE_SELECTALL_STMT_STR == null){
BASE_OWNERTABLE_SELECTALL_STMT_STR = "SELECT "+getPKName()+" FROM OWNER_"+getTableName()+
" FOR UPDATE";
}
if(BASE_OWNERTABLE_INSERT_STMT_STR == null){
BASE_OWNERTABLE_INSERT_STMT_STR = "INSERT INTO OWNER_" + getTableName() + //$NON-NLS-1$
" VALUES(?, ?, ?)"; //$NON-NLS-1$
}
if(BASE_OWNERTABLE_DELETE_STMT_STR == null){
BASE_OWNERTABLE_DELETE_STMT_STR = "DELETE FROM OWNER_" + getTableName() + //$NON-NLS-1$
" WHERE instance_name=? AND "+getPKName()+" IN "; //$NON-NLS-1$
}
if(BASE_OWNERTABLE_UPDATE_STMT_STR == null){
BASE_OWNERTABLE_UPDATE_STMT_STR = "UPDATE OWNER_" + getTableName() + //$NON-NLS-1$
" SET status = ? " + " WHERE " + getPKName() + " = ?"; //$NON-NLS-1$
}
if(BASE_OWNERTABLE_DELETE_DANGLING_STMT_STR == null){
BASE_OWNERTABLE_DELETE_DANGLING_STMT_STR = "delete from OWNER_"+getTableName()//$NON-NLS-1$ //$NON-NLS-1$
+ " where (status ='In Progress' or status ='SENT') and instance_name IN (select instanceid from INSTANCESTATE"
+ " where {fn TIMESTAMPDIFF(SQL_TSI_SECOND, timestamp(lastupdatetime), CURRENT_TIMESTAMP)} > ?/1000 "
+ " and instanceid != ? and tablename = ?) ";
}
if(BASE_ORACLE_OWNERTABLE_DELETE_DANGLING_STMT_STR == null){
BASE_ORACLE_OWNERTABLE_DELETE_DANGLING_STMT_STR = "delete from OWNER_"+getTableName()//$NON-NLS-1$ //$NON-NLS-1$
+ " where (status ='In Progress' or status ='SENT') and instance_name IN (select instanceid from INSTANCESTATE"
+ " where (trim(to_number(substr((sysdate-lastupdatetime)*24*60*60,2,(LENGTH((sysdate-lastupdatetime)*24*60*60)-19))))) > (? / 1000) "
+ " and instanceid != ? and tablename = ?) ";
}
if(BASE_OWNERTABLE_UPDATE_DANGLING_STMT_STR == null){
BASE_OWNERTABLE_UPDATE_DANGLING_STMT_STR = "update OWNER_"+getTableName()+" set instance_name = ? " //$NON-NLS-1$ //$NON-NLS-1$
+ " where (status ='In Progress') and instance_name IN (select instanceid from INSTANCESTATE"
+ " where {fn TIMESTAMPDIFF(SQL_TSI_SECOND, timestamp(lastupdatetime), CURRENT_TIMESTAMP)} > ?/1000 "
+ " and instanceid != ? and tablename = ?) ";
}
if(BASE_OWNERTABLE_SELECT_DANGLING_STMT_STR == null){
BASE_OWNERTABLE_SELECT_DANGLING_STMT_STR = "select "+getPKName()+" from OWNER_"+getTableName()+ " where (status ='In Progress')"+
" and instance_name = ?";
}
}
/**
* @param jndiName
* @return
* @throws javax.naming.NamingException
*/
private Object getDataSourceFromContext(final String jndiName) throws javax.naming.NamingException {
final Context c = mContext.getNamingContext();
return c.lookup(jndiName);
}
/**
* @param jndiName
* @return
* @throws Exception
*/
private Connection getDatabaseConnection(final String jndiName) throws SQLException, NamingException {
return ((DataSource) getDataSourceFromContext(jndiName)).getConnection();
}
}

View File

@ -1,236 +0,0 @@
/*
* BEGIN_HEADER - DO NOT EDIT
*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the "License"). You may not use this file except
* in compliance with the License.
*
* You can obtain a copy of the license at
* https://open-jbi-components.dev.java.net/public/CDDLv1.0.html.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* HEADER in each file and include the License file at
* https://open-jbi-components.dev.java.net/public/CDDLv1.0.html.
* If applicable add the following below this CDDL HEADER,
* with the fields enclosed by brackets "[]" replaced with
* your own identifying information: Portions Copyright
* [year] [name of copyright owner]
*/
/*
* @(#)JDBCHeartbeatManager.java
*
* Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
*
* END_HEADER - DO NOT EDIT
*/
package org.glassfish.openesb.databasebc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sun.jbi.internationalization.Messages;
import javax.jbi.component.ComponentContext;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.sql.DataSource;
import javax.xml.namespace.QName;
/**
* author : Venkat P Process requests received from the External Database
*/
public class JDBCHeartbeatManager implements Runnable {
EndpointBean epb;
private ComponentContext mContext;
private RuntimeConfiguration mRuntimeConfig;
private AtomicBoolean mMonitor;
PreparedStatement ps = null;
Connection connection = null;
private String mTableName = null;
private int mPollMilliSeconds = 10000;
private QName mOperation;
private static final Messages mMessages = Messages.getMessages(JDBCHeartbeatManager.class);
private static final Logger mLogger = Messages.getLogger(JDBCHeartbeatManager.class);
/*
* Constructor
*/
public JDBCHeartbeatManager(final EndpointBean endpoint, final ComponentContext context, final QName opname)
throws Exception {
epb = endpoint;
mContext = context;
mMonitor = new AtomicBoolean(false);
mOperation = opname;
}
// @Override
public void run() {
do {
try {
execute();
} catch (final Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11102.JHM_ERROR_WHILE_EXECUTING_SQL"), ex);
}
try {
Thread.sleep(mPollMilliSeconds);
} catch (final Exception e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11103.JHM_THREAD_SLEEP_ABRUPTED"), e);
}
} while (mMonitor.get() != Boolean.TRUE);
}
/**
* @throws Exception
*/
public void execute() throws Exception {
try {
Map operationNameToMetaData = (Map) epb.getValueObj(EndpointBean.OPERATION_NAME_TO_META_DATA);
OperationMetaData meta = (OperationMetaData) operationNameToMetaData.get(mOperation.getLocalPart());
mPollMilliSeconds = meta.getJDBCSql().getPollMilliSeconds();
mTableName = meta.getJDBCSql().getTableName();
if (epb.isClustered()) {
try {
if (this.connection == null) {
connection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(
RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
connection.setAutoCommit(true);
}
} catch (NamingException ne) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11104.JHM_ERROR_IN_LOOKUP",
new Object[] { new Object[] { mRuntimeConfig.getProperties().getProperty(
RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME) } }), ne);
throw ne;
} catch (SQLException se) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11105.JHM_ERROR_WHILE_GETTING_CONNECTION"), se);
throw se;
}
upDateHeartBeat();
}
} catch (final Exception e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11102.JHM_ERROR_WHILE_EXECUTING_SQL"), e);
throw e;
}
}
/**
* @param jndiName
* @return
* @throws javax.naming.NamingException
*/
private Object getDataSourceFromContext(final String jndiName) throws javax.naming.NamingException {
final Context c = mContext.getNamingContext();
return c.lookup(jndiName);
}
/**
* @param jndiName
* @return
* @throws Exception
*/
private Connection getDatabaseConnection(final String jndiName) throws SQLException, NamingException {
return ((DataSource) getDataSourceFromContext(jndiName)).getConnection();
}
/*
* stop the thread to update the Hearbeat.
*/
protected void stopReceiving() {
mMonitor.set(Boolean.TRUE);
try {
if (connection != null) {
connection.close();
}
} catch (Exception e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11106.JHM_ERROR_WHILE_CLOSING_CONNECTION"), e);
}
}
/*
* Runtime Config object to cluster JNDI name
*/
public void setRuntimeConfig(RuntimeConfiguration runtimeConfg) {
mRuntimeConfig = runtimeConfg;
}
/*
* used to update the instance heartbeat per each poll to know instance is alive or not
*/
private boolean upDateHeartBeat() {
boolean retry;
boolean updated;
do {
retry = false;
updated = false;
PreparedStatement ps = null;
Connection con = null;
String BASE_INSTANCESTATE_UPDATE_STMT_STR = "UPDATE INSTANCESTATE" + //$NON-NLS-1$
" SET lastupdatetime = CURRENT_TIMESTAMP " + "WHERE INSTANCEID = ? and TABLENAME = ?"; //$NON-NLS-1$
String BASE_INSTANCESTATE_INSERT_STMT_STR = "INSERT INTO INSTANCESTATE" + //$NON-NLS-1$
" VALUES(?, CURRENT_TIMESTAMP, ?)"; //$NON-NLS-1$
try {
con = connection;
ps = con.prepareStatement(BASE_INSTANCESTATE_UPDATE_STMT_STR);
ps.setString(1, epb.getInstanceName());
ps.setString(2, mTableName);
int updateCount = ps.executeUpdate();
if (updateCount == 0) {
// this indicates that no entry exists for this instance id,
// insert new one.
ps = con.prepareStatement(BASE_INSTANCESTATE_INSERT_STMT_STR);
ps.setString(1, epb.getInstanceName());
ps.setString(2, mTableName);
int inserted = ps.executeUpdate();
}
mLogger.log(Level.INFO, mMessages.getString("DBBC_R10902.JHM_UPDATED_TIME_STAMP",
new Object[] { new Object[] { epb.getInstanceName() } }));
updated = true;
} catch (SQLException e) {
if (con != null) {
try {
con.rollback();
} catch (SQLException ex) {
mLogger.log(Level.WARNING, mMessages.getString("DBBC_W11001.JCM_EXCEPTION_WHILE_ROLLBACK"), ex);
}
} else {
// TODO retry for the connection
}
} finally {
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS"), e);
}
}
}
} while (retry);
return updated;
}
}

View File

@ -277,21 +277,3 @@ DBBC-E01135.Invalid_Server_Url=DBBC-E01135.{0} is an invalid URL: It should be i
############################ resource bundles for LobHandler ################
DBBC_L00000.CLOB_Null=The Clob from obtained resultset is null
DBBC_L00001.BLOB_Null=The Blob from obtained resultset is null
############################ resource bundles for JDBCClusterManager ################
DBBC_R10901.JCM_UPDATED_TIME_STAMP=DBBC_R10901.Updated the timestamp for instance {0}.
DBBC_W10901.JCM_EXCEPTION_WHILE_ROLLBACK=DBBC_R10901.Exception while roll back the transaction.{0}
DBBC_E11101.JCM_CONNECTON_EXCEPTION=DBBC_E11101.No DataSource is configured with the JNDI name.{0}
DBBC_W11002.JCM_EXCEPTION_WHILE_CLOSING_PS=DBBC_W11002.Exception while closing the PS
DBBC_W11003.JCM_EXCEPTION_WHILE_CLOSING_RS=DBBC_W11003.Exception while closing the RS
DBBC_W11004.JCM_EXCEPTION_WHILE_GETTING_PS=DBBC_W11004.Exception while while getting the prepared statement.
DBBC-R10903.JCM_RECORD_LOCKED=DBBC-R10903.Record is already processed by another instance.
DBBC_W11005.JCM_EXCEPTION_WHILE_ADDING_BATCH_TO_PS=DBBC_W11005.Exception while adding batch to the prepared statement.
############################ resource bundles for JDBCHeartbeatManager ################
DBBC_E11102.JHM_ERROR_WHILE_EXECUTING_SQL=DBBC_E11102.Error while executing the SQL.
DBBC_E11103.JHM_THREAD_SLEEP_ABRUPTED=DBBC_E11103.Unexpected exception Occured In Inbound message processor thread during sleep.
DBBC_E11104.JHM_ERROR_IN_LOOKUP=DBBC_E11104.Error while looking up the datasource with jndi name.{0}
DBBC_E11105.JHM_ERROR_WHILE_GETTING_CONNECTION=DBBC_E11105.Exception while getting the connection.
DBBC_E11106.JHM_ERROR_WHILE_CLOSING_CONNECTION=DBBC_E11106.Exception thrown when closing DB cursors
DBBC_R10902.JHM_UPDATED_TIME_STAMP=DBBC_R10902.Updated the timestamp for instance {0}.