ESBCOMP-33 Adding transaction support for POJO-SE

master
David BRASSELY 2013-05-22 13:40:56 +02:00
parent 1c5129aae9
commit f2a91f9c68
5 changed files with 78 additions and 26 deletions

View File

@ -33,6 +33,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import org.glassfish.openesb.pojose.res.impl.CallTracker;
@ -43,9 +44,13 @@ import org.glassfish.openesb.pojose.res.impl.CallTracker;
public class TransactionHelper {
private static String nameResume = "resume" ; //NOI18N
private static String nameSuspend = "suspend" ; //NOI18N
private static String nameSetRollbackOnly = "setRollbackOnly" ; //NOI18N
private static String nameGetTransaction = "getTransaction";
private static Method methodResume = null;
private static Method methodSuspend = null;
private static Method methodSetRollbackOnly = null;
private static Method methodGetTransaction = null;
/**
* <b>Execute through reflection</b>. We do not want to keep comiple and runtime
@ -55,9 +60,10 @@ public class TransactionHelper {
* @param tm
* @param tx
*/
public static void resumeTransaction(Object tm, Object tx){
public static void resumeTransaction(Object tm, MessageExchange me){
Object tx = me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if ((tm != null) && (tx != null)) {
if (methodResume == null){
if (methodResume == null || methodSetRollbackOnly == null){
synchronized(TransactionHelper.class){
if (methodResume == null){
Method[] ms = tm.getClass().getMethods();
@ -70,10 +76,26 @@ public class TransactionHelper {
}
}
}
if (methodSetRollbackOnly == null){
Method[] ms = tm.getClass().getMethods();
Method m = null;
for (int i = 0; i < ms.length; i++){
m = ms[i];
if (nameSetRollbackOnly.equals(m.getName())){
methodSetRollbackOnly = m;
break;
}
}
}
}
}
try {
if(me.getStatus().equals(ExchangeStatus.ERROR)) {
methodSetRollbackOnly.invoke(tx, null);
}
methodResume.invoke(tm, new Object[]{tx});
} catch (IllegalAccessException ex) {
Logger.getLogger(Util.class.getName()).log(Level.SEVERE, null, ex);
@ -85,11 +107,11 @@ public class TransactionHelper {
}
}
public static Object suspendTransaction(Object tm){
public static Object suspendTransaction(Object tm, MessageExchange me){
Object retTxn = null;
if (tm != null) {
if (methodSuspend == null){
if (methodSuspend == null || methodSetRollbackOnly == null || methodGetTransaction == null){
synchronized(TransactionHelper.class){
if (methodSuspend == null){
try {
@ -100,10 +122,40 @@ public class TransactionHelper {
Logger.getLogger(Util.class.getName()).log(Level.SEVERE, null, ex);
}
}
if (methodSetRollbackOnly == null){
Method[] ms = tm.getClass().getMethods();
Method m = null;
for (int i = 0; i < ms.length; i++){
m = ms[i];
if (nameSetRollbackOnly.equals(m.getName())){
methodSetRollbackOnly = m;
break;
}
}
}
if (methodGetTransaction == null){
Method[] ms = tm.getClass().getMethods();
Method m = null;
for (int i = 0; i < ms.length; i++){
m = ms[i];
if (nameGetTransaction.equals(m.getName())){
methodGetTransaction = m;
break;
}
}
}
}
}
try {
Object tx = methodGetTransaction.invoke(tm, null);
if(me.getStatus().equals(ExchangeStatus.ERROR)) {
methodSetRollbackOnly.invoke(tx, null);
}
retTxn = methodSuspend.invoke(tm, null);
} catch (IllegalAccessException ex) {
Logger.getLogger(Util.class.getName()).log(Level.SEVERE, null, ex);
@ -130,13 +182,13 @@ public class TransactionHelper {
public static void checkAndResumeForConsumedME(Object tm, Object txn, MessageExchange consME, CallTracker ct){
if (ct.isOkToResumeTxn(consME)){
resumeTransaction(tm, txn);
resumeTransaction(tm, consME);
}
}
public static void suspendIfStartedForConsumedME(Object tm, MessageExchange me, CallTracker ct){
// No need to check. Suspend will return null if txn is not associated with the thread.
suspendTransaction(tm);
suspendTransaction(tm, me);
//ct.disassociateTxnFromConsumerME(me);
}
}

View File

@ -172,7 +172,7 @@ public class ConsumerImpl implements Consumer{
if (this.pojosME != null){
if ((sendTxn) && (this.pojosME.isTransacted())){
suspendedTxn = TransactionHelper.suspendTransaction(
transactionMngr);
transactionMngr, out);
if (suspendedTxn != null){
out.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME,
this.pojosME.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME));
@ -195,8 +195,8 @@ public class ConsumerImpl implements Consumer{
return suspendedTxn;
}
private void resumeTxn(Object txn){
TransactionHelper.resumeTransaction(transactionMngr, txn);
private void resumeTxn(Object txn, MessageExchange me){
TransactionHelper.resumeTransaction(transactionMngr, me);
}
/**
@ -432,7 +432,7 @@ public class ConsumerImpl implements Consumer{
}
} finally {
if (suspTxn != null){
resumeTxn(suspTxn);
resumeTxn(suspTxn, me);
}
}
@ -508,7 +508,7 @@ public class ConsumerImpl implements Consumer{
}
} finally {
if (suspTxn != null){
resumeTxn(suspTxn);
resumeTxn(suspTxn, me);
}
}
@ -640,7 +640,7 @@ public class ConsumerImpl implements Consumer{
}
}finally {
if (suspTxn != null){
resumeTxn(suspTxn);
resumeTxn(suspTxn, io);
}
}
}
@ -725,7 +725,7 @@ public class ConsumerImpl implements Consumer{
}
}finally {
if (suspTxn != null){
resumeTxn(suspTxn);
resumeTxn(suspTxn, io);
}
}
}
@ -876,7 +876,7 @@ public class ConsumerImpl implements Consumer{
}
} finally {
if (suspTxn != null){
resumeTxn(suspTxn);
resumeTxn(suspTxn, io);
}
}
@ -1029,7 +1029,7 @@ public class ConsumerImpl implements Consumer{
}
} finally {
if (suspTxn != null){
resumeTxn(suspTxn);
resumeTxn(suspTxn, io);
}
}

View File

@ -69,7 +69,7 @@ public class InOnlyExecutor extends BasePojoExecutor {
// Ctx Txn. Set the Transaction context.
if (provME.isTransacted()){
TransactionHelper.resumeTransaction(ctx.getJBIComponentContext().getTransactionManager(),
provME.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME));
provME);
}
Object[] args = getInputArguments(provME, m, opm);
@ -98,7 +98,7 @@ public class InOnlyExecutor extends BasePojoExecutor {
// Ctx Txn
if (provME.isTransacted()){
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager());
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager(), provME);
}
pt.setExecutedOperation();
}
@ -144,7 +144,7 @@ public class InOnlyExecutor extends BasePojoExecutor {
// Ctx Txn. Set the Transaction context.
if (provME.isTransacted()){
TransactionHelper.resumeTransaction(ctx.getJBIComponentContext().getTransactionManager(),
provME.getProperty(provME.JTA_TRANSACTION_PROPERTY_NAME));
provME);
}
onDone.invoke(pojo, null);
@ -170,7 +170,7 @@ public class InOnlyExecutor extends BasePojoExecutor {
// Ctx Txn
if (provME.isTransacted()){
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager());
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager(), provME);
}
}

View File

@ -69,7 +69,7 @@ public class InOutExecutor extends BasePojoExecutor {
// Ctx Txn. Set the Transaction context.
if (provME.isTransacted()){
TransactionHelper.resumeTransaction(ctx.getJBIComponentContext().getTransactionManager(),
provME.getProperty(provME.JTA_TRANSACTION_PROPERTY_NAME));
provME);
}
Object[] args = getInputArguments(provME, m, opm);
@ -106,7 +106,7 @@ public class InOutExecutor extends BasePojoExecutor {
// Ctx Txn
if (provME.isTransacted()){
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager());
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager(), provME);
}
pt.setExecutedOperation();
}
@ -159,7 +159,7 @@ public class InOutExecutor extends BasePojoExecutor {
// Ctx Txn. Set the Transaction context.
if (provME.isTransacted()){
TransactionHelper.resumeTransaction(ctx.getJBIComponentContext().getTransactionManager(),
provME.getProperty(provME.JTA_TRANSACTION_PROPERTY_NAME));
provME);
}
Object retObj = onDone.invoke(pojo, null);
@ -192,7 +192,7 @@ public class InOutExecutor extends BasePojoExecutor {
// Ctx Txn
if (provME.isTransacted()){
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager());
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager(), provME);
}
}

View File

@ -208,7 +208,7 @@ public class InMsgTask extends BaseTask {
// Ctx Txn
if (me.isTransacted()){
TransactionHelper.resumeTransaction(ctx.getJBIComponentContext().getTransactionManager(),
me.getProperty(me.JTA_TRANSACTION_PROPERTY_NAME));
me);
}
pojoObj = pc.newInstance();
@ -227,7 +227,7 @@ public class InMsgTask extends BaseTask {
// Ctx Txn
if (me.isTransacted()){
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager());
TransactionHelper.suspendTransaction(ctx.getJBIComponentContext().getTransactionManager(), me);
}
}