HL7 BC - Fixed the issue for resetRecourseAction. If external system is not responding then the appropriate recourse action should be executed, fixed the issue if msg contains msa segment. JMS BC - Configurations added DefaultRedeliveryHandling, ForceConcurrencyMode, ForceMaxConcurrentConsumers, Fixed - denormalizer issue when using encoderlib - using encodeBytes instead of encodeString. Rest BC: Upgraded JSON libraries

master
Vishnu 2013-02-08 19:13:47 +05:30
parent e706ec015e
commit e633bb1dc8
24 changed files with 488 additions and 44 deletions

View File

@ -81,6 +81,7 @@ import com.sun.jbi.common.qos.messaging.MessagingChannel;
import com.sun.jbi.common.qos.messaging.SendFailureListener;
import com.sun.jbi.hl7bc.extensions.HL7CommunicationControl;
import com.sun.jbi.hl7bc.extensions.HL7Input;
import com.sun.jbi.hl7bc.extensions.HL7Address;
import com.sun.jbi.hl7bc.extensions.HL7Operation;
import com.sun.jbi.hl7bc.extensions.HL7Output;
import com.sun.jbi.hl7bc.extensions.HL7Message;
@ -406,6 +407,13 @@ public class InboundMessageProcessor implements Callable<String>, MessageExchang
private void processInboundAck() throws Exception {
String use = mHL7Operation.getHL7OperationInput().getHL7Message().getUseType();
Source src;
// here if ack message received in case of original mode
// should not process the message. and send the nack back to the sender.
if(mAckMode.equals(ACK_MODE_ORIGINAL)){
String nakMsg = generateCannedNakMessage("MessageType MSH.9 is ACK, hence rejecting the message processing");
mHL7Callback.onReply(nakMsg, false);
return;
}
if (use.equals(HL7Message.ATTR_USE_TYPE_ENCODED)) {
src = mEncoder.decodeFromString(mHL7Msg);
} else {
@ -492,6 +500,13 @@ public class InboundMessageProcessor implements Callable<String>, MessageExchang
Probe normalizationMeasurement = Probe.info(getClass(), endPointID,
HL7BindingComponent.PERF_CAT_NORMALIZATION);
try {
HL7Address adrs = mEndpoint.getHL7Address();
mLog.log(Level.INFO, I18n.msg(
"I9124: Received HL7 message from the client address {0} : on to the server port {1}. ",
mHL7Callback.getClientInfo(), Integer.toString(adrs.getHL7ServerPort())));
mLog.log(Level.INFO, I18n.msg(
"Msg: {0} ",
mHL7Msg.toString()));
inMsg = mHL7Normalizer.normalize(mOperationName, mEndpoint, hl7Message, mHL7Msg);
} catch (Exception exe) {
// Freeze further processing since message validation against xml schema failed
@ -624,6 +639,7 @@ public class InboundMessageProcessor implements Callable<String>, MessageExchang
HL7MMUtil.getSolutionGroup(mEndpoint)});
}
// </checkpoint>
mEndpoint.releaseThrottle();
}
}
@ -748,10 +764,11 @@ public class InboundMessageProcessor implements Callable<String>, MessageExchang
private int retrieveSequenceNumber(String queryKey) throws SQLException, Exception {
int seqNO = -1;
DBConnection dbConnection = null;
ResultSet rs = null;
try {
dbConnection = getDBConnection();
SequenceNumDBO seqNoDBO = mDBObjectFactory.createSequenceNumDBO(queryKey);
ResultSet rs = dbConnection.getRow(seqNoDBO);
rs = dbConnection.getRow(seqNoDBO);
if (rs.next()) {
seqNoDBO.populateDBO(rs);
} else {
@ -762,6 +779,9 @@ public class InboundMessageProcessor implements Callable<String>, MessageExchang
}
seqNO = seqNoDBO.getESN();
} finally {
if(rs != null){
rs.close();
}
if (dbConnection != null) {
dbConnection.close();
}

View File

@ -527,7 +527,7 @@ public class OutboundMessageProcessor implements HL7Constants {
}
HL7ProtocolProperties hl7ProtocolProps = getHL7ProtocolProperties(normalizedMsg, endpoint);
// In case of acknowledgments, message validation should not be done
boolean msaFound = hasSegmentMSA(((DOMSource) src).getNode());
boolean msaFound = messageIsAck(((DOMSource) src).getNode());
boolean seqNoEnabled = hl7ProtocolProps.getSeqNumEnabled().booleanValue();
String ackMode = hl7ProtocolProps.getAckMode();
boolean journallingEnabled = hl7ProtocolProps.getJournallingEnabled().booleanValue();
@ -803,7 +803,7 @@ public class OutboundMessageProcessor implements HL7Constants {
}
HL7ProtocolProperties hl7ProtocolProps = getHL7ProtocolProperties(normalizedMsg, destination);
// In case of acknowledgments, message validation should not be done
boolean msaFound = hasSegmentMSA(((DOMSource) src).getNode());
boolean msaFound = messageIsAck(((DOMSource) src).getNode());
boolean seqNoEnabled = hl7ProtocolProps.getSeqNumEnabled().booleanValue();
String ackMode = hl7ProtocolProps.getAckMode();
boolean journallingEnabled = hl7ProtocolProps.getJournallingEnabled().booleanValue();
@ -1330,6 +1330,33 @@ public class OutboundMessageProcessor implements HL7Constants {
return msaFound;
}
private boolean messageIsAck(Node domMessage) {
boolean ackMsg = false;
Node mshNode = null;
NodeList segments = domMessage.getFirstChild().getFirstChild().getFirstChild().getChildNodes();
for (int i = 0; i < segments.getLength(); i++) {
Node segment = segments.item(i);
String name = segment.getLocalName();
if (name != null && name.contains("MSH")) {
mshNode = segment;
break;
}
}
if(mshNode != null){
NodeList list = mshNode.getChildNodes();
for(int j=0; j < list.getLength(); j++){
Node node = list.item(j);
String localName = node.getLocalName();
if(localName != null && localName.contains("MSH.9")){
String nodeValue = node.getFirstChild().getFirstChild().getNodeValue();
ackMsg = nodeValue.startsWith("ACK");
break;
}
}
}
return ackMsg;
}
private Document getDocument(Source src) throws Exception {
DOMResult result = transformToDOMResult(mTrans, src);
Node node = result.getNode();
@ -1363,6 +1390,14 @@ public class OutboundMessageProcessor implements HL7Constants {
dbConnection.getUnderlyingConnection().commit();
}
seqNO = seqNoDBO.getESN();
if(rs != null){
try{
rs.close();
} catch (SQLException e) {
mLog.log(Level.SEVERE,
I18n.msg("E9335: Exception occurred while closing the result set"));
}
}
return seqNO;
}
@ -1635,6 +1670,8 @@ public class OutboundMessageProcessor implements HL7Constants {
}
}
}
// before return hl7connector , set the protocolInfo
hl7Connector.setProtocolInfo(protocolInfo);
return hl7Connector;
}
@ -1913,7 +1950,19 @@ public class OutboundMessageProcessor implements HL7Constants {
HL7MMUtil.setCheckpoint(mChkptMsgId, "Connection-Established-To-ExternalSystem", endpoint );
}
// </checkpoint>
String hostName = protocolInfo.get(HL7Address.ATTR_HL7_SVR_LOCATION);
int port = Integer.parseInt(protocolInfo.get(HL7Address.ATTR_HL7_SVR_PORT));
mLog.log(Level.INFO, I18n.msg("I9789: HL7 Message sent to the external system, Address, Host:{0} , Port: {1}",
hostName, Integer.toString(port)));
mLog.log(Level.INFO, I18n.msg("Message : {0} ",
hl7PayLoad));
String ackMsg = sendAndReceiveHL7Message(inOnly, hl7PayLoad, hl7Connector, endpoint, hl7message, hl7ProtocolProps);
if(ackMsg != null){
mLog.log(Level.INFO, I18n.msg("I9790: ACK/NAK received from the external system, Address, Host:{0} , Port: {1}",
hostName, Integer.toString(port)));
mLog.log(Level.INFO, I18n.msg("ACK/NAK Message : {0} ",
ackMsg));
}
// <checkpoint>
if ( mMonitorEnabled ) {
HL7MMUtil.setCheckpoint(mChkptMsgId, "Message-Sent-And-ACK-Received-From-ExternalSystem", endpoint );
@ -2017,7 +2066,7 @@ public class OutboundMessageProcessor implements HL7Constants {
hl7Connector.connect(protocolInfo, endpoint);
} else if (!mRuntimeConfig.isAlwaysCreatesNewConnEnabled()){
boolean isPooledConnector = true;
if((hl7Connector != null) && (mHL7ConnectorTakenFromPool) && (hl7Connector.getHL7Connection().isConnected())){
if((hl7Connector != null) && (mHL7ConnectorTakenFromPool) && (hl7Connector.getHL7Connection() != null)&&(hl7Connector.getHL7Connection().isConnected())){
hl7Connector.setIoSession(hl7Connector.getHL7Connection());
}else{
hl7Connector.connect(protocolInfo, endpoint);
@ -2089,20 +2138,32 @@ public class OutboundMessageProcessor implements HL7Constants {
if (ackMsg == null) {
commCntrl = mHL7CommunicationControlsInfo.getMaxNoResponseCommControl();
// Max No Response communication control
boolean maxCountReached = false;
if (commCntrl != null && commCntrl.getEnabled().booleanValue()) {
handleMaxNoResponse(hl7Connector, endpoint);
maxCountReached = handleMaxNoResponse(hl7Connector, endpoint);
if(maxCountReached){
if (ACTION_SUSPEND.equalsIgnoreCase(commCntrl.getRecourseAction())) {
suspendRecourseAction("Suspend on max no response; max no response is " + commCntrl.getValue(),
hl7Connector, endpoint);
} else if (ACTION_RESET.equalsIgnoreCase(commCntrl.getRecourseAction())) {
return resetRecourseAction("Reset on max no response; max no response is " + commCntrl.getValue(),
msgExchange, hl7PayLoad, hl7Connector, endpoint, hl7message, hl7ProtoclProperties);
}
}
}
if (ACTION_RESEND.equalsIgnoreCase(mHL7CommunicationControl.getRecourseAction())) {
mLog.log(Level.INFO, I18n.msg("Resend on no response; no response count {0} ",mCountNoResponse));
return resendRecourseAction("Resend on no response; no response count " + mCountNoResponse,
msgExchange, hl7PayLoad, hl7Connector, endpoint, hl7message, hl7ProtoclProperties);
} else if (ACTION_SUSPEND.equalsIgnoreCase(mHL7CommunicationControl.getRecourseAction())) {
suspendRecourseAction("Suspend on no response", hl7Connector, endpoint);
} else if (defaultMode || ACTION_RESET.equalsIgnoreCase(mHL7CommunicationControl.getRecourseAction())) {
resetRecourseAction("Reset on no response", hl7Connector);
return resetRecourseAction("Reset on no response", msgExchange, hl7PayLoad, hl7Connector, endpoint, hl7message, hl7ProtoclProperties);
}
}
} else {
ackMsg = hl7Connector.recvHL7Message();
mCountNoResponse = 0;
endpoint.setLastACKMsgReceivedTimeStamp(System.currentTimeMillis());
}
@ -2167,21 +2228,35 @@ public class OutboundMessageProcessor implements HL7Constants {
if (mLog.isLoggable(Level.FINE)) {
mLog.log(Level.FINE, I18n.msg("Nak Message Received"), ackMsg);
}
boolean retry = true;
boolean retry = false;
boolean archived = false;
commCntrl = mHL7CommunicationControlsInfo.getMaxNakReceivedCommControl();
// handle max nak recieved first
if (commCntrl != null && commCntrl.getEnabled().booleanValue()) {
retry = handleMaxNakReceived(hl7Connector, endpoint, hl7PayLoad, ackMsg, journallingEnabled, messageControlID);
if(retry){
if (ACTION_SUSPEND.equalsIgnoreCase(commCntrl.getRecourseAction())) {
suspendRecourseAction("Suspend on max NAK received; max NAK received is " + commCntrl.getValue(),
hl7Connector, endpoint);
} else if (ACTION_RESET.equalsIgnoreCase(commCntrl.getRecourseAction())) {
return resetRecourseAction("Reset on max NAK received; max NAK received is " + commCntrl.getValue(),
msgExchange, hl7PayLoad, hl7Connector, endpoint, hl7message, hl7ProtoclProperties);
}else if(SKIPMESSAGE.equalsIgnoreCase(commCntrl.getRecourseAction())){
// ToDo: SKIP Message handling
skipMessageRecourseAction("Skip message on max NAK received; max NAK received is " +commCntrl.getValue(),
hl7Connector, endpoint, hl7PayLoad, ackMsg, journallingEnabled, messageControlID);
retry = true;
}// ToDo: SKIP Message handling
}
}
HL7CommunicationControl nakRecvCommCntrl = mHL7CommunicationControlsInfo.getNakReceivedCommControl();
if (retry && nakRecvCommCntrl != null && nakRecvCommCntrl.getEnabled().booleanValue()) {
if (!retry && nakRecvCommCntrl != null && nakRecvCommCntrl.getEnabled().booleanValue()) {
// take resend recourse action
if (ACTION_RESEND.equalsIgnoreCase(nakRecvCommCntrl.getRecourseAction())) {
return resendRecourseAction("Resend on NAK received; NAK receive count "
+ mCountNakReceived, msgExchange, hl7PayLoad, hl7Connector, endpoint, hl7message, hl7ProtoclProperties);
} else if (ACTION_RESET.equalsIgnoreCase(nakRecvCommCntrl.getRecourseAction())) {
resetRecourseAction("Reset on NAK received; NAK receive", hl7Connector);
return resetRecourseAction("Reset on NAK received; NAK receive",msgExchange, hl7PayLoad, hl7Connector, endpoint, hl7message, hl7ProtoclProperties);
}else if(SKIPMESSAGE.equalsIgnoreCase(nakRecvCommCntrl.getRecourseAction())){
// ToDo: SKIP Message handling
skipMessageRecourseAction("Skip message on NAK received", hl7Connector, endpoint, hl7PayLoad, ackMsg,
@ -2223,14 +2298,15 @@ public class OutboundMessageProcessor implements HL7Constants {
if (mLog.isLoggable(Level.FINE)) {
mLog.log(Level.FINE, I18n.msg("Entered handleMaxNakReceived() method"));
}
boolean retry = true;
boolean maxNakCountReached = false;
mCountNakReceived += 1;
// handle max nak recourse action
HL7CommunicationControl commCntrl = mHL7CommunicationControlsInfo.getMaxNakReceivedCommControl();
if (mCountNakReceived > commCntrl.getValue()) {
// max nak received will be handled, so reset count to 0
mCountNakReceived = 0;
if (ACTION_SUSPEND.equalsIgnoreCase(commCntrl.getRecourseAction())) {
maxNakCountReached = true;
/*if (ACTION_SUSPEND.equalsIgnoreCase(commCntrl.getRecourseAction())) {
suspendRecourseAction("Suspend on max NAK received; max NAK received is " + commCntrl.getValue(),
hl7Connector, endpoint);
} else if (ACTION_RESET.equalsIgnoreCase(commCntrl.getRecourseAction())) {
@ -2241,29 +2317,32 @@ public class OutboundMessageProcessor implements HL7Constants {
skipMessageRecourseAction("Skip message on max NAK received; max NAK received is " +commCntrl.getValue(),
hl7Connector, endpoint, hl7message, nakmessage, journallingEnabled, messageControlID);
retry = false;
}// ToDo: SKIP Message handling
}*/// ToDo: SKIP Message handling
}
return retry;
return maxNakCountReached;
}
private void handleMaxNoResponse(HL7Connector hl7Connector, Endpoint endpoint) throws Exception {
private boolean handleMaxNoResponse(HL7Connector hl7Connector, Endpoint endpoint) throws Exception {
if (mLog.isLoggable(Level.FINE)) {
mLog.log(Level.FINE, I18n.msg("Entered handleMaxNoResponse() method"));
}
boolean maxCountReached = false;
mCountNoResponse += 1;
// handle max no response recourse action
HL7CommunicationControl commCntrl = mHL7CommunicationControlsInfo.getMaxNoResponseCommControl();
if (mCountNoResponse > commCntrl.getValue()) {
// max no response will be handled, reset count to 0
mCountNoResponse = 0;
if (ACTION_SUSPEND.equalsIgnoreCase(commCntrl.getRecourseAction())) {
maxCountReached = true;
/* if (ACTION_SUSPEND.equalsIgnoreCase(commCntrl.getRecourseAction())) {
suspendRecourseAction("Suspend on max no response; max no response is " + commCntrl.getValue(),
hl7Connector, endpoint);
} else if (ACTION_RESET.equalsIgnoreCase(commCntrl.getRecourseAction())) {
resetRecourseAction("Reset on max no response; max no response is " + commCntrl.getValue(),
hl7Connector);
}
}*/
}
return maxCountReached;
}
private void skipMessageRecourseAction(String resoneCode, HL7Connector hl7Connector, Endpoint endpoint, String hl7message,
@ -2543,16 +2622,24 @@ public class OutboundMessageProcessor implements HL7Constants {
}
}
private void resetRecourseAction(String resetReason, HL7Connector hl7Connector) throws Exception {
private String resetRecourseAction(String resetReason,
MessageExchange msgExchange,
String hl7PayLoad,
HL7Connector hl7Connector,
Endpoint endpoint,
HL7Message hl7message,
HL7ProtocolProperties hl7ProtoclProperties) throws Exception {
try {
if (mLog.isLoggable(Level.FINE)) {
mLog.log(Level.FINE, resetReason);
}
// close the existing connection
// close the existing connection and recreate
if (hl7Connector != null) {
hl7Connector.discardConnection();
}
throw new Exception(resetReason);
//throw new Exception(resetReason);
establishConnectionToExtSys(hl7Connector, hl7Connector.getProtocolInfo(), endpoint);
return sendAndReceiveHL7Message(msgExchange, hl7PayLoad, hl7Connector, endpoint, hl7message, hl7ProtoclProperties);
} catch (Exception ex) {
mLog.log(Level.SEVERE, resetReason);
throw new Exception(resetReason);

View File

@ -170,6 +170,12 @@ public final class HL7BCConnectionManager {
if ( mLogger.isLoggable(Level.FINE))
mLogger.log(Level.FINE, "Connection with key=[" + key + "] obtained ... connection =" + conn);
conn.stopTimer(); // max idle timer stop ticking
// and check whether the connection is active or not.
// if not active dischard the conn and return null.
if(!conn.getIOSessionObject().isConnected()){
conn.discard();
conn = null;
}
}
@ -240,6 +246,31 @@ public final class HL7BCConnectionManager {
mWL.unlock();
}
}
public static final void closeConnectionIfNotAvailable(ConnectionInfo connInfo) throws Exception {
mWL.lock();
try {
if(connInfo == null)
return;
if ( mConnectionPools.size() > 0 ) {
Set keys = mConnectionPools.keySet();
Iterator<ConnectionInfo> it = keys.iterator();
while ( it.hasNext() ) {
ConnectionInfo key = it.next();
if((connInfo.getHost().equals(key.getHost()) &&
connInfo.getPort() == key.getPort() && connInfo.getEndpointName().equals(key.getEndpointName()))){
ConnectionPool pool = mConnectionPools.get(key);
System.out.println("Found the pool "+pool);
if ( pool != null ) {
pool.cleanup();
}
}
}
}
} finally {
mWL.unlock();
}
}
/**
* free up the connection pools called by ComponentLifeCycle.stop()

View File

@ -67,5 +67,9 @@ public interface HL7Connector {
void setHL7Connection(Connection conn) throws Exception;
// set back the assoaciated IOSession on the connector
void setIoSession(Connection conn) throws Exception;
// get the protocol properties associated with the connector
ProtocolInfo getProtocolInfo() throws Exception;
// get the ProtocalInfo to the connector
void setProtocolInfo(ProtocolInfo pInfo) throws Exception;
}// end of the interface

View File

@ -84,6 +84,7 @@ public class TCPIpHL7Connector extends IoHandlerAdapter implements HL7Connector
final Condition ackNotRecvd = lock.newCondition();
private Connection mHl7Connection = null;
private ProtocolInfo mProtocolInfo = null;
boolean useConnectionPool = false;;
public TCPIpHL7Connector() {
}
@ -151,6 +152,13 @@ public class TCPIpHL7Connector extends IoHandlerAdapter implements HL7Connector
public void setHL7Connection(Connection conn) throws Exception {
this.mHl7Connection = conn;
}
public ProtocolInfo getProtocolInfo() throws Exception {
return this.mProtocolInfo;
}
public void setProtocolInfo(ProtocolInfo pInfo) throws Exception {
this.mProtocolInfo = pInfo;
}
public void setIoSession(Connection conn) throws Exception{
this.mHl7Connection = conn;
@ -161,22 +169,26 @@ public class TCPIpHL7Connector extends IoHandlerAdapter implements HL7Connector
mSession = (IoSession)mHl7Connection.getIOSessionObject();
}else{
// if the connection taken from the pool is closed, creating new connection
ConnectionHelper connHelper = new ConnectionHelper();
/* ConnectionHelper connHelper = new ConnectionHelper();
future = connHelper.getConnection(connInfo.getHost(),connInfo.getPort(),mHl7Connection.getIoHandler(),connInfo.getIoServiceConfig(),connInfo.getRetryLogicString());
future.join(); // Wait until the connection attempt is finished.
mSession = future.getSession();
mSession.setAttribute("currentInput", new ByteArrayOutputStream());
mSession.setAttribute("readBytes", new Long(0));
mSession.setAttribute("readBytes", new Long(0));*/
}
}
public String recvHL7Message() throws Exception {
lock.lock();
boolean timeOut = false;
try {
while (getHl7ResponseMsg() == null) {
ackNotRecvd.await(60, TimeUnit.SECONDS);
timeOut = ackNotRecvd.await(60, TimeUnit.SECONDS);
if (!mSession.isConnected()) {
throw new IOException();
}
if (!timeOut) {
break;
}
}
} catch (Exception ex) {
@ -261,4 +273,24 @@ public class TCPIpHL7Connector extends IoHandlerAdapter implements HL7Connector
public void setHl7ResponseMsg(String hl7ResponseMsg) {
this.hl7ResponseMsg = hl7ResponseMsg;
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
try{
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, I18n.msg("W9155: session exception caught :{0}", cause.getLocalizedMessage()));
}
// if the external is forcesibly diconnection the connection in middle of the process
// then cought this exception and should clean the pool.
if (session != null) {
CloseFuture future = session.close();
if(useConnectionPool){
log.log(Level.INFO, I18n.msg("W9016: session exception caught :{0} , Hence cleaing up the pool", cause.getLocalizedMessage()));
HL7BCConnectionManager.closeConnectionIfNotAvailable(this.mHl7Connection.getKey());
}else{
future = session.close();
}
}
}catch(Exception e){}
}
}// end of class

View File

@ -210,7 +210,25 @@ public class DBSchemaCreation {
}
mLog.log(Level.WARNING, I18n.msg("E0312: Unable to create database tables, which are already created"));
//throw new HL7RuntimeException(I18n.msg("E0312: Exception occured while trying to create database tables required by for persistence"), ex);
}
}finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
mLog.log(Level.SEVERE, I18n.msg("E0310: Exception occured while closing a JDBC statement"), e);
}
}
if (conn != null) {
try {
//conn.rollback();
conn.close();
} catch (SQLException e) {
mLog.log(Level.SEVERE, I18n.msg("E0311: Exception occured while closing a JDBC connection"), e);
}
}
}
}
/**
@ -226,9 +244,10 @@ public class DBSchemaCreation {
boolean allTablesPresent = false;
DBConnection dbConn = null;
ResultSet resultSet = null;
Connection conn = null;
try {
dbConn = connFac.createConnection();
Connection conn = dbConn.getUnderlyingConnection();
conn = dbConn.getUnderlyingConnection();
int tableCount = TABLES_LIST.length;
int numTablesFound = 0;
@ -270,6 +289,22 @@ public class DBSchemaCreation {
}
throw new HL7RuntimeException(I18n.msg("E0313: Exception occured while verifying that all the tables required by HL7 BC are present in the schema {0}",
targetSchema ), ex);
}
}finally {
if (dbConn != null) {
try {
dbConn.getUnderlyingConnection().close();
} catch (Exception e) {
mLog.log(Level.SEVERE, I18n.msg("E0311: Exception occured while closing a JDBC connection"), e);
}
}
if (conn != null) {
try {
conn.close();
} catch (Exception e) {
mLog.log(Level.SEVERE, I18n.msg("E0311: Exception occured while closing a JDBC connection"), e);
}
}
}
}
}

View File

@ -86,6 +86,7 @@ public class MLLPV1PersistenceHandler {
public boolean checkDuplicateAndsendResponseFromLog(String hl7Msg) throws Exception {
DBConnection dbcon = null;
boolean found = false;
ResultSet rs = null;
try {
String messageId = digestMessage(hl7Msg);
@ -95,7 +96,7 @@ public class MLLPV1PersistenceHandler {
dbcon = getDBConnection();
hl7MsgLogDbo.setOldStatus(STATUS_HL7_ACK_SENT);
ResultSet rs = dbcon.getRowWithStatus(hl7MsgLogDbo);
rs = dbcon.getRowWithStatus(hl7MsgLogDbo);
while (rs.next()) {
hl7MsgLogDbo.populateDBO(rs);
@ -109,6 +110,14 @@ public class MLLPV1PersistenceHandler {
}
}
} catch (Exception ex) {
if(rs != null){
try{
rs.close();
} catch (SQLException e) {
logger.log(Level.SEVERE,
I18n.msg("E9335: Exception occurred while closing the result set"));
}
}
try {
dbcon.getUnderlyingConnection().rollback();
} catch (SQLException e) {
@ -119,6 +128,14 @@ public class MLLPV1PersistenceHandler {
I18n.msg("E0336: Exception occurred while checking duplicate message from HL7MessageLog "), ex);
throw ex;
} finally {
if(rs != null){
try{
rs.close();
} catch (SQLException e) {
logger.log(Level.SEVERE,
I18n.msg("E9335: Exception occurred while closing the result set"));
}
}
if (dbcon != null) {
try {
dbcon.close();

View File

@ -83,6 +83,7 @@ public class MLLPV2PersistenceHandler {
public void updateHL7MessageLogStatus(String hl7Msg,
boolean isAck) {
DBConnection dbcon = null;
ResultSet rs = null;
try {
@ -92,7 +93,7 @@ public class MLLPV2PersistenceHandler {
dbcon = getDBConnection();
hl7MsgLogDbo.setOldStatus(STATUS_NEW_MSG_RECEIVED);
ResultSet rs = dbcon.getRowWithStatus(hl7MsgLogDbo);
rs = dbcon.getRowWithStatus(hl7MsgLogDbo);
if (rs.next()) {
hl7MsgLogDbo.populateDBO(rs);
@ -108,6 +109,14 @@ public class MLLPV2PersistenceHandler {
logger.fine(I18n.msg("Updated the HL7MessageLog status to : {0} ", STATUS_COMMIT_ACK_SENT));
}
} catch (Exception ex) {
if(rs != null){
try{
rs.close();
} catch (SQLException e) {
logger.log(Level.SEVERE,
I18n.msg("E9335: Exception occurred while closing the result set"));
}
}
try {
dbcon.getUnderlyingConnection().rollback();
} catch (SQLException e) {
@ -118,6 +127,14 @@ public class MLLPV2PersistenceHandler {
I18n.msg("E0303: Exception occurred while updating HL7MessageLog status "),
ex);
} finally {
if(rs != null){
try{
rs.close();
} catch (SQLException e) {
logger.log(Level.SEVERE,
I18n.msg("E9335: Exception occurred while closing the result set"));
}
}
if (dbcon != null) {
try {
dbcon.close();

View File

@ -86,16 +86,26 @@ public class DBConnectionFactory {
if(datasourceJndiName == null || "".equals(datasourceJndiName.trim())){
meta = getAxionDBMetaData();
}else {
Connection conn = null;
try {
dataSourceInstance = (DataSource) initialContext.lookup(datasourceJndiName);
Connection conn = dataSourceInstance.getConnection();
conn = dataSourceInstance.getConnection();
meta = conn.getMetaData();
} catch (NamingException e) {
mLogger.log(Level.WARNING,I18n.msg("W0128: Could not find a JDBC resource with the JNDI name : {0} . Will be using Local DB for persistence.", datasourceJndiName));
}catch (SQLException e){
mLogger.log(Level.WARNING,I18n.msg("W0129: Could not get connection from Datasource : {0}. Will be using Local DB for persistence.", datasourceJndiName ));
}
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
mLogger.log(Level.SEVERE, I18n.msg("E0311: Exception occured while closing a JDBC connection"), e);
}
}
}
}
@ -130,9 +140,9 @@ public class DBConnectionFactory {
private DatabaseMetaData getAxionDBMetaData() throws HL7RuntimeException{
//AxionDBConnection axionConn = AxionDBConnectionPool.getInstance(this.installRoot).getConnection();
DBConnection axionConn = null;
try {
DBConnection axionConn = createConnection();
axionConn = createConnection();
DatabaseMetaData dbMeta = axionConn.getMetaData();
return dbMeta;
} catch (SQLException e) {
@ -140,6 +150,16 @@ public class DBConnectionFactory {
throw new HL7RuntimeException(I18n.msg("E0300: Could not get DatabaseMetaData from Axion DB ") ,e) ;
}catch (Exception e) {
throw new HL7RuntimeException(I18n.msg("E0300: Could not get DatabaseMetaData from Axion DB ") ,e);
}finally{
if (axionConn != null) {
try {
axionConn.getUnderlyingConnection().close();
} catch (SQLException e) {
mLogger.log(Level.SEVERE, I18n.msg("E0311: Exception occured while closing a JDBC connection"), e);
}
}
}
}

View File

@ -293,6 +293,18 @@ public class HL7EventHandler extends IoHandlerAdapter {
session.setAttribute(NAK_SENT_COUNTER, new Long(0));
session.setAttribute(CANNED_NAK_SENT_COUNTER, new Long(0));
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
try{
if (mLog.isLoggable(Level.INFO)) {
mLog.log(Level.INFO, I18n.msg("W9155: session exception caught :{0}", cause.getLocalizedMessage()));
}
if (session != null ) {
CloseFuture future = session.close();
}
}catch(Exception e){}
}
class DefaultHL7Callback implements HL7Callback {
@ -303,7 +315,8 @@ public class HL7EventHandler extends IoHandlerAdapter {
}
public void onReply(String hl7MsgAck, boolean isAck) throws ApplicationException, Exception {
mSession.write(hl7MsgAck);
mSession.write(hl7MsgAck);
mLog.info(I18n.msg("I8787:ACK/NAK sent back to the Client : {0}: Msg: {1}", mSession.getRemoteAddress().toString(), new String(hl7MsgAck)));
}
public void sendPersistedACK(String ackMsg){

View File

@ -28,6 +28,7 @@ public class OutboundTcpServerHL7Connector implements HL7Connector {
private String hl7ResponseMsg;
private Endpoint mEndpoint;
private OutboundTcpServerHL7ConnectorPool mOutboundServerPool;
private ProtocolInfo mProtocolInfo = null;
public OutboundTcpServerHL7Connector(OutboundTcpServerHL7ConnectorPool theOutboundServerPool) {
mLog.finer("Entering constructor");
@ -72,6 +73,14 @@ public class OutboundTcpServerHL7Connector implements HL7Connector {
return hl7ResponseMsg;
}
public ProtocolInfo getProtocolInfo() throws Exception {
return this.mProtocolInfo;
}
public void setProtocolInfo(ProtocolInfo pInfo) throws Exception {
this.mProtocolInfo = pInfo;
}
public String recvHL7Message() throws Exception {
lock.lock();
try {

View File

@ -376,13 +376,14 @@ public class HL7BCManagement implements HL7BCManagementMBean {
public long getSequenceNumber(String serviceUnit) throws MBeanException {
int seqNum = -1;
DBConnection dbConnection = null;
ResultSet rs = null;
try {
dbConnection = getDBConnection();
// queryKey is serviceUnitPath
String queryKey = getServiceUnitPath(serviceUnit);
SequenceNumDBO seqNoDBO = dbObjectFactory.createSequenceNumDBO(queryKey);
ResultSet rs = dbConnection.getRow(seqNoDBO);
rs = dbConnection.getRow(seqNoDBO);
if (rs.next()) {
seqNoDBO.populateDBO(rs);
seqNum = seqNoDBO.getESN();
@ -391,6 +392,9 @@ public class HL7BCManagement implements HL7BCManagementMBean {
throw new MBeanException(e);
} finally {
try {
if(rs != null){
rs.close();
}
if (dbConnection != null) {
dbConnection.close();
}

View File

@ -648,6 +648,7 @@ HL7BC-E0309 = Error closing the streams
HL7BC-E0310 = Exception occured while closing a JDBC statement
# com.sun.jbi.hl7bc.extservice.persist.DBSchemaCreation
# com.sun.jbi.hl7bc.extservice.persist.connection.DBConnectionFactory
HL7BC-E0311 = Exception occured while closing a JDBC connection
# com.sun.jbi.hl7bc.extservice.persist.DBSchemaCreation
@ -748,6 +749,14 @@ HL7BC-E0342 = Stopping outbound server socket pool
# com.sun.jbi.hl7bc.ServiceUnitImpl
HL7BC-E0343 = Failed to start outbound message processor for endpoint \: {0}.
# com.sun.jbi.hl7bc.OutboundMessageProcessor
HL7BC-E0344 = Did not detect MSA segment in response message. Response was\: {0}
# com.sun.jbi.hl7bc.OutboundMessageProcessor
# com.sun.jbi.hl7bc.extservice.persist.MLLPV1PersistenceHandler
# com.sun.jbi.hl7bc.extservice.persist.MLLPV2PersistenceHandler
HL7BC-E9335 = Exception occurred while closing the result set
# com.sun.jbi.hl7bc.bootstrap.InstallerExt
HL7BC-I0100 = setThreads()\: {0} threads.
@ -1007,6 +1016,15 @@ HL7BC-I0185 = Closing connection to {0}, elapsed time since last use \= {1}, ma
# com.sun.jbi.hl7bc.connection.HL7BCConnectionManager
HL7BC-I0186 = Opened new connection to {0} and added to connection pool. Max idle time is {1}
# com.sun.jbi.hl7bc.InboundMessageProcessor
HL7BC-I9124 = Received HL7 message from the client address {0} \: on to the server port {1}.
# com.sun.jbi.hl7bc.OutboundMessageProcessor
HL7BC-I9789 = HL7 Message sent to the external system, Address, Host\:{0} , Port\: {1}
# com.sun.jbi.hl7bc.OutboundMessageProcessor
HL7BC-I9790 = ACK/NAK received from the external system, Address, Host\:{0} , Port\: {1}
# com.sun.jbi.hl7bc.configuration.RuntimeConfiguration
HL7BC-W0100 = Failed to persist Application Configuration.
@ -1122,3 +1140,10 @@ HL7BC-W0142 = Exception occured while commiting the transaction {0}
# com.sun.jbi.hl7bc.InboundMessageProcessor
HL7BC-W0143 = Exception occured while storing the HL7Messageand ACK into the DB {0}
# com.sun.jbi.hl7bc.extservice.client.TCPIpHL7Connector
HL7BC-W9016 = session exception caught \:{0} , Hence cleaing up the pool
# com.sun.jbi.hl7bc.extservice.client.TCPIpHL7Connector
# com.sun.jbi.hl7bc.extservice.server.HL7EventHandler
HL7BC-W9155 = session exception caught \:{0}

View File

@ -42,7 +42,10 @@ import com.sun.jbi.jmsbc.LogSupport;
*/
public class InstallerExt implements InstallerExtMBean {
String mThreads;
String mThreads;
String mForceConcurrencyMode;
int mForceMaxConcurrentConsumers;
String mDefaultRedeliveryHandling;
private static final Messages mMessages =
Messages.getMessages(InstallerExt.class);
@ -82,5 +85,66 @@ public class InstallerExt implements InstallerExtMBean {
new Object[]{"Threads", oldVal, val});
}
}
public void setForceConcurrencyMode(String val){
String oldVal = mForceConcurrencyMode;
mForceConcurrencyMode = val;
if (mLogger.isLoggable(Level.CONFIG)) {
mLogger.log(Level.CONFIG,
"JMSBC-C0301.AttributeChanged",
new Object[]{"ForceConcurrencyMode", oldVal, val});
}
}
public String getForceConcurrencyMode() {
if (mLogger.isLoggable(LogSupport.LEVEL_DEBUG)) {
mLogger.log(LogSupport.LEVEL_DEBUG,
"InstallerExt_GETFORCECONCURRENCYMODE_CALLED",
new Object[]{mForceConcurrencyMode});
}
return mForceConcurrencyMode;
}
public int getForceMaxConcurrentConsumers(){
if (mLogger.isLoggable(LogSupport.LEVEL_DEBUG)) {
mLogger.log(LogSupport.LEVEL_DEBUG,
"InstallerExt_GETFORCEMAXCONCURRENTCONSUMERS",
new Object[]{mForceMaxConcurrentConsumers});
}
return mForceMaxConcurrentConsumers;
}
public void setForceMaxConcurrentConsumers(int val){
int oldVal = mForceMaxConcurrentConsumers;
mForceMaxConcurrentConsumers = val;
if (mLogger.isLoggable(Level.CONFIG)) {
mLogger.log(Level.CONFIG,
"JMSBC-C0301.AttributeChanged",
new Object[]{"ForceMaxConcurrentConsumers", oldVal, val});
}
}
public String getDefaultRedeliveryHandling(){
if (mLogger.isLoggable(LogSupport.LEVEL_DEBUG)) {
mLogger.log(LogSupport.LEVEL_DEBUG,
"InstallerExt_GETDEFAULTREDELIVERYHANDLING",
new Object[]{mDefaultRedeliveryHandling});
}
return mDefaultRedeliveryHandling;
}
public void setDefaultRedeliveryHandling(String val){
String oldVal = mDefaultRedeliveryHandling;
mDefaultRedeliveryHandling = val;
if (mLogger.isLoggable(Level.CONFIG)) {
mLogger.log(Level.CONFIG,
"JMSBC-C0301.AttributeChanged",
new Object[]{"DefaultRedeliveryHandling", oldVal, val});
}
}
}

View File

@ -37,4 +37,13 @@ package com.sun.jbi.jmsbc.mbeans;
public interface InstallerExtMBean {
public String getThreads();
public void setThreads(String val);
public String getForceConcurrencyMode();
public void setForceConcurrencyMode(String val);
public int getForceMaxConcurrentConsumers();
public void setForceMaxConcurrentConsumers(int val);
public String getDefaultRedeliveryHandling();
public void setDefaultRedeliveryHandling(String val);
}

View File

@ -172,6 +172,10 @@ public class JMSBCRuntimeConfiguration extends RuntimeConfiguration implements J
msgBuf.append(mConfig.getProperty(CONFIG_THREADS, DEFAULT_THREADS));
msgBuf.append(", DefaultRedeliveryHandling=");
msgBuf.append(mConfig.getProperty(CONFIG_DEFAULT_REDELIVERY, ""));
msgBuf.append(", ForceConcurrentMode=");
msgBuf.append(mConfig.getProperty(CONFIG_FORCE_CONCURRENCY_MODE, ""));
msgBuf.append(", ForceMaxConcurrentConsumers=");
msgBuf.append(mConfig.getProperty(CONFIG_FORCE_MAX_CONCURRENT_CONSUMERS, ""));
msgBuf.append(" }\n");
super.dump(msgBuf);
}
@ -271,6 +275,7 @@ public class JMSBCRuntimeConfiguration extends RuntimeConfiguration implements J
}
public void setForceConcurrencyMode(String val) throws InvalidAttributeValueException, MBeanException {
mLogger.finest("setting ForceConcurrencyMode");
String attrName = CONFIG_FORCE_CONCURRENCY_MODE;
String oldVal = getForceConcurrencyMode();
@ -286,8 +291,9 @@ public class JMSBCRuntimeConfiguration extends RuntimeConfiguration implements J
// Apply and save the changes
mConfig.put(attrName, val);
mLogger.finest("persisting ForceConcurrencyMode");
persistConfiguration();
mLogger.finest("persisting of ForceConcurrencyMode completed");
if (mLogger.isLoggable(Level.CONFIG)) {
mLogger.log(Level.CONFIG, "JMSBC-C0301.AttributeChangedDetail",
new Object[] { attrName, oldVal, val });
@ -306,6 +312,7 @@ public class JMSBCRuntimeConfiguration extends RuntimeConfiguration implements J
attrType,
oldVal,
val);
mLogger.finest("sending notification for ForceConcurrencyMode");
broadcasterSupport.sendNotification(notif);
}

View File

@ -77,3 +77,6 @@ APPLICATION_VARIABLES_TYPE_FIELD=Variable Type
# ======
#
InstallerExt_GETTHREADS_CALLED=getThreads has been called, the number of threads returned is {0}
InstallerExt_GETFORCECONCURRENCYMODE_CALLED=getForceConcurrencyMode has been called, the value returned is {0}
InstallerExt_GETFORCEMAXCONCURRENTCONSUMERS=getForceMaxConcurrentConsumers has been called, the value returned is {0}
InstallerExt_GETDEFAULTREDELIVERYHANDLING=getDefaultRedeliveryHandling has been called, the value returned is {0}

View File

@ -7,5 +7,17 @@
<componentConfig:Threads displayName="Number of service provider threads"
displayDescription="Number of threads to concurrently process outbound JMS requests and process message exchange repsonses"
isPasswordField="false">10</componentConfig:Threads>
<componentConfig:DefaultRedeliveryHandling displayName="Default Redelivery Handling"
displayDescription="Default redelivery handling for when no value is supplied in WSDL"
isPasswordField="false">10</componentConfig:DefaultRedeliveryHandling>
<componentConfig:ForceConcurrencyMode displayName="Force Concurrency Mode"
displayDescription="Force a concurrency mode which will always override the value specified in WSDL. Use this field if all service units deployed to this BC should use the same settings in order to prevent accidentally omitting a setting."
isPasswordField="false">10</componentConfig:ForceConcurrencyMode>
<componentConfig:ForceMaxConcurrentConsumers displayName="Force Max Concurrent Consumers"
displayDescription="Force the maximum number of concurrent consumers to a value which will always override the value specified in WSDL. Use this field if all service units deployed to this BC should use the same settings in order to prevent accidentally omitting a setting. The default value of -1 means that no forced value is present and the value in the WSDL should always be used."
isPasswordField="false">10</componentConfig:ForceMaxConcurrentConsumers>
</componentConfig:Configuration>

View File

@ -11,6 +11,12 @@
<xsd:complexType name="ConfigurationType">
<xsd:sequence>
<xsd:element type="tns:ThreadCountType" name="Threads">
</xsd:element>
<xsd:element type="tns:DefaultDeliveryHandlingType" name="DefaultDeliveryHandling">
</xsd:element>
<xsd:element type="tns:ForceConcurrencyModeType" name="ForceConcurrencyMode">
</xsd:element>
<xsd:element type="tns:ForceMaxConcurrentConsumersType" name="ForceMaxConcurrentConsumers">
</xsd:element>
</xsd:sequence>
<xsd:attribute type="xsd:string" name="name"/>
@ -32,5 +38,34 @@
<xsd:maxInclusive value="2147483647"/>
</xsd:restriction>
</xsd:simpleType>
<xsd:complexType name="DefaultDeliveryHandlingType">
<xsd:simpleContent>
<xsd:extension base="tns:SimpleRestrictedThreadType">
<xsd:attribute type="xsd:string" name="displayName"/>
<xsd:attribute type="xsd:string" name="displayDescription"/>
<xsd:attribute type="xsd:boolean" name="isPasswordField"/>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
<xsd:complexType name="ForceConcurrencyModeType">
<xsd:simpleContent>
<xsd:extension base="tns:SimpleRestrictedThreadType">
<xsd:attribute type="xsd:string" name="displayName"/>
<xsd:attribute type="xsd:string" name="displayDescription"/>
<xsd:attribute type="xsd:boolean" name="isPasswordField"/>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
<xsd:complexType name="ForceMaxConcurrentConsumersType">
<xsd:simpleContent>
<xsd:extension base="tns:SimpleRestrictedThreadType">
<xsd:attribute type="xsd:string" name="displayName"/>
<xsd:attribute type="xsd:string" name="displayDescription"/>
<xsd:attribute type="xsd:boolean" name="isPasswordField"/>
</xsd:extension>
</xsd:simpleContent>
</xsd:complexType>
</xsd:schema>

View File

@ -57,7 +57,7 @@
<config:Property name="ForceConcurrencyMode"
type="xsd:string"
displayName="Force Concurrency Mode"
displayDescription="Force a concurrency mode which will always override the value specified in WSDL. Use this field if all service units deployed to this BC should use the same settings in order to prevent accidentally omitting a setting."
displayDescription="Force a concurrency mode which will always override the value specified in WSDL. Use this field if all service units deployed to this BC should use the same settings in order to prevent accidentally omitting a setting. Valid values are 'cc', 'sync','serial'"
defaultValue=""
showDisplay="all"
isApplicationRestartRequired="true"

View File

@ -185,12 +185,12 @@
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>0.9.9-6</version>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>0.9.9-6</version>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>net.sf.hulp.meas</groupId>

View File

@ -24,8 +24,8 @@ public class JsonUtil {
private final static Logger logger = Logger.getLogger(JsonUtil.class.getName());
private final static ObjectMapper mapper = new ObjectMapper();
private final static JavaType mapType = MapType.typed(HashMap.class, TypeFactory.fromClass(String.class), TypeFactory.fromClass(String.class));
private final static JavaType listType = CollectionType.typed(ArrayList.class, TypeFactory.fromClass(String.class));
private final static JavaType mapType = MapType.construct(HashMap.class, TypeFactory.defaultInstance().constructType(String.class), TypeFactory.defaultInstance().constructType(String.class));
private final static JavaType listType = CollectionType.construct(ArrayList.class, TypeFactory.defaultInstance().constructType(String.class));
public static Map<String, String> parseJsonPairs(String s) {
String input = s;

View File

@ -281,13 +281,13 @@
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<optional>true</optional>
<version>0.9.9-6</version>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<optional>true</optional>
<version>0.9.9-6</version>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>net.sf.hulp.meas</groupId>

View File

@ -13,7 +13,7 @@
<identification>
<name>sun-rest-binding</name>
<description>REST Binding Component</description>
<identification:VersionInfo component-version="${restbc.release.version}" build-number="${BUILD_NUMBER}"/>
<identification:VersionInfo component-version="${restbc.release.version}" build-number="20121106"/>
</identification>
<component-class-name description="REST Binding Component">com.sun.jbi.restbc.jbiadapter.RestComponent</component-class-name>
<component-class-path>