Monkey-patch database BC to remove the rest of XA

noxa
Vitaliy Filippov 2015-12-21 18:03:38 +03:00
parent 560910d2a1
commit 49fdfb78d1
2 changed files with 11 additions and 627 deletions

View File

@ -60,11 +60,6 @@ import javax.jbi.servicedesc.ServiceEndpoint;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.sql.DataSource;
import javax.sql.XAConnection;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@ -129,8 +124,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
private String mSchemaName = null;
private String mXAEnabled = null;
private String mTableName = null;
private String mPollingPostProcessing = null;
@ -332,14 +325,10 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mTableName = meta.getJDBCSql().getTableName();
mPollingPostProcessing = meta.getJDBCSql().getPollingPostProcessing();
mMoveRowToTableName = meta.getJDBCSql().getMoveRowToTableName();
mXAEnabled = meta.getJDBCSql().getTransaction();
jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
// Throttle Check
if(throttleConfigurationCheck()) {
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
getTransactionManager().begin();
}
epb.setTableName(mTableName);
@ -699,10 +688,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
mMonitor.set(Boolean.TRUE);
}
private TransactionManager getTransactionManager() {
return (TransactionManager)mContext.getTransactionManager();
}
public void setMessageExchangeId(String messageExchangeId, Object retryMetaData) {
exchangeIDToMeta.put(messageExchangeId, retryMetaData);
}

View File

