openesb-components/ojc-core/databasebc/databasebcimpl/src/org/glassfish/openesb/databasebc/InboundMessageProcessor.java

1281 lines
60 KiB
Java
Executable File

/*
* BEGIN_HEADER - DO NOT EDIT
*
* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the "License"). You may not use this file except
* in compliance with the License.
*
* You can obtain a copy of the license at
* https://open-jbi-components.dev.java.net/public/CDDLv1.0.html.
* See the License for the specific language governing
* permissions and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* HEADER in each file and include the License file at
* https://open-jbi-components.dev.java.net/public/CDDLv1.0.html.
* If applicable add the following below this CDDL HEADER,
* with the fields enclosed by brackets "[]" replaced with
* your own identifying information: Portions Copyright
* [year] [name of copyright owner]
*/
/*
* @(#)InboundMessageProcessor.java
*
* Copyright 2004-2007 Sun Microsystems, Inc. All Rights Reserved.
*
* END_HEADER - DO NOT EDIT
*/
package org.glassfish.openesb.databasebc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jbi.component.ComponentContext;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.sql.DataSource;
import javax.sql.XAConnection;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import com.sun.jbi.internationalization.Messages;
import org.glassfish.openesb.databasebc.model.metadata.DBMetaData;
import org.glassfish.openesb.databasebc.model.runtime.DBConnectionInfo;
import org.glassfish.openesb.databasebc.model.runtime.DatabaseModel;
import org.glassfish.openesb.databasebc.model.runtime.DatabaseModelImpl;
import org.glassfish.openesb.databasebc.model.runtime.Db2DataAccess;
import org.glassfish.openesb.databasebc.model.runtime.DerbyDataAccess;
import org.glassfish.openesb.databasebc.model.runtime.OracleDataAccess;
import org.glassfish.openesb.databasebc.model.runtime.SqlServerDataAccess;
import org.glassfish.openesb.databasebc.transaction.TransactionHelper;
import org.glassfish.openesb.databasebc.transaction.XidImpl;
import com.sun.jbi.nms.exchange.ExchangePattern;
import com.sun.jbi.common.qos.messaging.MessagingChannel;
import com.sun.jbi.common.qos.redelivery.Redelivery;
import com.sun.jbi.common.qos.redelivery.RedeliveryStatus;
import net.java.hulp.measure.Probe;
/**
* author : Venkat P Process requests received from the External Database
*/
public class InboundMessageProcessor implements Runnable, MessageExchangeReplyListener, RedeliveryListener {
private static final String DERBY_PROD_NAME = "DERBY";
private static final String ORACLE_PROD_NAME = "ORACLE";
private static final String SQLSERVER_PROD_NAME = "SQLSERVER";
private static final String DB2_PROD_NAME = "DB2";
private static final String JDBC_PROD_NAME = "JDBC";
private XidImpl xid = null;
private TransactionHelper mTxHelper = null;
private static final String SENT_TO_NMR = "SENT";
// private boolean mtxFlag;
XAResource xaResource = null;
//private TransactionManager mTxManager = null;
private static final Messages mMessages = Messages.getMessages(InboundMessageProcessor.class);
private static final Logger mLogger = Messages.getLogger(InboundMessageProcessor.class);
@SuppressWarnings("unchecked")
private static final Map mInboundExchanges = Collections.synchronizedMap(new HashMap());
private static Map exchangeIDToMeta = Collections.synchronizedMap(new HashMap());
private Map mMapInboundExchangesProcessRecords = new HashMap();
private ArrayList mProcessedList = new ArrayList();
Map mEndpoint;
EndpointBean epb;
DocumentBuilder mDocBuilder;
//private DeliveryChannel mChannel;
private MessagingChannel mChannel;
private MessageExchange mExchange;
private ComponentContext mContext;
private RuntimeConfiguration mRuntimeConfig;
private MessageExchangeFactory mMsgExchangeFactory;
private ServiceEndpoint mServiceEndpoint;
private final QName mOperation;
private AtomicBoolean mMonitor;
private String mPKName = null;
private String mSelectSQL = null;
private String mMarkColumnName = null;
private String mMarkColumnValue = null;
private String mPKType = null;
private String mFlagColumnType = null;
private String mSchemaName = null;
DBConnectionInfo dbConnectionInfo;
private String mXAEnabled = null;
// private DBMetaData mdbMetaData = null;
private DatabaseModel dbDataAccessObject = null;
PreparedStatement ps = null;
ResultSet rs = null;
Connection connection = null;
Connection con = null;
XAConnection xaConnection = null;
Connection mClusterConnection = null;
private String mTableName = null;
private String mDbName=null;
private String mPollingPostProcessing = null;
private String mMoveRowToTableName = null;
private int mPollMilliSeconds = 10000;
private int mThrottleNumber = -1;
private JDBCClusterManager mJDBCClusterManager = null;
// Settings for custom reliability header extensions
public static final String CUSTOM_RELIABILITY_MESSAGE_ID_PROPERTY = "com.stc.jbi.messaging.messageid"; // NOI18N
public static final String CUSTOM_RELIABILITY_HEADER_NAMESPACE_URI = "http://schemas.stc.com/ws/2005/07/custrm"; // NOI18N
public static final String CUSTOM_RELIABILITY_HEADER_LOCAL_NAME = "MessageID"; // NOI18N
// this JNDI is used if the component is installed in cluster environment/
// TODO, need to change it later
public static final String DEFAULT_CLUSTER_JNDI_NAME = "jdbc/__defaultDS";
/**
* JBI message exchange properties for message grouping and sequencing (new CRMP)
*/
public static final String CRMP_GROUP_ID = "com.sun.jbi.messaging.groupid";
public static final String CRMP_MESSAGE_ID = "com.sun.jbi.messaging.messageid";
ReplyListener replyListener;
public InboundMessageProcessor(final MessagingChannel chnl, final EndpointBean endpoint,
final ComponentContext context, final QName opname) throws ParserConfigurationException {
mChannel = chnl;
epb = endpoint;
mContext = context;
replyListener = new ReplyListenerImpl(endpoint);
mOperation = opname;
mMonitor = new AtomicBoolean(false);
mTxHelper = new TransactionHelper();
dbConnectionInfo = new DBConnectionInfo();
final DocumentBuilderFactory docBuilderFact = DocumentBuilderFactory.newInstance();
mDocBuilder = docBuilderFact.newDocumentBuilder();
//mTxManager = (TransactionManager) context.getTransactionManager();
if(endpoint.isClustered()){
try{
mJDBCClusterManager = new JDBCClusterManager(context);
}catch(Exception e){
//TODO
}
}
}
/**
* @return
*/
public static Map getInboundExchanges() {
return InboundMessageProcessor.mInboundExchanges;
}
/**
*
*/
//@Override
public void run() {
if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, "DBBC_R00629.IMP_EP_status");
}
do {
try {
execute();
} catch (final Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00659.IMP_ERROR_WHILE_EXECUTING_SQL"), ex);
}
if (mLogger.isLoggable(Level.FINE)) {
mLogger.log(Level.INFO,mMessages.getString("DBBC_R00660.IMP_FINISHED_EXECUTING_SQL"));
}
try {
Thread.sleep(mPollMilliSeconds);
} catch (final Exception e) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00661.IMP_THREAD_SLEEP_ABRUPTED"), e);
}
} while (mMonitor.get() != Boolean.TRUE);
}
/**
* @throws MessagingException
* @throws Exception
*/
public void execute() throws MessagingException, Exception {
String exchangeId = null;
try {
if (mMsgExchangeFactory == null) {
mMsgExchangeFactory = mChannel.createExchangeFactory();
}
mExchange = mMsgExchangeFactory.createInOnlyExchange();
if (mServiceEndpoint == null) {
mServiceEndpoint = locateServiceEndpoint();
epb.setValueObj(EndpointBean.ENDPOINT_REFERENCE, mServiceEndpoint);
}
if (mServiceEndpoint == null) {
throw new MessagingException(mMessages.getString("DBBC_E00643.IMP_Failed_locate_EP"));
}
exchangeId = mExchange.getExchangeId();
final QName serviceName = (QName) epb.getValueObj(EndpointBean.FULL_SERVICE_NAME);
final String epntName = epb.getValue(EndpointBean.ENDPOINT_NAME);
if (mLogger.isLoggable(Level.FINE)) {
mLogger.fine("Getting bean for" + serviceName + epntName);
}
if (mLogger.isLoggable(Level.FINER)) {
mLogger.fine("Getting bean for" + serviceName + epntName);
}
mExchange.setEndpoint(mServiceEndpoint);
mExchange.setOperation(mOperation);
// Adding re-delivery/re-try support
// we are sending instead of the client context to the ReplyListener
// the EndpointBean
MessageExchangeSupport.addReplyListener(mExchange.getExchangeId(), replyListener, epb);
MessageExchangeSupport.addRedeliveryListener(mExchange.getExchangeId(), this, epb);
Redelivery.setUniqueId(mExchange, exchangeId);
final String status = epb.getValue(EndpointBean.STATUS);
if (! status.equalsIgnoreCase(EndpointBean.STATUS_RUNNING)) {
final String endName = epb.getValue(EndpointBean.ENDPOINT_NAME);
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, "DBBC_W00630.IMP_EP_NOT_RUNNING", new Object[] { endName,
mExchange.getExchangeId() });
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info("DBBC_W00667.IMP_EP_NOT_RUNNING");
}
} else {
switch (ExchangePattern.valueOf(mExchange)) {
case IN_OUT:
mLogger.log(Level.INFO, "DBBC_R00631.IMP_Received_INOUT", mExchange.getExchangeId());
processInOut(mExchange, epb);
break;
case IN_ONLY:
mLogger.log(Level.INFO, "DBBC_R00632.IMP_Received_INONLY", mExchange.getExchangeId());
processInOnly(mExchange, epb);
break;
default:
mLogger.log(Level.INFO, "DBBC_E00633.IMP_Invalid_pattern", mExchange.getExchangeId());
return;
}
}
} catch (final MessagingException ex) {
InboundMessageProcessor.mInboundExchanges.remove(exchangeId);
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00662.IMP_ERROR_WHILE_EXECUTING_MEP"), exchangeId);
throw ex;
} catch (final Exception e) {
InboundMessageProcessor.mInboundExchanges.remove(exchangeId);
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00662.IMP_ERROR_WHILE_EXECUTING_MEP"), exchangeId);
throw e;
}
}
/**
* @param exchange
* @param epb
*/
public void processInOut(final MessageExchange exchange, final EndpointBean epb) {
}
/**
* @param exchange
* @param epb
*/
public void processInOnly(final MessageExchange exchange, final EndpointBean epb) throws Exception {
String exchangeId = null;
String jndiName = null;
try {
epb.getEndpointStatus().incrementReceivedRequests();
NormalizedMessage inMsg = mExchange.createMessage();
exchangeId = exchange.getExchangeId();
final Map operationNameToMetaData = (Map) epb.getValueObj(EndpointBean.OPERATION_NAME_TO_META_DATA);
final OperationMetaData meta = (OperationMetaData) operationNameToMetaData.get(exchange.getOperation().getLocalPart());
if (meta == null) {
throw new MessagingException(InboundMessageProcessor.mMessages.getString("DBBC_E00634.IMP_Invalid_Operation",
new Object[] { exchange.getOperation() }));
}
mPollMilliSeconds = meta.getJDBCSql().getPollMilliSeconds();
mSelectSQL = meta.getJDBCSql().getSql();
/*mPKName = Qualified(meta.getJDBCSql().getPKName());
mMarkColumnName = Qualified(meta.getJDBCSql().getMarkColumnName());
mMarkColumnValue = meta.getJDBCSql().getMarkColumnValue();
mTableName = Qualified(meta.getJDBCSql().getTableName());
mPollingPostProcessing = meta.getJDBCSql().getPollingPostProcessing();
mMoveRowToTableName = Qualified(meta.getJDBCSql().getMoveRowToTableName());
*/
mPKName = meta.getJDBCSql().getPKName();
mMarkColumnName = meta.getJDBCSql().getMarkColumnName();
mMarkColumnValue = meta.getJDBCSql().getMarkColumnValue();
mTableName = meta.getJDBCSql().getTableName();
mPollingPostProcessing = meta.getJDBCSql().getPollingPostProcessing();
mMoveRowToTableName = meta.getJDBCSql().getMoveRowToTableName();
mXAEnabled = meta.getJDBCSql().getTransaction();
jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
// createNewDataSource(mXAEnabled,jndiName,epb);
// Throttle Check
if(throttleConfigurationCheck()){
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
// mtxFlag = startTrasaction();
getTransactionManager().begin();
}
dbDataAccessObject = getDataAccessObject(meta);
mSelectSQL = dbDataAccessObject.generateSelectQuery(mSelectSQL, mTableName);
epb.setTableName(mTableName);
connection = getDatabaseConnection(jndiName);
if (mDbName==null){
mDbName = connection.getMetaData().getDatabaseProductName().toLowerCase();
}
String clusterJNDIName = mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME);
if(epb.isClustered()){
try{
if(jndiName.equalsIgnoreCase(clusterJNDIName)){
mClusterConnection = connection;
mJDBCClusterManager.setJNDIName(clusterJNDIName);
String prdtName = DBMetaData.getDBType(mClusterConnection);
mJDBCClusterManager.setProductName(prdtName);
}else{
mClusterConnection = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
mClusterConnection.setAutoCommit(true);
mJDBCClusterManager.setJNDIName(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
String prdtName = DBMetaData.getDBType(mClusterConnection);
mJDBCClusterManager.setProductName(prdtName);
}
}catch(Exception e){
if(mClusterConnection == null){
//TODO retry;
throw new Exception(mMessages.getString("DBBC_E11101.JCM_CONNECTON_EXCEPTION",
new Object[] {mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME)} ));
}
}
}
Transaction tx = getTransactionManager().getTransaction();
if (isSelectStatement(mSelectSQL)) {
rs = executeInboundSQLSelect(epb, meta, connection, mTableName, mSelectSQL);
if (rs != null) {
final JDBCNormalizer normalizer = new JDBCNormalizer();
Probe normalizationMeasurement = Probe.info(getClass(),
epb.getUniqueName(), JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION);
//if(epb.isClustered() && mClusterConnection != null){
// normalizer.setConnection(mClusterConnection);
// }
if(epb.isClustered()){
mJDBCClusterManager.setDataBaseConnection(mClusterConnection);
mJDBCClusterManager.setTableName(mTableName);
mJDBCClusterManager.setInstanceName(epb.getInstanceName());
mJDBCClusterManager.setHeartbeatConfigInterval(mPollMilliSeconds);
mJDBCClusterManager.setPKName(mPKName);
mJDBCClusterManager.doClusterTasks();
}
normalizer.setInboundExchangeProcessRecordsMap(mMapInboundExchangesProcessRecords);
normalizer.setRecordsProcessedList(mProcessedList);
normalizer.setJDBCClusterManager(mJDBCClusterManager);
inMsg = normalizer.normalizeSelectInbound(rs, exchange, meta, epb, mPKName,mDbName);
if(normalizationMeasurement != null){
normalizationMeasurement.end();
}
final List tempList = epb.getProcessList();
if (!(tempList.isEmpty())) {
// mTxHelper.handleInbound(exchange);
//set JNDI name on NormalizedMessage for dynamic addressing
inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName);
exchange.setMessage(inMsg, "in");
if (tx != null) {
mExchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, tx);
getTransactionManager().suspend();
}
mInboundExchanges.put(exchangeId, new ListenerMeta(
System.currentTimeMillis(), this));
mChannel.send(exchange);
epb.getEndpointStatus().incrementSentRequests();
// mTableExistsFlag = new Object();
if(epb.isClustered()){
//Records already sent to NMR so update the status to "SENT" for owner table
try{
int i[] = mJDBCClusterManager.updateStatus(tempList, "SENT");
mLogger.log(Level.INFO,
"DBBC_R10906.IMP_UPDATED_STATUS_TO_SENT",
new Object[] { tempList });
}catch(Exception e){
// TODO need to handled the exception
mLogger.log(Level.SEVERE,
"DBBC_E11108.IMP_ERROR_UPDATING_STATUS_TO_SENT",
new Object[] { tempList, e.getLocalizedMessage() });
}
}
} else {
if (tx != null) {
try {
tx.commit();
} catch (Exception ex) {
mLogger.log(Level.SEVERE,
"DBBC_E00656.IMP_XA_TX_COMMIT_FAILED",
new Object[] { "commit", ex });
throw ex;
}
} else {
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
mLogger.log(Level.WARNING,
"DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE",
new Object[] { exchange.getExchangeId() });
}
}
}
// mTableExistsFlag = new Object();
}
}
}
} catch (final Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00663.IMP_ERROR_WHILE_PROCESSING_MEP"), ex);
Transaction tx = getTransactionManager().getTransaction();
if(tx != null ) {
tx.rollback();
}
} finally {
try {
if (rs != null) {
rs.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS"), se);
}
try{
if (ps != null) {
ps.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS"), se);
}
try{
if (connection != null) {
connection.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
}
try{
if(epb.isClustered() && mClusterConnection != null){
mClusterConnection.close();
mClusterConnection = null;
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
}
}
}
/** Checks if the Throttling configuration is defined on the endpoint,
* if yes then checks if the messages in the system are within the throttle limit
* @param
* @return boolean
*/
public boolean throttleConfigurationCheck() {
synchronized(mInboundExchanges) {
int pendingMsgs = mInboundExchanges.size();
mThrottleNumber = epb.getMaxConcurrencyLimit();
if (mThrottleNumber > 0 ) {
if(pendingMsgs > mThrottleNumber){
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00664.IMP_THROTTLE_LIMIT_REACHED",
new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) }));
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info(mMessages.getString("DBBC_R00668.IMP_THROTTLE_LIMIT_REACHED",
new Object[] { Integer.toString(mThrottleNumber) }));
}
return false;
} else {
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("DBBC_R00665.IMP_THROTTLE_LIMIT_NOT_REACHED",
new Object[] { Integer.toString(pendingMsgs), Integer.toString(mThrottleNumber) }));
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, mMessages.getString("DBBC_R00669.IMP_THROTTLE_LIMIT_NOT_REACHED"),
new Object[] { Integer.toString(mThrottleNumber) });
}
return true;
}
}
mLogger.log(Level.INFO, mMessages.getString("DBBC_R00666.IMP_THROTTLE_NOT_DEFINED"));
return true;
}
}
public ResultSet executeInboundSQLSelect(final EndpointBean eBean,
final OperationMetaData opMetaData,
Connection connection,
final String mTableName,
String lSelectSQL) throws MessagingException {
try {
String jndiName = eBean.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
if ((mMarkColumnName == null) || (mMarkColumnName.equals(""))) {
// do nothing
} else {
if(mFlagColumnType != null){
String whereClause = " where ";
if((lSelectSQL.toUpperCase().contains(whereClause.toUpperCase()))) {
if (mFlagColumnType.equalsIgnoreCase("LONGVARCHAR") || mFlagColumnType.equalsIgnoreCase("CHAR")
|| mFlagColumnType.equalsIgnoreCase("VARCHAR")) {
lSelectSQL = lSelectSQL.concat(" and (" + mMarkColumnName + " != " + "'"
+ mMarkColumnValue + "'" + " or " + mMarkColumnName + " is NULL )");
} else {
lSelectSQL = lSelectSQL.concat(" and (" + mMarkColumnName + " != "
+ mMarkColumnValue + " or " + mMarkColumnName + " is NULL )");
}
}else {
if (mFlagColumnType.equalsIgnoreCase("LONGVARCHAR") || mFlagColumnType.equalsIgnoreCase("CHAR")
|| mFlagColumnType.equalsIgnoreCase("VARCHAR")) {
lSelectSQL = lSelectSQL.concat(" where (" + mMarkColumnName + " != " + "'"
+ mMarkColumnValue + "'" + " or " + mMarkColumnName + " is NULL )");
} else {
lSelectSQL = lSelectSQL.concat(" where (" + mMarkColumnName + " != "
+ mMarkColumnValue + " or " + mMarkColumnName + " is NULL )");
}
}
} else{
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00638.IMP_Error_IVALID_ColumnName") + mMarkColumnName;
throw new MessagingException(msg, new NamingException());
}
}
mLogger.log(Level.INFO, "Executing sql 1. " + lSelectSQL);
ps = connection.prepareStatement(lSelectSQL);
rs = ps.executeQuery();
}
catch (final SQLException ex) {
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL, ex);
} else if (mLogger.isLoggable(Level.FINE)) {
mLogger.log(Level.FINE, mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL);
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info(mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL"));
}
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL +
"Reason: " + ex.getLocalizedMessage() + " SQLState: " + ex.getSQLState() + " ErrorCode: " + ex.getErrorCode();
throw new MessagingException(msg, ex);
} catch (final Exception ex) {
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") + lSelectSQL
+ ex.getLocalizedMessage();
throw new MessagingException(msg, ex);
}
return rs;
}
/**
* @return
* @throws MessagingException
*/
public DatabaseModel getDataAccessObject(OperationMetaData meta) throws MessagingException {
DatabaseModel objDataAccess = null;
String jndiName = null;
String prdtName = null;
String catalog = null;
jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
Connection connection = null;
ResultSet rs = null;
try {
connection = getDatabaseConnection(jndiName);
prdtName = DBMetaData.getDBType(connection);
rs = connection.getMetaData().getColumns(catalog, mSchemaName, mTableName, "%");
int noofColCounter = -1;
// if(rs==null){
// final String msg = InboundMessageProcessor.mMessages.getString("IMP_Table_NotExist");
// throw new MessagingException(msg, new NamingException());
// }
while (rs.next()) {
noofColCounter++;
final String colName = rs.getString("COLUMN_NAME");
if (colName.equalsIgnoreCase(meta.getJDBCSql().getPKName())) {
final String defaultValue = rs.getString("COLUMN_DEF");
final int sqlTypeCode = rs.getInt("DATA_TYPE");
final String sqlType = DBMetaData.getSQLTypeDescription(sqlTypeCode);
mPKType = sqlType;
}
if (colName.equalsIgnoreCase(meta.getJDBCSql().getMarkColumnName())) {
final String defaultValue = rs.getString("COLUMN_DEF");
final int sqlTypeCode = rs.getInt("DATA_TYPE");
final String sqlType = DBMetaData.getSQLTypeDescription(sqlTypeCode);
mFlagColumnType = sqlType;
}
}
if(noofColCounter < 0 ){
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00636.IMP_Table_NotExist");
throw new MessagingException(msg, new NamingException());
}
if (mPKType == null) {
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00637.IMP_PrimaryKey_Error");
throw new MessagingException(msg, new NamingException());
}
if (prdtName.equalsIgnoreCase(InboundMessageProcessor.DERBY_PROD_NAME)) {
return objDataAccess = DerbyDataAccess.getInstance();
} else if (prdtName.equalsIgnoreCase(InboundMessageProcessor.ORACLE_PROD_NAME)) {
return objDataAccess = OracleDataAccess.getInstance();
} else if (prdtName.equalsIgnoreCase(InboundMessageProcessor.DB2_PROD_NAME)) {
return objDataAccess = Db2DataAccess.getInstance();
} else if (prdtName.equalsIgnoreCase(InboundMessageProcessor.SQLSERVER_PROD_NAME)) {
return objDataAccess = SqlServerDataAccess.getInstance();
} else if (prdtName.equalsIgnoreCase(InboundMessageProcessor.JDBC_PROD_NAME)) {
return objDataAccess = DatabaseModelImpl.getInstance();
} else {
return objDataAccess = new DatabaseModelImpl();
}
} catch (final NamingException ex) {
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00635.IMP_Error_Lookup") + jndiName;
mLogger.log(Level.SEVERE, msg, ex);
throw new MessagingException(msg, ex);
} catch (final SQLException ex) {
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL") +
"Reason: " + ex.getLocalizedMessage() + " SQLState: " + ex.getSQLState() + " ErrorCode: " + ex.getErrorCode();
mLogger.log(Level.SEVERE, msg, ex);
throw new MessagingException(msg, ex);
} catch (final Exception ex) {
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL");
throw new MessagingException(msg, ex);
} finally {
try {
if (rs != null) {
rs.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11109.IMP_EXCEPTION_WHILE_CLOSING_THE_RS"), se);
}
try{
if (connection != null) {
connection.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
}
}
}
/**
* @return
*/
public ServiceEndpoint locateServiceEndpoint() {
ServiceEndpoint activatedEndpoint = null;
final QName serviceName = (QName) epb.getValueObj(EndpointBean.FULL_SERVICE_NAME);
final String endpointName = epb.getValue(EndpointBean.ENDPOINT_NAME);
activatedEndpoint = mContext.getEndpoint(serviceName, endpointName);
if (activatedEndpoint != null) {
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, "DBBC_E00645.IMP_locate_EP", new Object[] { serviceName, endpointName });
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info(mMessages.getString("DBBC_E00645.IMP_locate_EP"));
}
}
return (activatedEndpoint);
}
public String Qualified(String str) {
int len = 0;
if ((str != null) && (!str.equals(""))) {
final int i = str.indexOf(".");
if (i > 0) {
len = str.length();
mTableName = str.substring(i + 1, len);
mSchemaName = str.substring(0, i);
str = "\"" + mSchemaName + "\"" + "." + "\"" + mTableName + "\"";
return str;
}
return "\"" + str + "\"";
}
return str;
}
/**
* @param exchange
* @throws Exception
*/
//@Override
public synchronized void processReplyMessage(final MessageExchange exchange) throws Exception {
String sql = null;
String jndiName = null;
Transaction tx = null;
boolean isTransacted = exchange.isTransacted();
Connection connection = null;
PreparedStatement ps = null;
if (!(exchange instanceof InOnly) && !(exchange instanceof InOut)) {
mLogger.log(Level.SEVERE, "DBBC_E00647.IMP_Unsupported_exchange_pattern",
exchange.getPattern().toString());
throw new Exception("DBBC_E00647.IMP_Unsupported_exchange_pattern");
}
final String messageId = exchange.getExchangeId();
try {
if (InboundMessageProcessor.mInboundExchanges.containsKey(messageId)) {
if (exchange.getStatus() == ExchangeStatus.DONE) {
try {
jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
if (isTransacted && exchange instanceof InOnly) {
tx = (Transaction) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
try {
// we have to resume the suspended transaction
resumeThreadTx(tx);
} catch (Exception ex) {
// for in-only there's no sending of status back to nmr
// failure will be logged
mLogger.log(Level.WARNING, "DBBC_E00651.IMP_RESUME_FAILED",
new Object[] { ex.getLocalizedMessage() });
}
try {
if (tx.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
try {
// As we are the initiator for tx we have to rollback
rollbackThreadTx(exchange);
} catch (Exception ex) {
// for in-only there's no sending of status back to nmr
// failure will be logged
mLogger.log(Level.WARNING, "DBBC_E00652.IMP_ROLLBACK_FAILED",
new Object[] { ex.getLocalizedMessage() });
}
}
} catch (Exception ex) {
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("IMP_POST_PROCESS_FAILED"), ex);
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info(mMessages.getString("IMP_POST_PROCESS_FAILED"));
}
mLogger.log(Level.SEVERE, "IMP_POST_PROCESS_FAILED",
new Object[] { ex.getLocalizedMessage() });
}
}
connection = getDatabaseConnection(jndiName);
// if (isTransacted && exchange instanceof InOnly) {
connection.setAutoCommit(true);
// }
//final List records = epb.getProcessList();
final List records = (List)mMapInboundExchangesProcessRecords.get(messageId);
for (final Iterator it = records.iterator(); it.hasNext();) {
String pkNameRet = (String) it.next();
String pkNameValue = pkNameRet;
if (mPKType.equalsIgnoreCase("LONGVARCHAR") || mPKType.equalsIgnoreCase("CHAR")
|| mPKType.equalsIgnoreCase("VARCHAR")) {
pkNameRet = "'" + pkNameRet + "'";
}
if (mPollingPostProcessing.equalsIgnoreCase("Delete")) {
sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName,
mMarkColumnName, mMarkColumnValue, mPKName, "DELETE", mFlagColumnType);
sql = sql + "=" + pkNameRet;
mLogger.log(Level.INFO, "Executing sql 2. " + sql);
ps = connection.prepareStatement(sql);
final int delcount = ps.executeUpdate();
mLogger.log(Level.FINE, "Records deleted are:" + delcount);
try {
if (ps != null) {
ps.close();
}
} catch (final SQLException se) {
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se);
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info(mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"));
}
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se);
}
} else if (mPollingPostProcessing.equalsIgnoreCase("MarkColumn")) {
sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName,
mMarkColumnName, mMarkColumnValue, mPKName, "UPDATE", mFlagColumnType);
sql = sql + "=" + pkNameRet;
mLogger.log(Level.INFO, "Executing sql 3. " + sql);
ps = connection.prepareStatement(sql);
final int count = ps.executeUpdate();
mLogger.log(Level.FINE, "Records updated are " + count);
try {
if (ps != null) {
ps.close();
}
} catch (final SQLException se) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se);
}
} else if (mPollingPostProcessing.equalsIgnoreCase("CopyRow")) {
sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName,
mMarkColumnName, mMarkColumnValue, mPKName, "INSERT", mFlagColumnType);
sql = sql + "=" + pkNameRet;
mLogger.log(Level.INFO, "Executing sql 4. " + sql);
ps = connection.prepareStatement(sql);
final int count = ps.executeUpdate();
mLogger.log(Level.FINE, "Records updated are " + count);
try {
if (ps != null) {
ps.close();
}
} catch (final SQLException se) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se);
}
sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName,
mMarkColumnName, mMarkColumnValue, mPKName, "UPDATE", mFlagColumnType);
sql = sql + "=" + pkNameRet;
mLogger.log(Level.INFO, "Executing sql 5. " + sql);
ps = connection.prepareStatement(sql);
final int updatecount = ps.executeUpdate();
mLogger.log(Level.FINE, "Records updated are " + updatecount);
try {
if (ps != null) {
ps.close();
}
} catch (final SQLException se) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se);
}
} else if (mPollingPostProcessing.equalsIgnoreCase("MoveRow")) {
sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName,
mMarkColumnName, mMarkColumnValue, mPKName, "INSERT", mFlagColumnType);
sql = sql + "=" + pkNameRet;
mLogger.log(Level.INFO, "Executing sql 6. " + sql);
ps = connection.prepareStatement(sql);
final int count = ps.executeUpdate();
mLogger.log(Level.FINE, "Records updated are " + count);
try {
if (ps != null) {
ps.close();
}
} catch (final SQLException se) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se);
}
sql = dbDataAccessObject.createQualifiedQuery(mTableName, mMoveRowToTableName,
mMarkColumnName, mMarkColumnValue, mPKName, "DELETE", mFlagColumnType);
sql = sql + "=" + pkNameRet;
mLogger.log(Level.INFO, "Executing sql 7. " + sql);
ps = connection.prepareStatement(sql);
final int delcount = ps.executeUpdate();
mLogger.log(Level.FINE, "Records deleted are:" + delcount);
try {
if (ps != null) {
ps.close();
}
} catch (final SQLException se) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00628.OMP_Cleanup_Failure"), se);
}
} // else if
mProcessedList.remove(pkNameValue);
}
if (isTransacted && exchange instanceof InOnly) {
try {
// As we are the initiator for tx we have to commit
commitThreadTx(exchange);
} catch (Exception ex) {
// for in-only there's no sending of status back to nmr
// failure will be logged
mLogger.log(Level.SEVERE, "DBBC_E00657.IMP_COMMIT_FAILED",
new Object[] { ex.getLocalizedMessage() });
}
}
} catch (final SQLException ex) {
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL")
+ sql;
mLogger.log(Level.SEVERE, msg, new Object[] {ex.getLocalizedMessage()});
if (isTransacted && exchange instanceof InOnly) {
try {
// As we are the initiator for tx we have to rollback
rollbackThreadTx(exchange);
} catch (Exception exception) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), exception);
}
}
} catch (final Exception ex) {
final String msg = InboundMessageProcessor.mMessages.getString("DBBC_E00639.IMP_Failed_Executing_SQL");
mLogger.log(Level.SEVERE, msg, new Object[] {ex.getLocalizedMessage()});
if (isTransacted && exchange instanceof InOnly) {
try {
// As we are the initiator for tx we have to rollback
rollbackThreadTx(exchange);
} catch (Exception exception) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), exception);
}
}
}
// for cluster environment
if(epb.isClustered()){
try{
Connection con = null;
List records = (List)mMapInboundExchangesProcessRecords.get(messageId);
if(jndiName.equalsIgnoreCase(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME))){
con = connection;
}else{
con = getDatabaseConnection(mRuntimeConfig.getProperties().getProperty(RuntimeConfiguration.CONFIG_CLUSTER_DATABASE_JNDINAME));
}
mJDBCClusterManager.updateStatusToDone(records, "DONE", con);
mLogger.log(Level.INFO,
"DBBC_R10906.IMP_UPDATED_STATUS_TO_DONE",
new Object[] { records });
}catch(Exception e){
mLogger.log(Level.SEVERE, "Unable to set the status to DOne", e.getLocalizedMessage());
}finally {
try{
if(con != null){
con.close();
con = null;
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, "Unable to close the connection", se.getLocalizedMessage());
}
}
}
} else {
final List records = (List)mMapInboundExchangesProcessRecords.get(messageId);
for (final Iterator it = records.iterator(); it.hasNext();) {
String pkNameRet = (String) it.next();
mProcessedList.remove(pkNameRet);
}
mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(),
messageId });
if (isTransacted && exchange instanceof InOnly) {
try {
// As we are the initiator for tx we have to rollback
rollbackThreadTx(exchange);
} catch (Exception ex) {
// for in-only there's no sending of status back to nmr
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), ex);
}
} else {
// Any status other than 'DONE' is considered an error
final String msgError = "Error occured while getting DONE Response ";
throw new Exception(msgError);
}
}
if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, "DBBC_E00648.IMP_Remove_exchange_msg_id", messageId);
}
} else {
mLogger.log(Level.SEVERE, "DBBC_E00646.IMP_Invalid_reply_msgId", messageId);
}
} finally {
InboundMessageProcessor.mInboundExchanges.remove(messageId);
mMapInboundExchangesProcessRecords.remove(messageId);
try {
if(ps != null) {
ps.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11110.IMP_EXCEPTION_WHILE_CLOSING_THE_PS"), se);
}
try{
if(connection != null) {
connection.close();
}
}catch(SQLException se){
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E11111.IMP_EXCEPTION_WHILE_CLOSING_THE_CONNECTION"), se);
}
}
}
// suspend thread transactional context
private void resumeThreadTx(Transaction tx) throws Exception {
if (tx != null) {
((TransactionManager) mContext.getTransactionManager()).resume(tx);
if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, " resuing txn ");
}
if (mLogger.isLoggable(Level.FINER)) {
mLogger.log(Level.FINER, " resuing txn ", new Object[] { tx.toString() });
}
}
}
private void rollbackThreadTx(MessageExchange msgXChange) throws Exception {
if (msgXChange.isTransacted()) {
Transaction tx = (Transaction) msgXChange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if (tx != null) {
try {
tx.rollback();
} catch (Exception ex) {
mLogger.log(Level.SEVERE, "DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED", new Object[]{ex.getLocalizedMessage()});
throw ex;
}
} else {
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
mLogger.log(Level.WARNING, "DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE",
new Object[]{msgXChange.getExchangeId()});
}
}
}
}
private void commitThreadTx(MessageExchange msgXChange) throws Exception {
if (msgXChange.isTransacted()) {
Transaction tx = (Transaction) msgXChange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if (tx != null) {
try {
tx.commit();
} catch (Exception ex) {
mLogger.log(Level.SEVERE, "DBBC_E00656.IMP_XA_TX_COMMIT_FAILED", new Object[]{"commit", ex});
throw ex;
}
}
} else {
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
mLogger.log(Level.WARNING, "DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE",
new Object[]{msgXChange.getExchangeId()});
}
}
}
/**
* @param jndiName
* @return
* @throws javax.naming.NamingException
*/
private Object getDataSourceFromContext(final String jndiName) throws javax.naming.NamingException {
final Context c = mContext.getNamingContext();
return c.lookup(jndiName);
}
/**
* @param jndiName
* @return
* @throws Exception
*/
private Connection getDatabaseConnection(final String jndiName) throws SQLException, NamingException {
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
return ((DataSource) getDataSourceFromContext(jndiName)).getConnection();
}
/**
* @param prepStmtSQLText
* @return
*/
private boolean isSelectStatement(final String prepStmtSQLText) {
prepStmtSQLText.trim();
final StringTokenizer tok = new StringTokenizer(prepStmtSQLText);
if (tok.hasMoreTokens()) {
final String firstTok = (String) tok.nextToken();
if (firstTok.equalsIgnoreCase("select")) {
return true;
}
}
return false;
}
protected void stopReceiving() {
mLogger.log(Level.INFO, "DBBC_R00644.IMP_Inbound_stopped");
mMonitor.set(Boolean.TRUE);
}
private TransactionManager getTransactionManager() {
return (TransactionManager)mContext.getTransactionManager();
}
public void setMessageExchangeId(String messageExchangeId, Object retryMetaData) {
exchangeIDToMeta.put(messageExchangeId, retryMetaData);
}
public void onRedelivery(MessageExchange exchange) throws MessagingException {
NormalizedMessage inMsg;
EndpointBean operationMetaData = (EndpointBean) exchangeIDToMeta.remove(exchange.getExchangeId());
String groupId = (String)exchange.getProperty(CRMP_GROUP_ID);
String messageId = (String)exchange.getProperty(CRMP_MESSAGE_ID);
// remove the listener associated with the exchange ID
MessageExchangeSupport.removeRedeliveryListener(exchange.getExchangeId());
mInboundExchanges.remove(exchange.getExchangeId());
try{
switch (ExchangePattern.valueOf(exchange)) {
case IN_OUT:
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, "Resending the InOut exchange with group ID '"
+ groupId + "' and message ID '" + messageId + "'...");
} else if (mLogger.isLoggable(Level.FINE)) {
mLogger.log(Level.FINE, "Resending the InOut exchange with message ID '" + messageId + "'...");
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, "Resending the InOut exchange");
}
inMsg = ((InOut)exchange).getInMessage();
InOut inout = mMsgExchangeFactory.createInOutExchange();
// make sure that the message id has is the same
inout.setProperty(CRMP_GROUP_ID, groupId);
inout.setProperty(CRMP_MESSAGE_ID, messageId);
//processInOut(inout, inMsg, operationMetaData);
processInOut(inout,operationMetaData);
break;
case IN_ONLY:
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, "Resending the InOnly exchange with group ID '"
+ groupId + "' and message ID '" + messageId + "'...");
} else if (mLogger.isLoggable(Level.FINE)) {
mLogger.log(Level.FINE, "Resending the InOnly exchange with message ID '" + messageId + "'...");
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, "Resending the InOnly exchange");
}
inMsg = ((InOnly)exchange).getInMessage();
InOnly inonly = mMsgExchangeFactory.createInOnlyExchange();
// make sure that the message id has is the same
inonly.setProperty(CRMP_GROUP_ID, groupId);
inonly.setProperty(CRMP_MESSAGE_ID, messageId);
//processInOnly(inonly, inMsg, operationMetaData);
if (mServiceEndpoint == null) {
mServiceEndpoint = locateServiceEndpoint();
epb.setValueObj(EndpointBean.ENDPOINT_REFERENCE, mServiceEndpoint);
}
if (mServiceEndpoint == null) {
throw new MessagingException(mMessages.getString("DBBC_E00643.IMP_Failed_locate_EP"));
}
inonly.setEndpoint(mServiceEndpoint);
inonly.setOperation(mOperation);
MessageExchangeSupport.addReplyListener(inonly.getExchangeId(), replyListener, epb);
MessageExchangeSupport.addRedeliveryListener(inonly.getExchangeId(), this, epb);
List records = (List)mMapInboundExchangesProcessRecords.get(exchange.getExchangeId());
for (final Iterator it = records.iterator(); it.hasNext();) {
String pkNameRet = (String) it.next();
mProcessedList.remove(pkNameRet);
}
// Removing the records from the Map
mMapInboundExchangesProcessRecords.remove(exchange.getExchangeId());
processInOnly(inonly,operationMetaData);
break;
default:
if (mLogger.isLoggable(Level.FINE)) {
mLogger.log(Level.FINE, "Retry handler receives an unsupported exchange pattern: "
+ExchangePattern.valueOf(exchange) + ". Ignoring the retry attempt...");
}
break;
}
}catch(Exception e){
mLogger.log(Level.SEVERE, "Failed in retry handler",
e.getMessage());
throw new MessagingException(e);
}
}
/*
* Runtime Config object to cluster JNDI name
*
*/
public void setRuntimeConfig(RuntimeConfiguration runtimeConfg){
mRuntimeConfig = runtimeConfg;
}
}