diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2015-06-12 19:09:04 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2015-06-23 15:20:04 -0400 |
commit | 891e0b850200725dc73e1ac1cd803436cd6099bc (patch) | |
tree | 9661c3079d5c6cecb8555d3e0abc4224ce867df0 /src/mongo/util/concurrency/thread_pool.h | |
parent | b6057b08dc313ef372db05f5945dd23b9b1ed7cf (diff) | |
download | mongo-891e0b850200725dc73e1ac1cd803436cd6099bc.tar.gz |
SERVER-19000 Implement a ThreadPool that dynamically adjusts the number of threads based on demand.
This pool is derived from the one built into executor::NetworkInterfaceImpl.
Diffstat (limited to 'src/mongo/util/concurrency/thread_pool.h')
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.h | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h new file mode 100644 index 00000000000..c186e87d731 --- /dev/null +++ b/src/mongo/util/concurrency/thread_pool.h @@ -0,0 +1,267 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class Status; + +/** + * A configurable thread pool, for general use. + * + * See the Options struct for information about how to configure an instance. + */ +class ThreadPool { + MONGO_DISALLOW_COPYING(ThreadPool); + +public: + using Task = stdx::function<void()>; + + /** + * Structure used to configure an instance of ThreadPool. + */ + struct Options { + // Name of the thread pool. If this string is empty, the pool will be assigned a + // name unique to the current process. + std::string poolName; + + // Prefix used to name threads for logging purposes. + // + // An integer will be appended to this string to create the thread name for each thread in + // the pool. Warning, if you create two pools and give them the same threadNamePrefix, you + // could have multiple threads that report the same name. If you leave this empty, the + // prefix will be the pool name followed by a hyphen. + std::string threadNamePrefix; + + // Minimum number of threads that must be in the pool. + // + // At least this many threads will be created at startup, and the pool will not reduce the + // total number of threads below this threshold before shutdown. + size_t minThreads = 1; + + // The pool will never grow to contain more than this many threads. + size_t maxThreads = 8; + + // If the pool has had at least one idle thread for this much time, it may consider reaping + // a thread. + Milliseconds maxIdleThreadAge = Seconds{30}; + }; + + /** + * Structure used to return information about the thread pool via getStats(). + */ + struct Stats { + // The options for the instance of the pool returning these stats. + Options options; + + // The number of threads currently in the pool, idle or active. + size_t numThreads; + + // The number of idle threads currently in the pool. + size_t numIdleThreads; + + // The number of tasks waiting to be executed by the pool. + size_t numPendingTasks; + + // The last time that no threads in the pool were idle. + Date_t lastFullUtilizationDate; + }; + + /** + * Constructs a thread pool, configured with the given "options". + */ + explicit ThreadPool(Options options); + + /** + * Destroys a thread pool. + * + * The destructor may block if join() has not previously been called and returned. + * It is fatal to destroy the pool while another thread is blocked on join(). + */ + ~ThreadPool(); + + /** + * Starts the thread pool. May be called at most once. + */ + void startup(); + + /** + * Signals the thread pool to shut down. Returns promptly. + * + * After this call, the thread will return an error for subsequent calls to schedule(). + * + * May be called by a task executing in the thread pool. Call join() after calling shutdown() + * to block until all tasks scheduled on the pool complete. + */ + void shutdown(); + + /** + * Blocks until the thread pool has fully shut down. Call at most once, and never from a task + * inside the pool. + */ + void join(); + + /** + * Schedules "task" to run in the thread pool. + * + * Returns OK on success, ShutdownInProgress if shutdown() has already executed. + * + * It is safe to call this before startup(), but the scheduled task will not execute + * until after startup() is called. + */ + Status schedule(Task task); + + /** + * Blocks the caller until there are no pending tasks on this pool. + * + * It is legal to call this whether or not shutdown has been called, but if it is called + * *before* shutdown() is called, there is no guarantee that there will still be no pending + * tasks when the function returns. + * + * May be called multiple times, by multiple threads. May not be called by a task in the thread + * pool. + */ + void waitForIdle(); + + /** + * Returns statistics about the thread pool's utilization. + */ + Stats getStats(); + +private: + using TaskList = stdx::list<Task>; + using ThreadList = std::vector<stdx::thread>; + + /** + * Representation of the stage of life of a thread pool. + * + * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work + * may only be scheduled in the preStart and running states. Threads may only be started in the + * running state. In shutdownComplete, there are no remaining threads or pending tasks to + * execute. + * + * Diagram of legal transitions: + * + * preStart -> running -> joinRequired -> joining -> shutdownComplete + * \ ^ + * \_____________/ + */ + enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete }; + + /** + * This is the thread body for worker threads. It is a static member function, + * because late in its execution it is possible for the pool to have been destroyed. + * As such, it is advisable to pass the pool pointer as an explicit argument, rather + * than as the implicit "this" argument. + */ + static void _workerThreadBody(ThreadPool* pool, const std::string& threadName); + + /** + * Starts a worker thread, unless _options.maxThreads threads are already running or + * _state is not running. + */ + void _startWorkerThread_inlock(); + + /** + * This is the run loop of a worker thread, invoked by _workerThreadBody. + */ + void _consumeTasks(); + + /** + * Implementation of shutdown once _mutex is locked. + */ + void _shutdown_inlock(); + + /** + * Implementation of join once _mutex is owned by "lk". + */ + void _join_inlock(stdx::unique_lock<stdx::mutex>* lk); + + /** + * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at + * least one entry. + */ + void _doOneTask(stdx::unique_lock<stdx::mutex>* lk); + + /** + * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state + * change. Has no effect if _state == newState. + */ + void _setState_inlock(LifecycleState newState); + + // These are the options with which the pool was configured at construction time. + const Options _options; + + // Mutex guarding all non-const member variables. + stdx::mutex _mutex; + + // This variable represents the lifecycle state of the pool. + // + // Work may only be scheduled in states preStart and running, and only executes in states + // running and shuttingDown. + LifecycleState _state = preStart; + + // Condition signaled to indicate that there is work in the _pendingTasks queue, or + // that the system is shutting down. + stdx::condition_variable _workAvailable; + + // Condition signaled to indicate that there is no work in the _pendingTasks queue. + stdx::condition_variable _poolIsIdle; + + // Condition variable signaled whenever _state changes. + stdx::condition_variable _stateChange; + + // Queue of yet-to-be-executed tasks. + TaskList _pendingTasks; + + // List of threads serving as the worker pool. + ThreadList _threads; + + // Count of idle threads. + size_t _numIdleThreads = 0; + + // Id counter for assigning thread names + size_t _nextThreadId = 0; + + // The last time that _pendingTasks.size() grew to be at least _threads.size(). + Date_t _lastFullUtilizationDate; +}; + +} // namespace mongo |