From 602aa16f34b70425cbba55d6312f5afe8d67542e Mon Sep 17 00:00:00 2001 From: David BRASSELY Date: Thu, 30 Oct 2014 12:12:45 +0100 Subject: [PATCH] ESBCOMP-117 Closing MessageExchange when component is shutting-down --- .../openesb/pojose/jbi/msgs.properties | 3 + .../pojose/jbi/thread/InboundProcessor.java | 470 +++++++++--------- 2 files changed, 242 insertions(+), 231 deletions(-) diff --git a/ojc-core/pojose/jbiadapter/src/org/glassfish/openesb/pojose/jbi/msgs.properties b/ojc-core/pojose/jbiadapter/src/org/glassfish/openesb/pojose/jbi/msgs.properties index 7e99a51d0..d0e5a4007 100755 --- a/ojc-core/pojose/jbiadapter/src/org/glassfish/openesb/pojose/jbi/msgs.properties +++ b/ojc-core/pojose/jbiadapter/src/org/glassfish/openesb/pojose/jbi/msgs.properties @@ -266,3 +266,6 @@ POJOSE-7539 = Failed to load application variables {0} {1} # org.glassfish.openesb.pojose.jbi.PojoSEConfiguration POJOSE-7540 = Failed to persist application variables {0} {1} +# org.glassfish.openesb.pojose.jbi.thread.InboundProcessor +POJOSE-7541 = Failed to close MessagingChannel {0}. + diff --git a/ojc-core/pojose/jbiadapter/src/org/glassfish/openesb/pojose/jbi/thread/InboundProcessor.java b/ojc-core/pojose/jbiadapter/src/org/glassfish/openesb/pojose/jbi/thread/InboundProcessor.java index af854029d..672829255 100755 --- a/ojc-core/pojose/jbiadapter/src/org/glassfish/openesb/pojose/jbi/thread/InboundProcessor.java +++ b/ojc-core/pojose/jbiadapter/src/org/glassfish/openesb/pojose/jbi/thread/InboundProcessor.java @@ -27,7 +27,6 @@ * * END_HEADER - DO NOT EDIT */ - package org.glassfish.openesb.pojose.jbi.thread; import javax.management.Notification; @@ -58,14 +57,15 @@ import org.glassfish.openesb.pojose.jbi.su.MasterTracker; * * @author gpatil */ -public class InboundProcessor extends Thread implements MessageProcessor, - NotificationListener{ +public class InboundProcessor extends Thread implements MessageProcessor, + NotificationListener { + private POJOComponentContext compCtx; private MessagingChannel msgChannel; private final POJOSEThreadPoolExecutor mcProcessorPool; private final int defKeepAliveTime = 5; // 5 min private final long defWaitTime = 3 * 1000; //3 secs - private final long defPoolTerminateWaitTime = 60 * 1 ; // 1 Min + private final long defPoolTerminateWaitTime = 60 * 1; // 1 Min private BlockingQueue bq = null; private Logger logger = Logger.getLogger(InboundProcessor.class.getName()); private volatile boolean started = false; @@ -73,271 +73,279 @@ public class InboundProcessor extends Thread implements MessageProcessor, private final MasterTracker mt = MasterTracker.getInstance(); private final ConcurrentLinkedQueue bufferQueue = new ConcurrentLinkedQueue(); private volatile int maxPoolSize = PojoSEConfigurationMBean.DEFAULT_MAX_THREAD_POOL_SIZE; - + private static volatile InboundProcessor instance; - - public static synchronized InboundProcessor getInstance(POJOComponentContext cc){ - if (instance == null){ - instance = new InboundProcessor(cc); - } - return instance; + + public static synchronized InboundProcessor getInstance(POJOComponentContext cc) { + if (instance == null) { + instance = new InboundProcessor(cc); + } + return instance; } - + // Private constructor to make sure atmost only one instance is present at any moment private InboundProcessor(POJOComponentContext cc) { - this.compCtx = cc; - - int corePool = PojoSEConfigurationMBean.DEFAULT_CORE_THREAD_POOL_SIZE; - this.maxPoolSize = PojoSEConfigurationMBean.DEFAULT_MAX_THREAD_POOL_SIZE; - int queueSize = PojoSEConfigurationMBean.DEFAULT_THREAD_POOL_BLOCKING_QUEUE_SIZE; - - PojoSEConfigurationMBean mbean = this.compCtx.getConfigMbean(); - - if (mbean != null){ - corePool = mbean.getCoreThreadPoolSize(); - maxPoolSize = mbean.getMaxThreadPoolSize(); - queueSize = mbean.getThreadPoolBlockingQueueSize(); - } - - this.bq = new ArrayBlockingQueue(queueSize); + this.compCtx = cc; - this.mcProcessorPool = new POJOSEThreadPoolExecutor(corePool, - maxPoolSize, defKeepAliveTime, TimeUnit.SECONDS, - bq, EngineThreadFactory.getFactory(), - new POJOSERejectedExecutionHandler(this.bufferQueue)); - - //Set pool size to core pool size. - DocumentBuilderPool.getInstance().setMaxPoolSize(corePool); - TransformerPool.getInstance().setMaxPoolSize(corePool); - JBIPartExprPool.getInstance().setMaxPoolSize(corePool); - - //this.logger = cc.getCompLifeCycle().getCompLifeCycleLogger(); - this.setName("POJO SE DC Listener");//NOI18N + int corePool = PojoSEConfigurationMBean.DEFAULT_CORE_THREAD_POOL_SIZE; + this.maxPoolSize = PojoSEConfigurationMBean.DEFAULT_MAX_THREAD_POOL_SIZE; + int queueSize = PojoSEConfigurationMBean.DEFAULT_THREAD_POOL_BLOCKING_QUEUE_SIZE; + + PojoSEConfigurationMBean mbean = this.compCtx.getConfigMbean(); + + if (mbean != null) { + corePool = mbean.getCoreThreadPoolSize(); + maxPoolSize = mbean.getMaxThreadPoolSize(); + queueSize = mbean.getThreadPoolBlockingQueueSize(); + } + + this.bq = new ArrayBlockingQueue(queueSize); + + this.mcProcessorPool = new POJOSEThreadPoolExecutor(corePool, + maxPoolSize, defKeepAliveTime, TimeUnit.SECONDS, + bq, EngineThreadFactory.getFactory(), + new POJOSERejectedExecutionHandler(this.bufferQueue)); + + //Set pool size to core pool size. + DocumentBuilderPool.getInstance().setMaxPoolSize(corePool); + TransformerPool.getInstance().setMaxPoolSize(corePool); + JBIPartExprPool.getInstance().setMaxPoolSize(corePool); + + //this.logger = cc.getCompLifeCycle().getCompLifeCycleLogger(); + this.setName("POJO SE DC Listener");//NOI18N } - private int parseInt(Object val){ - int ret = -1; - try { - ret = Integer.parseInt(val.toString()); - } catch (Exception ex){ - ret = -1; - } - return ret; + private int parseInt(Object val) { + int ret = -1; + try { + ret = Integer.parseInt(val.toString()); + } catch (Exception ex) { + ret = -1; + } + return ret; } public void handleNotification(Notification ntf, Object handback) { - if ((ntf != null) && - (AttributeChangeNotification.ATTRIBUTE_CHANGE.equals(ntf.getType()))){ - AttributeChangeNotification acn = (AttributeChangeNotification) ntf; - if (PojoSEConfigurationMBean.CORE_THREAD_POOL_SIZE.equals( - acn.getAttributeName())){ - int size = parseInt(acn.getNewValue()); - if (size != -1){ - this.mcProcessorPool.setCorePoolSize(size); - } - }else if (PojoSEConfigurationMBean.MAX_THREAD_POOL_SIZE.equals( - acn.getAttributeName())){ - int size = parseInt(acn.getNewValue()); - if (size != -1){ - this.maxPoolSize = size; - this.mcProcessorPool.setMaximumPoolSize(size); - } - } - } + if ((ntf != null) + && (AttributeChangeNotification.ATTRIBUTE_CHANGE.equals(ntf.getType()))) { + AttributeChangeNotification acn = (AttributeChangeNotification) ntf; + if (PojoSEConfigurationMBean.CORE_THREAD_POOL_SIZE.equals( + acn.getAttributeName())) { + int size = parseInt(acn.getNewValue()); + if (size != -1) { + this.mcProcessorPool.setCorePoolSize(size); + } + } else if (PojoSEConfigurationMBean.MAX_THREAD_POOL_SIZE.equals( + acn.getAttributeName())) { + int size = parseInt(acn.getNewValue()); + if (size != -1) { + this.maxPoolSize = size; + this.mcProcessorPool.setMaximumPoolSize(size); + } + } + } } - + @Override public void run() { - MessageExchange me = null; - try { - this.msgChannel = compCtx.getDC(); - // Loop forever accepting incoming MessagesExchanges, including POJO SE - // responses to the POJO SE initiated MEs. Util stop is signalled. - for (;;) { - try { - this.startedLock.lock(); - if (!this.started){ - break; - } - } finally { - this.startedLock.unlock(); - } + MessageExchange me = null; + try { + this.msgChannel = compCtx.getDC(); + // Loop forever accepting incoming MessagesExchanges, including POJO SE + // responses to the POJO SE initiated MEs. Util stop is signalled. + for (;;) { + try { + this.startedLock.lock(); + if (!this.started) { + break; + } + } finally { + this.startedLock.unlock(); + } - try { - // accept while we have still lock on startLock instead here? - me = this.msgChannel.accept(defWaitTime); + try { + // accept while we have still lock on startLock instead here? + me = this.msgChannel.accept(defWaitTime); - if (me != null){ - if (logger.isLoggable(Level.FINEST)){ - logger.finest("Accepted ME:" + me.getExchangeId()); - } - executeME(me); - } + if (me != null) { + if (logger.isLoggable(Level.FINEST)) { + logger.finest("Accepted ME:" + me.getExchangeId()); + } + executeME(me); + } - if (this.bufferQueue.size() > 0){ - checkAndExecuteBufferQueueTasks(); - } - }catch (Exception ex) { - String m = I18n.loc("POJOSE-7522: Exception while executing for ME {0}. {1}", me, ex); - logger.log(Level.SEVERE, m, ex); - } - } - } catch (Exception ex) { - String m = I18n.loc("POJOSE-7520: Exception while accepting ME from delivery channel. {0}", ex); - logger.log(Level.SEVERE, m, ex); - } + if (this.bufferQueue.size() > 0) { + checkAndExecuteBufferQueueTasks(); + } + } catch (Exception ex) { + String m = I18n.loc("POJOSE-7522: Exception while executing for ME {0}. {1}", me, ex); + logger.log(Level.SEVERE, m, ex); + } + } + } catch (Exception ex) { + String m = I18n.loc("POJOSE-7520: Exception while accepting ME from delivery channel. {0}", ex); + logger.log(Level.SEVERE, m, ex); + } - if (logger.isLoggable(Level.FINE)) { - String m = I18n.loc("POJOSE-1508: Stopped listening to Delivery Channel.");//NOI18N - logger.log(Level.FINE, m); - } + if (logger.isLoggable(Level.FINE)) { + String m = I18n.loc("POJOSE-1508: Stopped listening to Delivery Channel.");//NOI18N + logger.log(Level.FINE, m); + } } // Put the ME/InMEProcessor into our ThreadPoolExecutor queue // and return quickly. - private void executeME(MessageExchange me){ - try { - if (this.mcProcessorPool.getActualActiveExecutorTaskCount() >= this.maxPoolSize){ // All thread are busy or blocked. - if (this.mt.getActiveSynchCalls() >= this.maxPoolSize){ // All are blocked by sendSynch - // if role is provider and no active call for the service exists, then run using grace thread. - if (Role.PROVIDER.equals(me.getRole())){ - InMsgTask it = new InMsgTask(compCtx, me); - if (this.mt.getActiveInRequestForPojo(it.getPojoServiceClassName()) == 0){ - //Run in grace thread - String m = I18n.loc("POJOSE-6506: Possible deadlock condition detected, allocated threads and blocking queue are full. Executing incoming request in a \"grace\" thread. ME id:{0}", - me.getExchangeId()); - logger.warning(m); - EngineThreadFactory.getFactory().newGraceThread(it).start(); - } else { - addToBufferQueue(it); - } - } else { - // handled by Response message processor. - RespMsgTask rt = new RespMsgTask(compCtx, me); - addToBufferQueue(rt); - } - } else { - if (Role.PROVIDER.equals(me.getRole())){ - InMsgTask it = new InMsgTask(compCtx, me); - addToBufferQueue(it); - } else { - // handled by Response message processor. - RespMsgTask rt = new RespMsgTask(compCtx, me); - addToBufferQueue(rt); - } - } - } else { - ServiceEndpoint sept = me.getEndpoint(); - if (logger.isLoggable(Level.FINE)) { - String msg = I18n.lf("POJOSE-1509: Processing for Endpoint:{0}", sept);//NOI18N - logger.fine(msg); - } + private void executeME(MessageExchange me) { + try { + if (this.mcProcessorPool.getActualActiveExecutorTaskCount() >= this.maxPoolSize) { // All thread are busy or blocked. + if (this.mt.getActiveSynchCalls() >= this.maxPoolSize) { // All are blocked by sendSynch + // if role is provider and no active call for the service exists, then run using grace thread. + if (Role.PROVIDER.equals(me.getRole())) { + InMsgTask it = new InMsgTask(compCtx, me); + if (this.mt.getActiveInRequestForPojo(it.getPojoServiceClassName()) == 0) { + //Run in grace thread + String m = I18n.loc("POJOSE-6506: Possible deadlock condition detected, allocated threads and blocking queue are full. Executing incoming request in a \"grace\" thread. ME id:{0}", + me.getExchangeId()); + logger.warning(m); + EngineThreadFactory.getFactory().newGraceThread(it).start(); + } else { + addToBufferQueue(it); + } + } else { + // handled by Response message processor. + RespMsgTask rt = new RespMsgTask(compCtx, me); + addToBufferQueue(rt); + } + } else { + if (Role.PROVIDER.equals(me.getRole())) { + InMsgTask it = new InMsgTask(compCtx, me); + addToBufferQueue(it); + } else { + // handled by Response message processor. + RespMsgTask rt = new RespMsgTask(compCtx, me); + addToBufferQueue(rt); + } + } + } else { + ServiceEndpoint sept = me.getEndpoint(); + if (logger.isLoggable(Level.FINE)) { + String msg = I18n.lf("POJOSE-1509: Processing for Endpoint:{0}", sept);//NOI18N + logger.fine(msg); + } - if (Role.PROVIDER.equals(me.getRole())){ - mcProcessorPool.execute(new InMsgTask(compCtx, me)); - } else { - // handled by Response message processor. - mcProcessorPool.execute(new RespMsgTask(compCtx, me)); - } - } - } catch (RejectedExecutionException ree){ // TODO remove, control should not come here. - String m = I18n.loc("POJOSE-7517: Allocated threads and blocking queue are full. Please configure to allocate higher resources: Core and Max pool size and blocking queue size of ThreadPoolExecutor."); - logger.severe(m); - Exception ex = new Exception(m, ree); - try { - me.setError(ex); - this.msgChannel.send(me); - } catch (Throwable th){ - m = I18n.loc("POJOSE-7500: Exception while sending error status {0}.", th); - logger.severe(m); - } - } + if (Role.PROVIDER.equals(me.getRole())) { + mcProcessorPool.execute(new InMsgTask(compCtx, me)); + } else { + // handled by Response message processor. + mcProcessorPool.execute(new RespMsgTask(compCtx, me)); + } + } + } catch (RejectedExecutionException ree) { // TODO remove, control should not come here. + String m = I18n.loc("POJOSE-7517: Allocated threads and blocking queue are full. Please configure to allocate higher resources: Core and Max pool size and blocking queue size of ThreadPoolExecutor."); + logger.severe(m); + Exception ex = new Exception(m, ree); + try { + me.setError(ex); + this.msgChannel.send(me); + } catch (Throwable th) { + m = I18n.loc("POJOSE-7500: Exception while sending error status {0}.", th); + logger.severe(m); + } + } } - private void addToBufferQueue(BaseTask task){ - String m = I18n.loc("POJOSE-6504: Allocated threads and blocking queue are full. Adding accepted ME to buffer task queue."); - logger.warning(m); - bufferQueue.add(task); + private void addToBufferQueue(BaseTask task) { + String m = I18n.loc("POJOSE-6504: Allocated threads and blocking queue are full. Adding accepted ME to buffer task queue."); + logger.warning(m); + bufferQueue.add(task); } - private void submit2Executor(BaseTask task){ - MessageExchange me = task.getTaskTriggerME(); - ServiceEndpoint sept = me.getEndpoint(); - if (logger.isLoggable(Level.FINE)) { - String msg = I18n.lf("POJOSE-1509: Processing for Endpoint:{0}", sept);//NOI18N - logger.fine(msg); - } + private void submit2Executor(BaseTask task) { + MessageExchange me = task.getTaskTriggerME(); + ServiceEndpoint sept = me.getEndpoint(); + if (logger.isLoggable(Level.FINE)) { + String msg = I18n.lf("POJOSE-1509: Processing for Endpoint:{0}", sept);//NOI18N + logger.fine(msg); + } - if (Role.PROVIDER.equals(me.getRole())){ - mcProcessorPool.execute(new InMsgTask(compCtx, me)); - } else { - // handled by Response message processor. - mcProcessorPool.execute(new RespMsgTask(compCtx, me)); - } + if (Role.PROVIDER.equals(me.getRole())) { + mcProcessorPool.execute(new InMsgTask(compCtx, me)); + } else { + // handled by Response message processor. + mcProcessorPool.execute(new RespMsgTask(compCtx, me)); + } } //BufferQueue size is > 0 when called. - private void checkAndExecuteBufferQueueTasks(){ - int freeThreads = this.maxPoolSize - (int) this.mcProcessorPool.getActualActiveExecutorTaskCount(); - if (freeThreads > 0){ - BaseTask bt = null; - for (int i =0; i < freeThreads; i++){ - bt = this.bufferQueue.poll(); - if (bt != null){ - submit2Executor(bt); - } else { - break; - } - } - } + private void checkAndExecuteBufferQueueTasks() { + int freeThreads = this.maxPoolSize - (int) this.mcProcessorPool.getActualActiveExecutorTaskCount(); + if (freeThreads > 0) { + BaseTask bt = null; + for (int i = 0; i < freeThreads; i++) { + bt = this.bufferQueue.poll(); + if (bt != null) { + submit2Executor(bt); + } else { + break; + } + } + } } - + public void startProcessor() { - try { - this.startedLock.lock(); - if (!this.started){ - this.started = true; - this.start(); - } - } finally { - this.startedLock.unlock(); - } + try { + this.startedLock.lock(); + if (!this.started) { + this.started = true; + this.start(); + } + } finally { + this.startedLock.unlock(); + } } public void stopProcessor(boolean now) throws InterruptedException { - if (logger.isLoggable(Level.FINEST)){ - try { - logger.finest("DC.toString():" + this.compCtx.getJBIComponentContext().getDeliveryChannel().toString()); //NOI18N - } catch (Throwable t){ - //ignore - } - } - boolean shuttingDown = true; - try { - this.startedLock.lock(); - if (this.started){ - this.started = false; - if (now){ - this.mcProcessorPool.shutdownNow(); - }else{ - this.mcProcessorPool.shutdown(); - } - } else { - shuttingDown = false; - } - } finally { - this.startedLock.unlock(); - } + if (logger.isLoggable(Level.FINEST)) { + try { + logger.finest("DC.toString():" + this.compCtx.getJBIComponentContext().getDeliveryChannel().toString()); //NOI18N + } catch (Throwable t) { + //ignore + } + } + boolean shuttingDown = true; + try { + this.startedLock.lock(); + if (this.started) { + this.started = false; + if (now) { + this.mcProcessorPool.shutdownNow(); + } else { + this.mcProcessorPool.shutdown(); + } + if (msgChannel != null) { + try { + msgChannel.close(); + } catch (Exception ex) { + String m = I18n.loc("POJOSE-7541: Failed to close MessagingChannel {0}.", ex); + logger.log(Level.SEVERE, m, ex); + } + } + } else { + shuttingDown = false; + } + } finally { + this.startedLock.unlock(); + } - if (shuttingDown){ - if ((!this.mcProcessorPool.isTerminated()) && (this.mcProcessorPool.isTerminating())){ - this.mcProcessorPool.awaitTermination(defPoolTerminateWaitTime, - TimeUnit.SECONDS); - } - } + if (shuttingDown) { + if ((!this.mcProcessorPool.isTerminated()) && (this.mcProcessorPool.isTerminating())) { + this.mcProcessorPool.awaitTermination(defPoolTerminateWaitTime, + TimeUnit.SECONDS); + } + } } public void stopProcessor() throws InterruptedException { - stopProcessor(false); + stopProcessor(false); } }