Remove transaction support from InboundMessageProcessor

master
Vitaliy Filippov 2015-12-18 00:24:02 +03:00
parent 89ab64499a
commit 4e8e052920
1 changed files with 7 additions and 151 deletions

View File

@ -398,7 +398,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
}
}
}
Transaction tx = getTransactionManager().getTransaction();
if (isSelectStatement(mSelectSQL)) {
if(epb.isClustered()){
mJDBCClusterManager.setDataBaseConnection(mClusterConnection);
@ -438,11 +437,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
inMsg.setProperty(JDBCComponentContext.NM_PROP_DATABASEBC_CONNECTION_JNDI_NAME, jndiName);
exchange.setMessage(inMsg, "in");
if (tx != null) {
mExchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, tx);
getTransactionManager().suspend();
}
mInboundExchanges.put(exchangeId, new ListenerMeta(
System.currentTimeMillis(), this));
@ -465,22 +459,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
} else {
if (epb.isClustered())
mClusterConnection.setAutoCommit(true);
if (tx != null) {
try {
tx.commit();
} catch (Exception ex) {
mLogger.log(Level.SEVERE,
"DBBC_E00656.IMP_XA_TX_COMMIT_FAILED",
new Object[] { "commit", ex });
throw ex;
}
} else {
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
mLogger.log(Level.WARNING,
"DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE",
new Object[] { exchange.getExchangeId() });
}
}
}
}
else if (epb.isClustered())
@ -489,10 +467,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
}
} catch (final Exception ex) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00663.IMP_ERROR_WHILE_PROCESSING_MEP"), ex);
Transaction tx = getTransactionManager().getTransaction();
if (tx != null) {
tx.rollback();
}
} finally {
try {
if (rs != null) {
@ -810,8 +784,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
//@Override
public synchronized void processReplyMessage(final MessageExchange exchange) throws Exception {
String jndiName = null;
Transaction tx = null;
boolean isTransacted = exchange.isTransacted();
Connection connection = null;
if (!(exchange instanceof InOnly) && !(exchange instanceof InOut)) {
@ -824,65 +796,10 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
try {
if (InboundMessageProcessor.mInboundExchanges.containsKey(messageId)) {
if (exchange.getStatus() == ExchangeStatus.DONE) {
try {
if (isTransacted && exchange instanceof InOnly) {
tx = (Transaction) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
try {
// we have to resume the suspended transaction
resumeThreadTx(tx);
} catch (Exception ex) {
// for in-only there's no sending of status back to nmr
// failure will be logged
mLogger.log(Level.WARNING, "DBBC_E00651.IMP_RESUME_FAILED",
new Object[] { ex.getLocalizedMessage() });
}
try {
if (tx.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
try {
// As we are the initiator for tx we have to rollback
rollbackThreadTx(exchange);
} catch (Exception ex) {
// for in-only there's no sending of status back to nmr
// failure will be logged
mLogger.log(Level.WARNING, "DBBC_E00652.IMP_ROLLBACK_FAILED",
new Object[] { ex.getLocalizedMessage() });
}
}
} catch (Exception ex) {
if (mLogger.isLoggable(Level.FINEST)) {
mLogger.log(Level.FINEST, mMessages.getString("IMP_POST_PROCESS_FAILED"), ex);
} else if (mLogger.isLoggable(Level.INFO)) {
mLogger.info(mMessages.getString("IMP_POST_PROCESS_FAILED"));
}
mLogger.log(Level.SEVERE, "IMP_POST_PROCESS_FAILED",
new Object[] { ex.getLocalizedMessage() });
}
}
jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
connection = getDatabaseConnection(jndiName);
doPostProcessing(connection, (List)mMapInboundExchangesProcessRecords.get(messageId));
if (isTransacted && exchange instanceof InOnly) {
try {
// As we are the initiator for tx we have to commit
commitThreadTx(exchange);
} catch (Exception ex) {
// for in-only there's no sending of status back to nmr
// failure will be logged
mLogger.log(Level.SEVERE, "DBBC_E00657.IMP_COMMIT_FAILED",
new Object[] { ex.getLocalizedMessage() });
}
}
} catch (final Exception ex) {
if (isTransacted && exchange instanceof InOnly) {
try {
// As we are the initiator for tx we have to rollback
rollbackThreadTx(exchange);
} catch (Exception exception) {
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), exception);
}
}
}
jndiName = epb.getValue(EndpointBean.JDBC_DATABASE_JNDI_NAME);
mLogger.log(Level.INFO, InboundMessageProcessor.mMessages.getString("DBBC_R00629.OMP_UsedJNDI") + jndiName);
connection = getDatabaseConnection(jndiName);
doPostProcessing(connection, (List)mMapInboundExchangesProcessRecords.get(messageId));
// for cluster environment
if(epb.isClustered()){
Connection con = null;
@ -912,19 +829,9 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
}
} else {
mLogger.log(Level.SEVERE, "IMP_MXCH_BAD_STATUS", new Object[] { exchange.getStatus().toString(), messageId });
if (isTransacted && exchange instanceof InOnly) {
try {
// As we are the initiator for tx we have to rollback
rollbackThreadTx(exchange);
} catch (Exception ex) {
// for in-only there's no sending of status back to nmr
mLogger.log(Level.SEVERE, mMessages.getString("DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED"), ex);
}
} else {
// Any status other than 'DONE' is considered an error
final String msgError = "Error occured while getting DONE Response ";
throw new Exception(msgError);
}
// Any status other than 'DONE' is considered an error
final String msgError = "Error occured while getting DONE Response ";
throw new Exception(msgError);
}
final List records = (List)mMapInboundExchangesProcessRecords.get(messageId);
for (final Iterator it = records.iterator(); it.hasNext();) {
@ -950,57 +857,6 @@ public class InboundMessageProcessor implements Runnable, MessageExchangeReplyLi
}
}
// suspend thread transactional context
private void resumeThreadTx(Transaction tx) throws Exception {
if (tx != null) {
((TransactionManager) mContext.getTransactionManager()).resume(tx);
if (mLogger.isLoggable(Level.INFO)) {
mLogger.log(Level.INFO, " resuing txn ");
}
if (mLogger.isLoggable(Level.FINER)) {
mLogger.log(Level.FINER, " resuing txn ", new Object[] { tx.toString() });
}
}
}
private void rollbackThreadTx(MessageExchange msgXChange) throws Exception {
if (msgXChange.isTransacted()) {
Transaction tx = (Transaction) msgXChange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if (tx != null) {
try {
tx.rollback();
} catch (Exception ex) {
mLogger.log(Level.SEVERE, "DBBC_E00653.IMP_XA_TX_ROLLBACK_FAILED", new Object[]{ex.getLocalizedMessage()});
throw ex;
}
} else {
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
mLogger.log(Level.WARNING, "DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE",
new Object[]{msgXChange.getExchangeId()});
}
}
}
}
private void commitThreadTx(MessageExchange msgXChange) throws Exception {
if (msgXChange.isTransacted()) {
Transaction tx = (Transaction) msgXChange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if (tx != null) {
try {
tx.commit();
} catch (Exception ex) {
mLogger.log(Level.SEVERE, "DBBC_E00656.IMP_XA_TX_COMMIT_FAILED", new Object[]{"commit", ex});
throw ex;
}
}
} else {
if (mXAEnabled.equalsIgnoreCase("XATransaction")) {
mLogger.log(Level.WARNING, "DBBC_W00654.IMP_XA_TX_NOT_FOUND_IN_MSG_XCHANGE",
new Object[]{msgXChange.getExchangeId()});
}
}
}
/**
* @param jndiName
* @return