summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
blob: 3e99b244c445c7d2e6c252fd5149bfe762d8465a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;


/**
 * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
 * the references taken, instantiating the service on the first reference, and shutting it down when the last
 * reference is released.
 *
 * <p>It is important to ensure that an executor service is correctly shut down as failing to do so prevents the JVM
 * from terminating due to the existence of non-daemon threads.
 *
 * <p>Thread-safety: the pool reference, the reference count and the thread factory are only read or written while
 * holding a single private lock, so every accessor observes a consistent, up-to-date view of the state.
 *
 * <table id="crc"><caption>CRC Card</caption>
 * <tr><th> Responsibilities <th> Collaborations
 * <tr><td> Provide a shared executor service. <td> {@link Executors}
 * <tr><td> Shutdown the executor service when not needed. <td> {@link ExecutorService}
 * <tr><td> Track references to the executor service.
 * <tr><td> Provide configuration of the executor service.
 * </table>
 *
 * @todo Might be more elegant to make this actually implement ExecutorService, providing better hiding of the
 *       implementation details. Also this class introduces a pattern (albeit specific to this usage) that could be
 *       generalized to reference count anything. That is, on first instance call a create method, on release of last
 *       instance call a destroy method. This could definitely be abstracted out as a re-usable piece of code; a
 *       reference counting factory. It could then be re-used to do reference counting in other places (such as
 *       messages). Countable objects have a simple create/destroy life cycle, capturable by an interface that the
 *       ref counting factory can call to manage the lifecycle.
 *
 * @todo {@link #_poolSize} should be static?
 *
 * @todo The {@link #getPool()} method breaks the encapsulation of the reference counter. Generally when getPool is used
 *       further checks are applied to ensure that the executor service has not been shutdown. This passes responsibility
 *       for managing the lifecycle of the reference counted object onto the caller rather than neatly encapsulating it
 *       here. Could think about adding more state to the lifecycle, to mark ref counted objects as invalid, and have an
 *       isValid method, or could make calling code deal with RejectedExecutionException raised by shutdown executors.
 */
public class ReferenceCountingExecutorService
{


    /** Defines the smallest thread pool that will be allocated, irrespective of the number of processors. */
    private static final int MINIMUM_POOL_SIZE = 4;

    /** Holds the number of processors on the machine. */
    private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();

    /** Defines the thread pool size to use, which is the larger of the number of CPUs or the minimum size. */
    private static final int DEFAULT_POOL_SIZE = Math.max(NUM_CPUS, MINIMUM_POOL_SIZE);

    /**
     * Holds the singleton instance of this reference counter. This is only created once, statically, so the
     * {@link #getInstance()} method does not need to be synchronized.
     */
    private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService();

    /**
     * This lock is used to ensure that reference counts are updated atomically with create/destroy operations. It
     * also guards every read of the mutable fields below so that changes are visible across threads.
     */
    private final Object _lock = new Object();

    /** The shared executor service that is reference counted. Guarded by {@link #_lock}. */
    private ExecutorService _pool;

    /** Holds the number of references given out to the executor service. Guarded by {@link #_lock}. */
    private int _refCount = 0;

    /**
     * Holds the number of executor threads to create. Read once from the <code>amqj.read_write_pool_size</code>
     * system property when the singleton is initialised, falling back to {@link #DEFAULT_POOL_SIZE}.
     */
    private final int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);

    /** Thread Factory used to create thread of the pool.   Uses the default implementation provided by
     *  {@link java.util.concurrent.Executors#defaultThreadFactory()} unless reset by the caller.
     *  Guarded by {@link #_lock}.
     */
    private ThreadFactory _threadFactory = Executors.defaultThreadFactory();

    /**
     * Retrieves the singleton instance of this reference counter.
     *
     * @return The singleton instance of this reference counter.
     */
    public static ReferenceCountingExecutorService getInstance()
    {
        return _instance;
    }

    /**
     * Private constructor to ensure that only a singleton instance can be created.
     */
    private ReferenceCountingExecutorService()
    { }

    /**
     * Provides a reference to a shared executor service, incrementing the reference count. The executor is created
     * when the count moves from zero to one, using the currently configured pool size and thread factory.
     *
     * @return An executor service.
     *
     * @throws IllegalArgumentException if the configured pool size is rejected by {@link ThreadPoolExecutor}; in
     *         that case the reference count is left unchanged.
     */
    public ExecutorService acquireExecutorService()
    {
        synchronized (_lock)
        {
            if (_refCount == 0)
            {
                // Build the pool before counting the reference: if construction throws, the count must remain at
                // zero, otherwise no later caller would ever trigger creation of a pool.
                _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>(),
                        _threadFactory);
            }

            _refCount++;

            return _pool;
        }
    }

    /**
     * Releases a reference to a shared executor service, decrementing the reference count. If the reference count falls
     * to zero, the executor service is shut down.
     *
     * <p>A release that is not matched by an earlier {@link #acquireExecutorService()} is ignored, so the count can
     * never become negative.
     */
    public void releaseExecutorService()
    {
        synchronized (_lock)
        {
            if (_refCount <= 0)
            {
                // Unbalanced release. Letting the count go negative would make the next acquire skip pool creation
                // and hand out a null or already shut down executor.
                return;
            }

            if (--_refCount == 0)
            {
                _pool.shutdownNow();
            }
        }
    }

    /**
     * Provides access to the executor service, without touching the reference count. Once the last reference has
     * been released this keeps returning the executor that was shut down, until a new one is created by the next
     * {@link #acquireExecutorService()}.
     *
     * @return The shared executor service, or {@code null} if none has been instantiated yet.
     */
    public ExecutorService getPool()
    {
        synchronized (_lock)
        {
            return _pool;
        }
    }

    /**
     * Return the ReferenceCount to this ExecutorService
     * @return reference count
     */
    public int getReferenceCount()
    {
        synchronized (_lock)
        {
            return _refCount;
        }
    }

    /**
     *
     * Return the thread factory used by the {@link ThreadPoolExecutor} to create new threads.
     *
     * @return thread factory
     */
    public ThreadFactory getThreadFactory()
    {
        synchronized (_lock)
        {
            return _threadFactory;
        }
    }

    /**
     * Sets the thread factory used by the {@link ThreadPoolExecutor} to create new threads.
     * <p>
     * If the pool has been already created, the change will have no effect until
     * {@link #getReferenceCount()} reaches zero and the pool recreated.  For this reason,
     * callers must invoke this method <i>before</i> calling {@link #acquireExecutorService()}.
     *
     * @param threadFactory thread factory
     *
     * @throws NullPointerException if <code>threadFactory</code> is null
     */
    public void setThreadFactory(final ThreadFactory threadFactory)
    {
        if (threadFactory == null)
        {
            throw new NullPointerException("threadFactory cannot be null");
        }

        synchronized (_lock)
        {
            _threadFactory = threadFactory;
        }
    }

}