diff options
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java')
-rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java new file mode 100644 index 0000000000..8152a1f5e9 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.pool; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; + + +/** + * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts + * the references taken, instantiating the service on the first reference, and shutting it down when the last + * reference is released. + * + * <p/>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM + * from terminating due to the existence of non-daemon threads. + * + * <p/><table id="crc><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide a shared executor service. <td> {@link Executors} + * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService} + * <tr><td> Track references to the executor service. + * <tr><td> Provide configuration of the executor service. + * </table> + * + * @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the + * implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be + * generalized to reference count anything. That is, on first instance call a create method, on release of last + * instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a + * reference counting factory. It could then be re-used to do reference counting in other places (such as + * messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the + * ref counting factory can call to manage the lifecycle. + * + * @todo {@link #_poolSize} should be static? + * + * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used + * further checks are applied to ensure that the executor service has not been shutdown. This passes responsibility + * for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it + * here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an + * isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors. + */ +public class ReferenceCountingExecutorService +{ + + + /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */ + private static final int MINIMUM_POOL_SIZE = 4; + + /** Holds the number of processors on the machine. */ + private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); + + /** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */ + private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE); + + /** + * Holds the singleton instance of this reference counter. This is only created once, statically, so the + * {@link #getInstance()} method does not need to be synchronized. + */ + private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService(); + + /** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */ + private final Object _lock = new Object(); + + /** The shared executor service that is reference counted. */ + private ExecutorService _pool; + + /** Holds the number of references given out to the executor service. */ + private int _refCount = 0; + + /** Holds the number of executor threads to create. */ + private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + + /** Thread Factory used to create thread of the pool. Uses the default implementation provided by + * {@link java.util.concurrent.Executors#defaultThreadFactory()} unless reset by the caller. + */ + private ThreadFactory _threadFactory = Executors.defaultThreadFactory(); + + private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); + + /** + * Retrieves the singleton instance of this reference counter. + * + * @return The singleton instance of this reference counter. + */ + public static ReferenceCountingExecutorService getInstance() + { + return _instance; + } + + /** + * Private constructor to ensure that only a singleton instance can be created. + */ + private ReferenceCountingExecutorService() + { } + + /** + * Provides a reference to a shared executor service, incrementing the reference count. + * + * @return An executor service. + */ + public ExecutorService acquireExecutorService() + { + synchronized (_lock) + { + if (_refCount++ == 0) + { + // Use a job queue that biases to writes + if(_useBiasedPool) + { + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new ReadWriteJobQueue(), + _threadFactory); + + } + else + { + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + _threadFactory); + } + + } + + + return _pool; + } + } + + /** + * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls + * to zero, the executor service is shut down. + */ + public void releaseExecutorService() + { + synchronized (_lock) + { + if (--_refCount == 0) + { + _pool.shutdownNow(); + } + } + } + + /** + * Provides access to the executor service, without touching the reference count. + * + * @return The shared executor service, or <tt>null</tt> if none has been instantiated yet. + */ + public ExecutorService getPool() + { + return _pool; + } + + /** + * Return the ReferenceCount to this ExecutorService + * @return reference count + */ + public int getReferenceCount() + { + return _refCount; + } + + /** + * + * Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + * + * @return thread factory + */ + public ThreadFactory getThreadFactory() + { + return _threadFactory; + } + + /** + * Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads. + * <p> + * If the pool has been already created, the change will have no effect until + * {@link #getReferenceCount()} reaches zero and the pool recreated. For this reason, + * callers must invoke this method <i>before</i> calling {@link #acquireExecutorService()}. + * + * @param threadFactory thread factory + */ + public void setThreadFactory(final ThreadFactory threadFactory) + { + if (threadFactory == null) + { + throw new NullPointerException("threadFactory cannot be null"); + } + _threadFactory = threadFactory; + } + +} |