summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java215
1 files changed, 215 insertions, 0 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
new file mode 100644
index 0000000000..8152a1f5e9
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.pool;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
+ * the references taken, instantiating the service on the first reference, and shutting it down when the last
+ * reference is released.
+ *
+ * <p/>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM
+ * from terminating due to the existence of non-daemon threads.
+ *
+ * <p/><table id="crc><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a shared executor service. <td> {@link Executors}
+ * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService}
+ * <tr><td> Track references to the executor service.
+ * <tr><td> Provide configuration of the executor service.
+ * </table>
+ *
+ * @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the
+ * implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be
+ * generalized to reference count anything. That is, on first instance call a create method, on release of last
+ * instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a
+ * reference counting factory. It could then be re-used to do reference counting in other places (such as
+ * messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the
+ * ref counting factory can call to manage the lifecycle.
+ *
+ * @todo {@link #_poolSize} should be static?
+ *
+ * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used
+ * further checks are applied to ensure that the executor service has not been shutdown. This passes responsibility
+ * for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it
+ * here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an
+ * isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors.
+ */
+public class ReferenceCountingExecutorService
+{
+
+
+ /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */
+ private static final int MINIMUM_POOL_SIZE = 4;
+
+ /** Holds the number of processors on the machine. */
+ private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
+
+ /** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */
+ private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE);
+
+ /**
+ * Holds the singleton instance of this reference counter. This is only created once, statically, so the
+ * {@link #getInstance()} method does not need to be synchronized.
+ */
+ private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService();
+
+ /** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */
+ private final Object _lock = new Object();
+
+ /** The shared executor service that is reference counted. */
+ private ExecutorService _pool;
+
+ /** Holds the number of references given out to the executor service. */
+ private int _refCount = 0;
+
+ /** Holds the number of executor threads to create. */
+ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+
+ /** Thread Factory used to create thread of the pool. Uses the default implementation provided by
+ * {@link java.util.concurrent.Executors#defaultThreadFactory()} unless reset by the caller.
+ */
+ private ThreadFactory _threadFactory = Executors.defaultThreadFactory();
+
+ private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
+ /**
+ * Retrieves the singleton instance of this reference counter.
+ *
+ * @return The singleton instance of this reference counter.
+ */
+ public static ReferenceCountingExecutorService getInstance()
+ {
+ return _instance;
+ }
+
+ /**
+ * Private constructor to ensure that only a singleton instance can be created.
+ */
+ private ReferenceCountingExecutorService()
+ { }
+
+ /**
+ * Provides a reference to a shared executor service, incrementing the reference count.
+ *
+ * @return An executor service.
+ */
+ public ExecutorService acquireExecutorService()
+ {
+ synchronized (_lock)
+ {
+ if (_refCount++ == 0)
+ {
+ // Use a job queue that biases to writes
+ if(_useBiasedPool)
+ {
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new ReadWriteJobQueue(),
+ _threadFactory);
+
+ }
+ else
+ {
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ _threadFactory);
+ }
+
+ }
+
+
+ return _pool;
+ }
+ }
+
+ /**
+ * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls
+ * to zero, the executor service is shut down.
+ */
+ public void releaseExecutorService()
+ {
+ synchronized (_lock)
+ {
+ if (--_refCount == 0)
+ {
+ _pool.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Provides access to the executor service, without touching the reference count.
+ *
+ * @return The shared executor service, or <tt>null</tt> if none has been instantiated yet.
+ */
+ public ExecutorService getPool()
+ {
+ return _pool;
+ }
+
+ /**
+ * Return the ReferenceCount to this ExecutorService
+ * @return reference count
+ */
+ public int getReferenceCount()
+ {
+ return _refCount;
+ }
+
+ /**
+ *
+ * Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads.
+ *
+ * @return thread factory
+ */
+ public ThreadFactory getThreadFactory()
+ {
+ return _threadFactory;
+ }
+
+ /**
+ * Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads.
+ * <p>
+ * If the pool has been already created, the change will have no effect until
+ * {@link #getReferenceCount()} reaches zero and the pool recreated. For this reason,
+ * callers must invoke this method <i>before</i> calling {@link #acquireExecutorService()}.
+ *
+ * @param threadFactory thread factory
+ */
+ public void setThreadFactory(final ThreadFactory threadFactory)
+ {
+ if (threadFactory == null)
+ {
+ throw new NullPointerException("threadFactory cannot be null");
+ }
+ _threadFactory = threadFactory;
+ }
+
+}