@ -55,10 +55,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import com.sun.jbi.internationalization.Messages;
import javax.transaction.xa.XAResource;
import javax.sql.*;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import com.sun.jbi.common.qos.messaging.MessagingChannel;
import com.sun.jbi.common.descriptor.EndpointInfo;
@ -86,14 +83,11 @@ public class OutboundMessageProcessor implements Runnable {
private MessageExchange mExchange;
private JDBCComponentContext mContext;
private Map mInboundExchanges;
XAConnection xaConnection = null;
private String mXAEnabled = null;
Connection connection = null;
PreparedStatement ps = null;
ResultSet rs = null;
CallableStatement cs = null;
private boolean mtxFlag;
XAResource xaResource = null;
// Settings for custom reliability header extensions
public static final String CUSTOM_RELIABILITY_MESSAGE_ID_PROPERTY =
"com.stc.jbi.messaging.messageid"; // NOI18N
@ -248,23 +242,13 @@ public class OutboundMessageProcessor implements Runnable {
else if (mLogger.isLoggable(Level.FINE))
mLogger.log(Level.FINE, "DBBC_R00612.OMP_Recv_InOut",
"");
if (mExchange.isTransacted()) {
// Start of nested diagnostic context prior to processing of message
Logger.getLogger("com.sun.EnterContext").fine(
"context");
processInOutXA((InOut) mExchange, epb);
// End of nested diagnostic context prior to processing of message
Logger.getLogger("com.sun.ExitContext").fine(
"context");
} else {
// Start of nested diagnostic context prior to processing of message
Logger.getLogger("com.sun.EnterContext").fine(
"context");
processInOut((InOut) mExchange, epb);
// End of nested diagnostic context prior to processing of message
Logger.getLogger("com.sun.ExitContext").fine(
"context");
}
// Start of nested diagnostic context prior to processing of message
Logger.getLogger("com.sun.EnterContext").fine(
"context");
processInOut((InOut) mExchange, epb);
// End of nested diagnostic context prior to processing of message
Logger.getLogger("com.sun.ExitContext").fine(
"context");
break;
case IN_ONLY:
if (mLogger.isLoggable(Level.FINE))
@ -861,544 +845,12 @@ public class OutboundMessageProcessor implements Runnable {
}
}
private void processInOutXA(final InOut inout, final EndpointBean epb) {
mLogger.log(Level.INFO, "Entering processInOutXA");
XAConnection xaconnection = null;
Transaction transaction = null;
String faultCode = null;
String faultDetail = null;
boolean success = true;
if (inout.getStatus() == ExchangeStatus.DONE) {
// remove the redelivery listener handler - no retry needed.
MessageExchangeSupport.removeReplyListener(inout.getExchangeId());
updateTallyReceives(epb, true);
} else if (inout.getStatus() == ExchangeStatus.ERROR) {
// added for retry support
updateTallyReceives(epb, false);
// send alerts
String errorMsg = inout.getError().getMessage();
if (errorMsg != null) {
String msg = mMessages.getString(
"DBBC-E00720.Message_exchange_error",
new Object[]{
String.valueOf(inout.getService()),
inout.getEndpoint().getEndpointName(),
errorMsg
});
mLogger.log(Level.SEVERE, msg);
AlertsUtil.getAlerter().warning(msg,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
epb.getDeploymentId(),
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC-E00720");
} else {
String msg = mMessages.getString(
"DBBC-E00721.Message_exchange_error_no_detail",
new Object[]{
String.valueOf(inout.getService()),
inout.getEndpoint().getEndpointName()
});
mLogger.log(Level.SEVERE, msg);
AlertsUtil.getAlerter().warning(msg,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
epb.getDeploymentId(),
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC-E00721");
}
/**
// let's see if retry is configured or not
EndpointInfo info = new EndpointInfo(false,
epb.getEndpointName(),
null,
epb.getServiceName(),
null);
RedeliveryConfig retryConfig = mChannel.getServiceQuality(info, RedeliveryConfig.class);
**/
RedeliveryStatus retryStatus = Redelivery.getRedeliveryStatus(inout);
if (retryStatus != null && retryStatus.getRemainingRetries() > 0)
try {
MessageExchangeSupport.notifyOfRedelivery(inout);
} catch (Exception e) {
String groupId = (String) inout.getProperty(CRMP_GROUP_ID);
String messageId = (String) inout.getProperty(
CRMP_MESSAGE_ID);
if (mLogger.isLoggable(Level.WARNING)) {
String text = mMessages.getString(
"DBBC-E01036.Failed_to_process_redelivery",
new Object[]{groupId, messageId});
mLogger.log(Level.WARNING, text, e);
AlertsUtil.getAlerter().warning(text,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
epb.getDeploymentId(),
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC-E01036");
}
}
//epb.getEndpointStatus().incrementReceivedErrors();
} else {
// added for retry support
try {
MessageExchangeSupport.notifyOfReply(inout);
} catch (Exception ex) {
if (mLogger.isLoggable(Level.WARNING)) {
String text = mMessages.getString(
"DBBC-E00759.Exception_during_reply_processing", ex.
getLocalizedMessage());
mLogger.log(Level.SEVERE, text, ex);
AlertsUtil.getAlerter().warning(text,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
epb.getDeploymentId(),
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC-E00759");
}
success = false;
}
try {
updateTallyReceivedRequests(epb);
Map operationNameToMetaData = (Map) epb.getValueObj(
EndpointBean.OPERATION_NAME_TO_META_DATA);
OperationMetaData meta = (OperationMetaData) operationNameToMetaData.get(inout.getOperation().
getLocalPart());
if (meta == null)
throw new MessagingException(mMessages.getString(
"DBBC_E00621.OMP_oper_NotDefined") + inout.getOperation());
final NormalizedMessage inMsg = inout.getInMessage();
NormalizedMessage outMsg = mExchange.createMessage();
String statusMessage = "";
String jndiName = null;
try {
Object[] jndiConn = getDatabaseConnection(inMsg, inout.getExchangeId(), epb);
jndiName = (String)jndiConn[0];
connection = (Connection)jndiConn[1];
rs = null;
int rowsUpdated = -1;
String generatedKeyValue = "";
String outputValue = "";
// writeMessage(inMsg, destinationAddress, false);
JDBCOperationInput input = meta.getJDBCSql();
if (input != null) {
final String sql = input.getSql();
if (inout.isTransacted())
// Removing manual enlistment. Moving to automatic resource enlistment
transaction =
(Transaction) inout.getProperty(
MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if (transaction != null)
resumeThreadTx(transaction);
/* PP: Glassfish does not return a XADataSource and always returns
* a DataSource30 object which does not implement getXAResource() method
xaconnection = getXADatabaseConnection(epb);
XAResource xaresource = xaconnection.getXAResource();
transaction.enlistResource(xaresource);
connection = xaconnection.getConnection();*/
if (meta.getJDBCOperationInput().getOperationType().
equalsIgnoreCase(JDBCOperations.OPERATION_TYPE_SELECT.toString()) ||
meta.getJDBCOperationInput().getOperationType().
equalsIgnoreCase(JDBCOperations.OPERATION_TYPE_FIND.toString())) {
try {
rs = executeOutboundSQLSelect(inMsg, epb, meta, jndiName, connection);
} catch (final SQLException ex) {
faultCode = SERVER;
faultDetail = mMessages.getString(
"DBBC_E00626.OMP_Failed_Exec_SQL");
String faultString = mMessages.getString(
"DBBC_E00706.JDBCDN_Failed_Denormalize", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString(), ex.
getLocalizedMessage()
});
processException(ex, transaction, inout, epb,
faultCode, faultDetail);
AlertsUtil.getAlerter().warning(faultDetail,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
null,
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC_E00626");
throw new Exception(faultString, ex);
} catch (final Exception ex) {
faultCode = CLIENT;
faultDetail = mMessages.getString(
"DBBC_E00626.OMP_Failed_Exec_SQL");
String faultString = mMessages.getString(
"DBBC_E00706.JDBCDN_Failed_Denormalize", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString()
});
processException(ex, transaction, inout, epb,
faultCode, faultDetail);
throw new Exception(faultString, ex);
}
if (rs != null)
statusMessage = "Success : ResultSet returned ";
final JDBCNormalizer normalizer =
new JDBCNormalizer();
Probe normalizationMeasurement = null;
try {
normalizationMeasurement =
Probe.info(getClass(),
epb.getUniqueName(),
JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION);
outMsg = normalizer.normalizeSelect(rs, inout,
meta, connection.getMetaData().
getDriverName());
} catch (Exception e) {
faultCode = SERVER;
faultDetail =
"Unable to normalize response from the external service.";
String faultString = mMessages.getString(
"DBBC_E00702.JDBCN_Failed_NM", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString(), e.getLocalizedMessage()
});
processException(e, transaction, inout, epb,
faultCode, faultDetail);
AlertsUtil.getAlerter().warning(faultString,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
null,
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC_E00702");
throw new Exception(faultString, e);
} finally {
if (normalizationMeasurement != null)
normalizationMeasurement.end();
}
inout.setOutMessage(outMsg);
} else {
if (meta.getJDBCOperationInput().getOperationType().
equalsIgnoreCase(JDBCOperations.OPERATION_TYPE_EXECUTE.
toString())) {
try {
cs = executeOutboundProc(inMsg, epb, meta, jndiName, connection);
} catch (final SQLException ex) {
faultCode = SERVER;
faultDetail = mMessages.getString(
"DBBC_E00626.OMP_Failed_Exec_SQL");
String faultString = mMessages.getString(
"DBBC_E00706.JDBCDN_Failed_Denormalize", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString(), ex.
getLocalizedMessage()
});
processException(ex, transaction, inout, epb,
faultCode, faultDetail);
AlertsUtil.getAlerter().warning(faultString,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
null,
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC_E00626");
throw new Exception(faultString, ex);
} catch (final Exception ex) {
faultCode = CLIENT;
faultDetail = mMessages.getString(
"DBBC_E00626.OMP_Failed_Exec_SQL");
String faultString = mMessages.getString(
"DBBC_E00706.JDBCDN_Failed_Denormalize", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString()
});
processException(ex, transaction, inout, epb,
faultCode, faultDetail);
throw new Exception(faultString, ex);
}
final JDBCNormalizer normalizer =
new JDBCNormalizer();
Probe normalizationMeasurement = null;
try {
normalizationMeasurement =
Probe.info(getClass(),
epb.getUniqueName(),
JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION);
outMsg = normalizer.normalizeProcedure(cs,
inout, meta, connection.getMetaData().
getDatabaseProductName());
} catch (Exception e) {
faultCode = SERVER;
faultDetail =
"Unable to normalize response from the external service.";
String faultString = mMessages.getString(
"DBBC_E00702.JDBCN_Failed_NM", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString()
});
processException(e, transaction, inout, epb,
faultCode, faultDetail);
AlertsUtil.getAlerter().warning(faultString,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
null,
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC_E00702");
throw new Exception(faultString, e);
} finally {
if (normalizationMeasurement != null)
normalizationMeasurement.end();
}
} else {
try {
/*
* Modified by Logicoy for [ task #20 ] DB BC - Insert statement should return primary key auto-increment value inserted
*/
String generatedKey = meta.getJDBCSql().
getGeneratedKey();
if (meta.getJDBCOperationInput().getOperationType().
equalsIgnoreCase(JDBCOperations.OPERATION_TYPE_INSERT.toString()) &&
generatedKey != null && !"".equals(generatedKey)) {
generatedKeyValue = executeOutboundSQLWithGeneratedKeys(inMsg, epb, meta, jndiName, connection);
outputValue = generatedKeyValue;
statusMessage =
"Success : Generated Key = " + generatedKeyValue;
} else {
rowsUpdated = executeOutboundSQL(inMsg, epb, meta, jndiName, connection);
statusMessage =
"Success : " + rowsUpdated + " are updated .";
outputValue =
String.valueOf(rowsUpdated);
}
} catch (final SQLException ex) {
faultCode = SERVER;
faultDetail = mMessages.getString(
"DBBC_E00626.OMP_Failed_Exec_SQL");
String faultString = mMessages.getString(
"DBBC_E00706.JDBCDN_Failed_Denormalize", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString(), ex.
getLocalizedMessage()
});
processException(ex, transaction, inout, epb,
faultCode, faultDetail);
AlertsUtil.getAlerter().warning(faultString,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
null,
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC_E00706");
} catch (final Exception ex) {
faultCode = CLIENT;
faultDetail = mMessages.getString(
"DBBC_E00626.OMP_Failed_Exec_SQL");
String faultString = mMessages.getString(
"DBBC_E00706.JDBCDN_Failed_Denormalize", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString()
});
processException(ex, transaction, inout, epb,
faultCode, faultDetail);
throw new Exception(faultString, ex);
}
final JDBCNormalizer normalizer =
new JDBCNormalizer();
Probe normalizationMeasurement = null;
try {
/*
* Modified by Logicoy for [ task #20 ] DB BC - Insert statement should return primary key auto-increment value inserted
*/
outMsg = normalizer.normalize(outputValue,
inout, meta);
} catch (Exception e) {
faultCode = SERVER;
faultDetail =
"Unable to normalize response from the external service.";
String faultString = mMessages.getString(
"DBBC_E00702.JDBCN_Failed_NM", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
inout.getOperation().toString()
});
AlertsUtil.getAlerter().warning(faultString,
JDBCBindingLifeCycle.SHORT_DISPLAY_NAME,
null,
AlertsUtil.getServerType(),
AlertsUtil.COMPONENT_TYPE_BINDING,
NotificationEvent.OPERATIONAL_STATE_RUNNING,
NotificationEvent.EVENT_TYPE_ALERT,
"DBBC_E00702");
processException(e, transaction, inout, epb,
faultCode, faultDetail);
throw new Exception(faultString, e);
} finally {
if (normalizationMeasurement != null)
normalizationMeasurement.end();
}
}
inout.setOutMessage(outMsg);
}
} else {
SPOperationInput spInput =
meta.getJDBCSPOperationInput();
if (spInput != null) {
if (inout.isTransacted())
// Removing manual enlistment. Moving to automatic resource enlistment
transaction =
(Transaction) inout.getProperty(
MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if (transaction != null)
resumeThreadTx(transaction);
cs = executeOutboundProc(inMsg, epb, meta, jndiName, connection);
final JDBCNormalizer normalizer =
new JDBCNormalizer();
final JDBCDenormalizer denormalizer =
new JDBCDenormalizer();
if (connection.getMetaData().getDatabaseProductName().
toLowerCase().contains("sql server") || connection.
getMetaData().getDatabaseProductName().
toLowerCase().contains("adaptive server"))
if (outParamIndex.size() == 0) {
outParamNames.put(1, denormalizer.getProcName(meta.
getJDBCSPOperationInput().
getExecutionString()));
outParamTypes.put(1, "RESULTSET");
outParamIndex.add(
Integer.valueOf(Double.valueOf(1).intValue()));
}
normalizer.setOutParamIndex(outParamIndex);
normalizer.setOutParamNames(outParamNames);
normalizer.setOutParamTypes(outParamTypes);
Probe normalizationMeasurement = null;
try {
normalizationMeasurement =
Probe.info(getClass(),
epb.getUniqueName(),
JDBCBindingLifeCycle.PERF_CAT_NORMALIZATION);
outMsg = normalizer.normalizeProcedure(cs, inout, meta, connection.getMetaData().
getDatabaseProductName().toLowerCase());
} catch (Exception e) {
faultCode = SERVER;
faultDetail =
"Unable to normalize response from the external service.";
String faultString =
mMessages.getString(
"DBBC_E00702.JDBCN_Failed_NM", new Object[]{
inout.getExchangeId(), epb.getValue(
EndpointBean.ENDPOINT_NAME),
mExchange.getOperation().toString()});
processException(e, transaction, inout, epb,
faultCode, faultDetail);
throw new Exception(faultString, e);
} finally {
if (normalizationMeasurement != null)
normalizationMeasurement.end();
}
inout.setOutMessage(outMsg);
}
}
} catch (final Exception ex) {
mLogger.log(Level.SEVERE,
OutboundMessageProcessor.mMessages.getString(
"DBBC_E00622.OMP_Failed_writing"), ex.getLocalizedMessage());
success = false;
setErrorInExchange(inout, faultCode, faultDetail, ex);
inout.setStatus(ExchangeStatus.ERROR);
}
} catch (final Exception ex) {
mLogger.log(Level.SEVERE,
OutboundMessageProcessor.mMessages.getString(
"DBBC_E00623.OMP_Failed_inout"), ex);
throw new RuntimeException(ex);
} finally {
try {
getTransactionManager().suspend();
mChannel.send(inout);
if (success)
updateTallySentReplies(epb);
else
epb.getEndpointStatus().incrementSentErrors();
if (rs != null)
rs.close();
if (cs != null)
cs.close();
if (ps != null)
ps.close();
if (connection != null)
connection.close();
} catch (SQLException sqlexception) {
if (mLogger.isLoggable(Level.FINE))
mLogger.log(Level.SEVERE, mMessages.getString(
"DBBC_E00628.OMP_Cleanup_Failure"), sqlexception);
else if (mLogger.isLoggable(Level.INFO))
mLogger.log(Level.SEVERE, mMessages.getString(
"DBBC_E00628.OMP_Cleanup_Failure"));
} catch (Exception ex) {
if (mLogger.isLoggable(Level.FINE))
mLogger.log(Level.SEVERE, mMessages.getString(
"DBBC_E00628.OMP_Cleanup_Failure"), ex);
else if (mLogger.isLoggable(Level.INFO))
mLogger.log(Level.SEVERE, mMessages.getString(
"DBBC_E00628.OMP_Cleanup_Failure"));
}
}
}
}
/**
* @param inonly
* @param epb
*/
protected void processInOnly(final InOnly inonly, final EndpointBean epb) {
Connection connection = null;
Transaction transaction = null;
boolean success = true;
String faultCode = null;
String faultDetail = null;
@ -1519,14 +971,7 @@ public class OutboundMessageProcessor implements Runnable {
getOperation());
if (inonly.isTransacted())
// Removing manual enlistment. Moving to automatic resource enlistment
transaction = (Transaction) inonly.getProperty(
MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
try {
if (transaction != null)
getTransactionManager().resume(transaction);
Object[] jndiConn = getDatabaseConnection(inMsg, inonly.getExchangeId(), epb);
String jndiName = (String)jndiConn[0];
connection = (Connection)jndiConn[1];
@ -1535,20 +980,18 @@ public class OutboundMessageProcessor implements Runnable {
if (meta.getJDBCOperationInput().getOperationType().
equalsIgnoreCase(
JDBCOperations.OPERATION_TYPE_EXECUTE.toString()))
// Auto enlistment: pass the transaction retrieved from the Message exchange.
cs = executeOutboundProc(inMsg, epb, meta, jndiName, connection);
else
// Auto enlistment: pass the transaction retrieved from the Message exchange.
executeOutboundSQL(inMsg, epb, meta, jndiName, connection);
inonly.setStatus(ExchangeStatus.DONE);
} catch (final SQLException ex) {
processException(ex, transaction, inonly, epb, faultCode,
processException(ex, inonly, epb, faultCode,
faultDetail);
} catch (final MessagingException ex) {
processException(ex, transaction, inonly, epb, faultCode,
processException(ex, inonly, epb, faultCode,
faultDetail);
} catch (final Exception ex) {
processException(ex, transaction, inonly, epb, faultCode,
processException(ex, inonly, epb, faultCode,
faultDetail);
} finally {
try {
@ -1569,8 +1012,6 @@ public class OutboundMessageProcessor implements Runnable {
}
}
if (transaction != null)
getTransactionManager().suspend();
mChannel.send(inonly);
@ -1819,15 +1260,6 @@ public class OutboundMessageProcessor implements Runnable {
// return rowsUpdated;
}
// resumes the transaction
private void resumeThreadTx(Transaction tx) throws Exception {
if (tx != null) {
((TransactionManager) mContext.getContext().getTransactionManager()).
resume(tx);
mLogger.log(Level.INFO, " ", new Object[]{tx.toString()});
}
}
/**
* @param jndiName
* @return
@ -1926,46 +1358,13 @@ public class OutboundMessageProcessor implements Runnable {
return new Object[] { jndiName, connection };
}
private XAConnection getXADatabaseConnection(final EndpointBean epbean) throws Exception {
try {
final DataSource ds = (DataSource) getDataSourceFromContext(epbean.
getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME));
if (ds instanceof XADataSource) {
XAConnection con = ((XADataSource) ds).getXAConnection();
return con;
}
} catch (Exception e) {
if (mLogger.isLoggable(Level.FINEST))
mLogger.log(Level.SEVERE, "Either the JNDI NAME is NULL or Could not establish connection using JNDI NAME :" +
EndpointBean.JDBC_DATABASE_JNDI_NAME, e);
else if (mLogger.isLoggable(Level.FINE))
mLogger.log(Level.SEVERE, "Either the JNDI NAME is NULL or Could not establish connection using JNDI NAME :" +
EndpointBean.JDBC_DATABASE_JNDI_NAME + " Reason: " + e.
getLocalizedMessage());
else if (mLogger.isLoggable(Level.INFO))
mLogger.log(Level.SEVERE,
"Either the JNDI NAME is NULL or Could not establish connection using JNDI NAME :");
}
return null;
}
private TransactionManager getTransactionManager() {
return (TransactionManager) mContext.getContext().getTransactionManager();
}
private boolean processException(final Exception ex, Transaction transaction,
private boolean processException(final Exception ex,
final MessageExchange inonly, final EndpointBean epb,
String faultCode, String faultDetail) {
boolean success = false;
mLogger.log(Level.WARNING, OutboundMessageProcessor.mMessages.getString(
"DBBC_E00622.OMP_Failed_writing"), ex);
try {
if (transaction != null)
transaction.setRollbackOnly();
} catch (javax.transaction.SystemException e) {
//ignore since the code below will take care of setting error on ME.
}
faultCode = CLIENT;
faultDetail = mMessages.getString(
"DBBC_E00706.JDBCDN_Failed_Denormalize");