Merged in bbendukow/openesb-components/ESBCOMP-88-the-link-between-nmr-and-bpel (pull request #23)

ESBCOMP-88 fix. Thanks to Alexey Bychkov.
master
David Brassely 2014-02-06 13:20:01 +01:00
commit 5a0c09b16a
9 changed files with 63 additions and 16 deletions

View File

@ -49,7 +49,7 @@ public interface CRMPDBO extends DBObject {
* bpelmessageexchange may need to be addressed in the future..
*/
String BASE_INSERT_STMT_STR = "INSERT INTO " + //$NON-NLS-1$
PersistenceDBSchemaCreation.CRMP + " (stateid, crmpinvokeid, partnerlink, operation) VALUES(?, ?, ?, ? )"; //$NON-NLS-1$
PersistenceDBSchemaCreation.CRMP + " (stateid, crmpinvokeid, partnerlink, operation, bpelmessageexchange) VALUES(?, ?, ?, ?, ?)"; //$NON-NLS-1$
/** update statement */
String BASE_UPDATE_STMT_STR = "UPDATE " + //$NON-NLS-1$
@ -61,11 +61,11 @@ public interface CRMPDBO extends DBObject {
PersistenceDBSchemaCreation.CRMP + "WHERE crmpinvokeid = ?"; //$NON-NLS-1$
/** query statement */
String BASE_QUERY_STMT_STR = "SELECT stateid, partnerlink, operation, replyvariableid FROM " + //$NON-NLS-1$
String BASE_QUERY_STMT_STR = "SELECT stateid, partnerlink, operation, replyvariableid, bpelmessageexchange FROM " + //$NON-NLS-1$
PersistenceDBSchemaCreation.CRMP + "WHERE crmpinvokeid = ?"; //$NON-NLS-1$
/** query statement */
String QUERY_STMT_FOR_CLUSTERED_INVOKE_STR = "SELECT stateid, partnerlink, operation, replyvariableid FROM " + //$NON-NLS-1$
String QUERY_STMT_FOR_CLUSTERED_INVOKE_STR = "SELECT stateid, partnerlink, operation, replyvariableid, bpelmessageexchange FROM " + //$NON-NLS-1$
PersistenceDBSchemaCreation.CRMP + " WHERE crmpinvokeid = ? and replyvariableid <> -1"; //$NON-NLS-1$
/* sybase, db2, sqlserver, pointbase statements to be added */

View File

@ -59,4 +59,5 @@ public interface QueryCRMPDBO extends DBObject{
long getReplyVariableId();
String getCRMPInvokeId();
}

View File

@ -109,6 +109,7 @@ public class CRMPDBOImpl extends DBObjectImpl implements CRMPDBO {
stmt.setString(2, getCRMPInvokeId());
stmt.setString(3, getPartnerLink());
stmt.setString(4, getOperation());
stmt.setString(5, getBpelMessageExchange());
}
@Override
@ -141,7 +142,7 @@ public class CRMPDBOImpl extends DBObjectImpl implements CRMPDBO {
mPartnerLink = rs.getString(2);
mOperation = rs.getString(3);
mReplyVarId = rs.getLong(4);
mBpelMsgExchange = rs.getString(5);
}
@ -181,7 +182,7 @@ public class CRMPDBOImpl extends DBObjectImpl implements CRMPDBO {
}
return retVal;
}
public String toString() {
StringBuilder retStr = new StringBuilder();
retStr.append(super.toString());
@ -194,6 +195,8 @@ public class CRMPDBOImpl extends DBObjectImpl implements CRMPDBO {
retStr.append("\n\t");
retStr.append("Partner Link = " + getPartnerLink());
retStr.append("\n\t");
retStr.append("BpelMessageExcange = "+getBpelMessageExchange());
retStr.append("\n\t");
retStr.append("Reply Variable ID = " + getReplyVariableId());
return retStr.toString();
}

View File

@ -103,4 +103,7 @@ public class QueryCRMPDBOImpl extends DBObjectImpl implements QueryCRMPDBO {
return mReplyVarId;
}
public String getCRMPInvokeId() {
return mCRMPInvId;
}
}

View File

@ -43,7 +43,7 @@ import com.sun.wsdl4j.ext.bpel.MessagePropertyAlias;
* As per the BPEL specifcation, the correlation value would be
* {valueOfProp1}{valueOfProp2}....{valueOfLastProp}. To facilitate, support
* of correlations that are totally unrelated to each other but end up
* having the same ID, a prefix of valueOfCorrelationSetUniqueID is added.
* having the same ID, a prefix of "valueOfCorrelationSetUniqueID" is added.
* ValueOfCorrelationSetUniqueID is preferred to the
* valueOfCorrelationSetName because of the possibility of overloaded names
* in the same BPEL within scopes. When the model is parsed, each
@ -52,7 +52,7 @@ import com.sun.wsdl4j.ext.bpel.MessagePropertyAlias;
*
* Extending the same reasoning further, to differentiate CorrelationIDs
* across different BPELs within the same engine, the ID is suffixed with
* the uniqueIDof BPEL. This plays an important role in the design for
* the "uniqueIDof BPEL". This plays an important role in the design for
* recovery. Since the engine uses one persistenceStore, all the
* correlationIDs across BPEL instances are stored in the same table.
*

View File

@ -239,7 +239,7 @@ public class ActivityUnitFactory {
* part defined by an element and there exists a catch activity with a matching faultName
* value that has a faultVariable whose type matches the type of the element used to define
* the part then the fault is passed to the identified catch activity with the faultVariable
* initialized to the value in the single parts element. 3. Otherwise if there is a catch
* initialized to the value in the single part's element. 3. Otherwise if there is a catch
* activity with a matching faultName value that does not specify a faultVariable or
* faultMessageType value then the fault is passed to the identified catch activity. Note
* that in this case the fault value will not be available from within the fault handler but
@ -250,7 +250,7 @@ public class ActivityUnitFactory {
* element and there exists a catch activity without a faultName attribute that has a
* faultVariable whose type matches the type of the element used to define the part then the
* fault is passed to the identified catch activity with the faultVariable initialized to
* the value in the single parts element. 6. Otherwise if there is a catchAll handler then
* the value in the single part's element. 6. Otherwise if there is a catchAll handler then
* the fault is passed to the catchAll handler. 7. Otherwise, the fault will be handled by
* the default fault handler. If the fault occurs in (or is rethrown to) the global process
* scope, and there is no matching fault handler for the fault at the global level, the
@ -333,7 +333,7 @@ public class ActivityUnitFactory {
* and there exists a catch activity with a matching faultName value that has a faultVariable
* whose type matches the type of the element used to define the part then the fault is passed
* to the identified catch activity with the faultVariable initialized to the value in the
* single parts element. 3. Otherwise if there is a catch activity with a matching faultName
* single part's element. 3. Otherwise if there is a catch activity with a matching faultName
* value that does not specify a faultVariable or faultMessageType value then the fault is
* passed to the identified catch activity. Note that in this case the fault value will not be
* available from within the fault handler but will be available to the "rethrow" activity. 4.
@ -343,7 +343,7 @@ public class ActivityUnitFactory {
* contains a single part defined by an element and there exists a catch activity without a
* faultName attribute that has a faultVariable whose type matches the type of the element used
* to define the part then the fault is passed to the identified catch activity with the
* faultVariable initialized to the value in the single parts element.
* faultVariable initialized to the value in the single part's element.
*
* @param catches Collection of catch activities
* @param faultName The faultName to match

View File

@ -192,12 +192,36 @@ public class PersistenceManager {
specificUpdateStateForStartElement(rObjs, (RStartElement) activity, txInfo, state);
}
public void updateState(ReceiveUnitImpl unit, RequiredObjects rObjs, TransactionInfo txInfo, MutableState state, long branchInvokeCounter, String messageExchangeId) {
if (!mBPInstance.getBPELProcessManager().isPersistenceEnabled()) {
return;
}
RActivity activity = unit.getStaticModelActivity();
state.updatePCWithBranchInvokeCounter(unit.getBranchId(), activity.getUniqueId(), branchInvokeCounter);
String crmpInvId = (String) rObjs.removeValue(RequiredObjects.CRMP_INVOKE_ID);
if (crmpInvId != null) {
RStartElement startElement = (RStartElement)activity;
String partnerLink = startElement.getRPartner().getName();
String operation = ((OperationReference) startElement).getOperation();
state.updateCRMPState(crmpInvId, partnerLink, operation, messageExchangeId);
}
StateManager mgr = mEng.getStateManager();
mgr.persistState((State) state, txInfo, mBPInstance);
}
private void specificUpdateStateForStartElement(RequiredObjects rObjs, RStartElement startElement,
TransactionInfo txInfo, MutableState state) {
String crmpInvId = (String) rObjs.removeValue(RequiredObjects.CRMP_INVOKE_ID);
if (crmpInvId != null) {
String partnerLink = startElement.getRPartner().getName();
String operation = ((OperationReference) startElement).getOperation();
//TODO: (BB) It seems that next line contains error, because startElement is a class of bpelmodel project,
//which contains setMessageExchange method but doesn't provide it through any interfaces and never uses (Make "find usages" setMessageExchange of com.sun.bpel.model.impl.* classes).
//so, the messageExchange will be null in any case. Please see where is a bug.
String bpelMesgExchange = startElement.getMessageExchange();
state.updateCRMPState(crmpInvId, partnerLink, operation, bpelMesgExchange);

View File

@ -139,8 +139,9 @@ public class ReceiveUnitImpl extends ActivityUnitImpl {
//TODO: This time we assume that the message will be found. Is that a valid assumption?
Object[] result = procMgr.receiveRequestOrPutInPendingQueue(mWaitingForEvent, frame);
MessageContainer request = (MessageContainer) result[0];
if(request != null)
if(request != null) {
mContext.addRequest(receive, request);
}
//Post event
postVarEvent (receive.getRVariable(), request, rObjs, bpit.getCallFrame());
@ -366,7 +367,7 @@ public class ReceiveUnitImpl extends ActivityUnitImpl {
// persist the receive, and then calculate the correlation values.
frame.getProcessInstance().getPersistenctMgr().updateState(this, rObjs,
TransactionInfo.getLocalTxInfo(), mContext.getStateContext().getState(),
frame.getBranchInvokeCounter());
frame.getBranchInvokeCounter(), request.getId());
// process correlations
if (receive.getStartType() != Engine.RECEIVE_TYPE_NO_COR_JOIN_ONLY) {
@ -441,7 +442,7 @@ public class ReceiveUnitImpl extends ActivityUnitImpl {
// persist the receive activity (PC) as part of the TX
frame.getProcessInstance().getPersistenctMgr().updateState(this, rObjs, txInfo,
mContext.getStateContext().getState(), frame.getBranchInvokeCounter());
mContext.getStateContext().getState(), frame.getBranchInvokeCounter(), request.getId());
/*
* need to set the activity to ExecutionState.WaitingForTxComplete.

View File

@ -34,7 +34,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
@ -54,6 +53,7 @@ import javax.xml.namespace.QName;
import com.sun.bpel.model.Activity;
import com.sun.bpel.model.Pick;
import com.sun.bpel.model.meta.RActivityHolder;
import com.sun.bpel.model.meta.RBPELProcess;
import com.sun.bpel.model.meta.RMessagingElement;
import com.sun.bpel.model.meta.RPartnerLink;
import com.sun.bpel.model.meta.RStartElement;
@ -81,12 +81,13 @@ import com.sun.jbi.engine.bpel.core.bpel.engine.BPELSERegistry;
import com.sun.jbi.engine.bpel.core.bpel.engine.CorrelationManager;
import com.sun.jbi.engine.bpel.core.bpel.engine.Engine;
import com.sun.jbi.engine.bpel.core.bpel.engine.ICallFrame;
import com.sun.jbi.engine.bpel.core.bpel.engine.MessageContainer;
import com.sun.jbi.engine.bpel.core.bpel.engine.MessageContainerFactory;
import com.sun.jbi.engine.bpel.core.bpel.engine.RecoveredCallFrame;
import com.sun.jbi.engine.bpel.core.bpel.engine.impl.CorrelatingSAInComingEventKeyImpl;
import com.sun.jbi.engine.bpel.core.bpel.engine.impl.CorrelationDefnValues;
import com.sun.jbi.engine.bpel.core.bpel.engine.impl.CorrelationVal;
import com.sun.jbi.engine.bpel.core.bpel.exception.BPELRuntimeException;
import com.sun.jbi.engine.bpel.core.bpel.exception.StandardException;
import com.sun.jbi.engine.bpel.core.bpel.model.runtime.ActivityUnit;
import com.sun.jbi.engine.bpel.core.bpel.model.runtime.Fault;
import com.sun.jbi.engine.bpel.core.bpel.model.runtime.FaultHandlingContext;
@ -393,8 +394,22 @@ public class StateManagerImpl implements StateManager {
String partnerlink = qCRMPDBO.getPartnerLink();
String oper = qCRMPDBO.getOperation();
String crmpUpdateListValue = bpId + partnerlink + oper;
String crmpInvokeId = qCRMPDBO.getCRMPInvokeId();
long replyVarId = qCRMPDBO.getReplyVariableId();
String msgExch = qCRMPDBO.getBpelMessageExchange();
BPELProcessInstanceImpl instImpl = (BPELProcessInstanceImpl) processInstance;
instImpl.addToCRMPUpdateList(crmpUpdateListValue);
if (replyVarId == -1) {
RBPELProcess proc = instImpl.getBPELProcessManager().getBPELProcess();
RStartElement actStart = proc.getStartElement(partnerlink, oper, msgExch);
RVariable rVar = actStart.getRVariable();
RuntimeVariable runVar = instImpl.getRuntimeVariable(rVar);
MessageContainer con = MessageContainerFactory.createMessage(msgExch, runVar.getWSMessage(), crmpInvokeId, null);
instImpl.getBPELProcessManager().addCRMPReqForRecoveringInsts(crmpUpdateListValue, con);
}
}
}
} catch (Throwable t) {