diff options
Diffstat (limited to 'java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java')
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java | 24 |
1 files changed, 21 insertions, 3 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 84c9e1f465..ce9c6ae4cb 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -22,6 +22,9 @@ package org.apache.qpid.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts @@ -84,6 +87,8 @@ public class ReferenceCountingExecutorService /** Holds the number of executor threads to create. */ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); + /** * Retrieves the singleton instance of this reference counter. * @@ -105,15 +110,28 @@ public class ReferenceCountingExecutorService * * @return An executor service. */ - ExecutorService acquireExecutorService() + public ExecutorService acquireExecutorService() { synchronized (_lock) { if (_refCount++ == 0) { - _pool = Executors.newFixedThreadPool(_poolSize); +// _pool = Executors.newFixedThreadPool(_poolSize); + + // Use a job queue that biases to writes + if(_useBiasedPool) + { + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new ReadWriteJobQueue()); + } + else + { + _pool = Executors.newFixedThreadPool(_poolSize); + } } + return _pool; } } @@ -122,7 +140,7 @@ public class ReferenceCountingExecutorService * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls * to zero, the executor service is shut down. */ - void releaseExecutorService() + public void releaseExecutorService() { synchronized (_lock) { |