diff options
Diffstat (limited to 'src/mongo/util/concurrency/thread_pool.h')
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.h | 181 |
1 files changed, 33 insertions, 148 deletions
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index a6e56f8c9bf..29acd9e09c0 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -29,47 +29,52 @@ #pragma once -#include <deque> #include <functional> +#include <memory> #include <string> -#include <vector> -#include "mongo/platform/mutex.h" -#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/thread_pool_interface.h" -#include "mongo/util/hierarchical_acquisition.h" +#include "mongo/util/duration.h" #include "mongo/util/time_support.h" namespace mongo { -class Status; - /** * A configurable thread pool, for general use. * * See the Options struct for information about how to configure an instance. */ class ThreadPool final : public ThreadPoolInterface { - ThreadPool(const ThreadPool&) = delete; - ThreadPool& operator=(const ThreadPool&) = delete; - public: - struct Limits; + /** + * Contains a subset of the fields from Options related to limiting the number of concurrent + * threads in the pool. Used in places where we want a way to specify limits to the size of a + * ThreadPool without overriding the other behaviors of the pool such thread names or onCreate + * behaviors. Each field of Limits maps directly to the same-named field in Options. + */ + struct Limits { + size_t minThreads = 1; + size_t maxThreads = 8; + Milliseconds maxIdleThreadAge = Seconds{30}; + }; /** * Structure used to configure an instance of ThreadPool. */ struct Options { - - Options() = default; - explicit Options(const Limits& limits); - // Set maxThreads to this if you don't want to limit the number of threads in the pool. // Note: the value used here is high enough that it will never be reached, but low enough // that it won't cause overflows if mixed with signed ints or math. static constexpr size_t kUnlimited = 1'000'000'000; + Options() = default; + + explicit Options(const Limits& limits) + : minThreads(limits.minThreads), + maxThreads(limits.maxThreads), + maxIdleThreadAge(limits.maxIdleThreadAge) {} + // Name of the thread pool. If this string is empty, the pool will be assigned a // name unique to the current process. std::string poolName; @@ -95,29 +100,15 @@ public: // a thread. Milliseconds maxIdleThreadAge = Seconds{30}; - // This function is run before each worker thread begins consuming tasks. - using OnCreateThreadFn = std::function<void(const std::string& threadName)>; - OnCreateThreadFn onCreateThread = [](const std::string&) {}; + /** If callable, called before each worker thread begins consuming tasks. */ + std::function<void(const std::string&)> onCreateThread; /** - * This function is called after joining each retired thread. + * If callable, called after joining each retired thread. * Since there could be multiple calls to this function in a single critical section, * avoid complex logic in the callback. */ - using OnJoinRetiredThreadFn = std::function<void(const stdx::thread&)>; - OnJoinRetiredThreadFn onJoinRetiredThread = [](const stdx::thread&) {}; - }; - - /** - * Contains a subset of the fields from Options related to limiting the number of concurrent - * threads in the pool. Used in places where we want a way to specify limits to the size of a - * ThreadPool without overriding the other behaviors of the pool such thread names or onCreate - * behaviors. Each field of Limits maps directly to the same-named field in Options. - */ - struct Limits { - size_t minThreads = 1; - size_t maxThreads = 8; - Milliseconds maxIdleThreadAge = Seconds{30}; + std::function<void(const stdx::thread&)> onJoinRetiredThread; }; /** @@ -145,12 +136,18 @@ public: */ explicit ThreadPool(Options options); + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + ~ThreadPool() override; + // from OutOfLineExecutor (base of ThreadPoolInterface) + void schedule(Task task) override; + + // from ThreadPoolInterface void startup() override; void shutdown() override; void join() override; - void schedule(Task task) override; /** * Blocks the caller until there are no pending tasks on this pool. @@ -170,120 +167,8 @@ public: Stats getStats() const; private: - using TaskList = std::deque<Task>; - using ThreadList = std::vector<stdx::thread>; - using RetiredThreadList = std::list<stdx::thread>; - - /** - * Representation of the stage of life of a thread pool. - * - * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work - * may only be scheduled in the preStart and running states. Threads may only be started in the - * running state. In shutdownComplete, there are no remaining threads or pending tasks to - * execute. - * - * Diagram of legal transitions: - * - * preStart -> running -> joinRequired -> joining -> shutdownComplete - * \ ^ - * \_____________/ - */ - enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete }; - - /** - * This is the thread body for worker threads. It is a static member function, - * because late in its execution it is possible for the pool to have been destroyed. - * As such, it is advisable to pass the pool pointer as an explicit argument, rather - * than as the implicit "this" argument. - */ - static void _workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept; - - /** - * Starts a worker thread, unless _options.maxThreads threads are already running or - * _state is not running. - */ - void _startWorkerThread_inlock(); - - /** - * This is the run loop of a worker thread, invoked by _workerThreadBody. - */ - void _consumeTasks(); - - /** - * Implementation of shutdown once _mutex is locked. - */ - void _shutdown_inlock(); - - /** - * Implementation of join once _mutex is owned by "lk". - */ - void _join_inlock(stdx::unique_lock<Latch>* lk); - - /** - * Runs the remaining tasks on a new thread as part of the join process, blocking until - * complete. Caller must not hold the mutex! - */ - void _drainPendingTasks(); - - /** - * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at - * least one entry. - */ - void _doOneTask(stdx::unique_lock<Latch>* lk) noexcept; - - /** - * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state - * change. Has no effect if _state == newState. - */ - void _setState_inlock(LifecycleState newState); - - /** - * Waits for all remaining retired threads to join. - * If a thread's _workerThreadBody() were ever to attempt to reacquire - * ThreadPool::_mutex after that thread had been added to _retiredThreads, - * it could cause a deadlock. - */ - void _joinRetired_inlock(); - - // These are the options with which the pool was configured at construction time. - const Options _options; - - // Mutex guarding all non-const member variables. - mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ThreadPool::_mutex"); - - // This variable represents the lifecycle state of the pool. - // - // Work may only be scheduled in states preStart and running, and only executes in states - // running and shuttingDown. - LifecycleState _state = preStart; - - // Condition signaled to indicate that there is work in the _pendingTasks queue, or - // that the system is shutting down. - stdx::condition_variable _workAvailable; - - // Condition signaled to indicate that there is no work in the _pendingTasks queue. - stdx::condition_variable _poolIsIdle; - - // Condition variable signaled whenever _state changes. - stdx::condition_variable _stateChange; - - // Queue of yet-to-be-executed tasks. - TaskList _pendingTasks; - - // List of threads serving as the worker pool. - ThreadList _threads; - - // List of threads that are retired and pending join - RetiredThreadList _retiredThreads; - - // Count of idle threads. - size_t _numIdleThreads = 0; - - // Id counter for assigning thread names - size_t _nextThreadId = 0; - - // The last time that _pendingTasks.size() grew to be at least _threads.size(). - Date_t _lastFullUtilizationDate; + class Impl; + std::unique_ptr<Impl> _impl; }; } // namespace mongo |