ESBCOMP-95 Change the worker thread pool to use all the thread in the pool in place of putting message in the blockingqueue. Moving message in the queue should only be done if no more thread can handle the message.

master
David BRASSELY 2014-03-10 17:09:40 +01:00
parent cf2cb10b80
commit d9ef2a2ce1
3 changed files with 181 additions and 61 deletions

View File

@ -3,71 +3,79 @@ package com.sun.jbi.jmsbc.util;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
final public class Executor
{
final public class Executor {
private ThreadPoolExecutor mPool;
private ThreadPoolExecutor mDonePool;
private static Executor ref = new Executor();
private Executor()
{
private Executor() {
}
public static Executor getReference()
{
public static Executor getReference() {
return ref;
}
public void execute(Runnable task)
{
public void execute(Runnable task) {
mPool.execute(task);
}
public void executeDoneOrError(Runnable task)
{
mDonePool.execute(task);
public void executeDoneOrError(Runnable task) {
mDonePool.execute(task);
}
public void setMaxThreads(int i)
{
setPoolSize(mPool ,i);
setPoolSize(mDonePool ,i);
public void setMaxThreads(int i) {
setPoolSize(mPool, i);
setPoolSize(mDonePool, i);
}
public void start()
{
final BlockingQueue<Runnable> queue1 = BlockingQueueFactory.getBlockedQueue(20);
mPool = new ThreadPoolExecutor(5, 16, 10, TimeUnit.SECONDS, queue1,
new ThreadFactoryImpl("JMSBC"),
new RejectedExecutionHandlerImpl(queue1));
/*
public void start() {
final BlockingQueue<Runnable> queue1 = BlockingQueueFactory.getBlockedQueue(20);
mPool = new ThreadPoolExecutor(5, 16, 10, TimeUnit.SECONDS, queue1,
new ThreadFactoryImpl("JMSBC"),
new RejectedExecutionHandlerImpl(queue1));
final BlockingQueue<Runnable> queue2 = BlockingQueueFactory.getBlockedQueue(20);
mDonePool = new ThreadPoolExecutor(5, 16, 10, TimeUnit.SECONDS, queue2,
new ThreadFactoryImpl("JMSBC-DONE-WAIT"),
new RejectedExecutionHandlerImpl(queue2));
final BlockingQueue<Runnable> queue2 = BlockingQueueFactory.getBlockedQueue(20);
mDonePool = new ThreadPoolExecutor(5, 16, 10, TimeUnit.SECONDS, queue2,
new ThreadFactoryImpl("JMSBC-DONE-WAIT"),
new RejectedExecutionHandlerImpl(queue2));
}
*/
public void start() {
mPool = newScalingThreadPool(5, 16, 10, TimeUnit.SECONDS, new ScalingQueue<Runnable>(20), new ThreadFactoryImpl("JMSBC"));
mDonePool = newScalingThreadPool(5, 16, 10, TimeUnit.SECONDS, new ScalingQueue<Runnable>(20), new ThreadFactoryImpl("JMSBC-DONE-WAIT"));
}
public void shutdown()
{
public ThreadPoolExecutor newScalingThreadPool(int min, int max,
long keepAliveTime, TimeUnit timeUnit, ScalingQueue queue, ThreadFactory threadFactory) {
ThreadPoolExecutor tpExecutor = new ScalingThreadPoolExecutor(min, max, keepAliveTime, timeUnit, threadFactory, queue);
tpExecutor.setRejectedExecutionHandler(new ForceQueuePolicy());
queue.setThreadPoolExecutor(tpExecutor);
return tpExecutor;
}
public void shutdown() {
shutdownPool(mPool);
shutdownPool(mDonePool);
}
private void shutdownPool(ThreadPoolExecutor pool) {
pool.shutdown();
private void shutdownPool(ThreadPoolExecutor pool) {
pool.shutdown();
pool = null;
}
}
private void setPoolSize(ThreadPoolExecutor pool, int i) {
if(pool.getCorePoolSize() > i){
pool.setMaximumPoolSize(pool.getCorePoolSize());
} else{
pool.setMaximumPoolSize(i);
}
}
private void setPoolSize(ThreadPoolExecutor pool, int i) {
if (pool.getCorePoolSize() > i) {
pool.setMaximumPoolSize(pool.getCorePoolSize());
} else {
pool.setMaximumPoolSize(i);
}
}
static class ThreadFactoryImpl implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
@ -75,37 +83,54 @@ final public class Executor
ThreadFactoryImpl(String prefix) {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = prefix + "-pool-" +
poolNumber.getAndIncrement() +
"-thread-";
group = (s != null) ? s.getThreadGroup()
: Thread.currentThread().getThreadGroup();
namePrefix = prefix + "-pool-"
+ poolNumber.getAndIncrement()
+ "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler{
BlockingQueue<Runnable> mQueue;
public RejectedExecutionHandlerImpl(BlockingQueue<Runnable> queue){
this.mQueue = queue;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// add this to the queue. would be picked up by the next
// available thread.
// This way task would be never rejected.
mQueue.add(r);
}
public class ForceQueuePolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
//should never happen since we never wait
throw new RejectedExecutionException(e);
}
}
}
/*
static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
BlockingQueue<Runnable> mQueue;
public RejectedExecutionHandlerImpl(BlockingQueue<Runnable> queue) {
this.mQueue = queue;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// add this to the queue. would be picked up by the next
// available thread.
// This way task would be never rejected.
mQueue.add(r);
}
}
*/
}

View File

@ -0,0 +1,56 @@
package com.sun.jbi.jmsbc.util;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
/**
*
* @author David BRASSELY (brasseld at gmail.com)
* @author OpenESB Community
*/
public class ScalingQueue<T> extends LinkedBlockingQueue<T> {
/**
* The executor this Queue belongs to
*/
private ThreadPoolExecutor executor;
/**
* Creates a TaskQueue with a capacity of {@link Integer#MAX_VALUE}.
*/
public ScalingQueue() {
super();
}
/**
* Creates a TaskQueue with the given (fixed) capacity.
*
* @param capacity the capacity of this queue.
*/
public ScalingQueue(int capacity) {
super(capacity);
}
/**
* Sets the executor this queue belongs to.
*/
public void setThreadPoolExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}
/**
* Inserts the specified element at the tail of this queue if there is at
* least one available thread to run the current task. If all pool threads
* are actively busy, it rejects the offer.
*
* @param o the element to add.
* @return true if it was possible to add the element to this queue, else
* false
* @see ThreadPoolExecutor#execute(Runnable)
*/
@Override
public boolean offer(T o) {
int allWorkingThreads = executor.getActiveCount() + super.size();
return allWorkingThreads < executor.getPoolSize() && super.offer(o);
}
}

View File

@ -0,0 +1,39 @@
package com.sun.jbi.jmsbc.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author David BRASSELY (brasseld at gmail.com)
* @author OpenESB Community
*/
public class ScalingThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger activeCount = new AtomicInteger();
public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, BlockingQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ScalingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public int getActiveCount() {
return activeCount.get();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
activeCount.incrementAndGet();
}
@Override protected void afterExecute(Runnable r, Throwable t) {
activeCount.decrementAndGet();
}
}