From 0ce948778cc5e0d7d8a9c76abd37e8ac875d9ef4 Mon Sep 17 00:00:00 2001 From: Billy Donahue Date: Tue, 1 Sep 2020 07:18:04 +0000 Subject: SERVER-50228 ThreadPool predicate condvar wait - Switch std::vector to std::list to enable node splicing. - fmt::format - ThreadPool::Impl - check for uncallable onCreate in ServiceExecutorFixed - _workerThreadBody can be member of ThreadPool. Threads don't detach anymore --- src/mongo/transport/service_executor_fixed.cpp | 4 +- src/mongo/util/concurrency/thread_pool.cpp | 429 ++++++++++++++++--------- src/mongo/util/concurrency/thread_pool.h | 181 ++--------- 3 files changed, 320 insertions(+), 294 deletions(-) diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index bdf75660dce..f48a9d7a170 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -64,7 +64,9 @@ ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options) _options.onCreateThread = [this, onCreate = std::move(_options.onCreateThread)](const std::string& name) mutable { _executorContext = std::make_unique(this->weak_from_this()); - onCreate(name); + if (onCreate) { + onCreate(name); + } }; _threadPool = std::make_unique(_options); } diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp index 680d397946f..0e8eda183b4 100644 --- a/src/mongo/util/concurrency/thread_pool.cpp +++ b/src/mongo/util/concurrency/thread_pool.cpp @@ -33,23 +33,37 @@ #include "mongo/util/concurrency/thread_pool.h" +#include +#include +#include +#include +#include + #include "mongo/base/status.h" #include "mongo/logv2/log.h" #include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/concurrency/thread_name.h" -#include "mongo/util/str.h" - -#include +#include "mongo/util/hierarchical_acquisition.h" namespace mongo { namespace { +using namespace fmt::literals; + // Counter used to assign unique names to otherwise-unnamed thread pools. AtomicWord nextUnnamedThreadPoolId{1}; +std::string threadIdToString(stdx::thread::id id) { + std::ostringstream oss; + oss << id; + return oss.str(); +} + /** * Sets defaults and checks bounds limits on "options", and returns it. * @@ -57,10 +71,10 @@ AtomicWord nextUnnamedThreadPoolId{1}; */ ThreadPool::Options cleanUpOptions(ThreadPool::Options&& options) { if (options.poolName.empty()) { - options.poolName = str::stream() << "ThreadPool" << nextUnnamedThreadPoolId.fetchAndAdd(1); + options.poolName = "ThreadPool{}"_format(nextUnnamedThreadPoolId.fetchAndAdd(1)); } if (options.threadNamePrefix.empty()) { - options.threadNamePrefix = str::stream() << options.poolName << '-'; + options.threadNamePrefix = "{}-"_format(options.poolName); } if (options.maxThreads < 1) { LOGV2_FATAL(28702, @@ -85,28 +99,144 @@ ThreadPool::Options cleanUpOptions(ThreadPool::Options&& options) { } // namespace -ThreadPool::Options::Options(const ThreadPool::Limits& limits) - : minThreads(limits.minThreads), - maxThreads(limits.maxThreads), - maxIdleThreadAge(limits.maxIdleThreadAge) {} -ThreadPool::ThreadPool(Options options) : _options(cleanUpOptions(std::move(options))) {} +// Public functions forwarded from ThreadPool. +class ThreadPool::Impl { +public: + explicit Impl(Options options); + ~Impl(); + void startup(); + void shutdown(); + void join(); + void schedule(Task task); + void waitForIdle(); + Stats getStats() const; + +private: + /** + * Representation of the stage of life of a thread pool. + * + * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work + * may only be scheduled in the preStart and running states. Threads may only be started in the + * running state. In shutdownComplete, there are no remaining threads or pending tasks to + * execute. + * + * Diagram of legal transitions: + * + * preStart -> running -> joinRequired -> joining -> shutdownComplete + * \ ^ + * \_____________/ + */ + enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete }; + + /** The thread body for worker threads. */ + void _workerThreadBody(const std::string& threadName) noexcept; + + /** + * Starts a worker thread, unless _options.maxThreads threads are already running or + * _state is not running. + */ + void _startWorkerThread_inlock(); + + /** + * This is the run loop of a worker thread, invoked by _workerThreadBody. + */ + void _consumeTasks(); + + /** + * Implementation of shutdown once _mutex is locked. + */ + void _shutdown_inlock(); + + /** + * Implementation of join once _mutex is owned by "lk". + */ + void _join_inlock(stdx::unique_lock* lk); + + /** + * Runs the remaining tasks on a new thread as part of the join process, blocking until + * complete. Caller must not hold the mutex! + */ + void _drainPendingTasks(); + + /** + * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at + * least one entry. + */ + void _doOneTask(stdx::unique_lock* lk) noexcept; + + /** + * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state + * change. Has no effect if _state == newState. + */ + void _setState_inlock(LifecycleState newState); + + /** + * Waits for all remaining retired threads to join. + * If a thread's _workerThreadBody() were ever to attempt to reacquire + * ThreadPool::_mutex after that thread had been added to _retiredThreads, + * it could cause a deadlock. + */ + void _joinRetired_inlock(); + + // These are the options with which the pool was configured at construction time. + const Options _options; + + // Mutex guarding all non-const member variables. + mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ThreadPool::_mutex"); + + // This variable represents the lifecycle state of the pool. + // + // Work may only be scheduled in states preStart and running, and only executes in states + // running and shuttingDown. + LifecycleState _state = preStart; + + // Condition signaled to indicate that there is work in the _pendingTasks queue, or + // that the system is shutting down. + stdx::condition_variable _workAvailable; + + // Condition signaled to indicate that there is no work in the _pendingTasks queue. + stdx::condition_variable _poolIsIdle; + + // Condition variable signaled whenever _state changes. + stdx::condition_variable _stateChange; + + // Queue of yet-to-be-executed tasks. + std::deque _pendingTasks; + + // List of threads serving as the worker pool. + std::list _threads; + + // List of threads that are retired and pending join + std::list _retiredThreads; + + // Count of idle threads. + size_t _numIdleThreads = 0; + + // Id counter for assigning thread names + size_t _nextThreadId = 0; + + // The last time that _pendingTasks.size() grew to be at least _threads.size(). + Date_t _lastFullUtilizationDate; +}; + +ThreadPool::Impl::Impl(Options options) : _options(cleanUpOptions(std::move(options))) {} -ThreadPool::~ThreadPool() { +ThreadPool::Impl::~Impl() { stdx::unique_lock lk(_mutex); _shutdown_inlock(); - if (shutdownComplete != _state) { + if (_state != shutdownComplete) { _join_inlock(&lk); } - if (shutdownComplete != _state) { + if (_state != shutdownComplete) { LOGV2_FATAL(28704, "Failed to shutdown pool during destruction"); } invariant(_threads.empty()); invariant(_pendingTasks.empty()); } -void ThreadPool::startup() { +void ThreadPool::Impl::startup() { stdx::lock_guard lk(_mutex); if (_state != preStart) { LOGV2_FATAL(28698, @@ -116,19 +246,18 @@ void ThreadPool::startup() { } _setState_inlock(running); invariant(_threads.empty()); - const size_t numToStart = - std::min(_options.maxThreads, std::max(_options.minThreads, _pendingTasks.size())); + size_t numToStart = std::clamp(_pendingTasks.size(), _options.minThreads, _options.maxThreads); for (size_t i = 0; i < numToStart; ++i) { _startWorkerThread_inlock(); } } -void ThreadPool::shutdown() { +void ThreadPool::Impl::shutdown() { stdx::lock_guard lk(_mutex); _shutdown_inlock(); } -void ThreadPool::_shutdown_inlock() { +void ThreadPool::Impl::_shutdown_inlock() { switch (_state) { case preStart: case running: @@ -143,38 +272,30 @@ void ThreadPool::_shutdown_inlock() { MONGO_UNREACHABLE; } -void ThreadPool::join() { +void ThreadPool::Impl::join() { stdx::unique_lock lk(_mutex); _join_inlock(&lk); } -void ThreadPool::_joinRetired_inlock() { +void ThreadPool::Impl::_joinRetired_inlock() { while (!_retiredThreads.empty()) { auto& t = _retiredThreads.front(); t.join(); - _options.onJoinRetiredThread(t); + if (_options.onJoinRetiredThread) + _options.onJoinRetiredThread(t); _retiredThreads.pop_front(); } } -void ThreadPool::_join_inlock(stdx::unique_lock* lk) { - _stateChange.wait(*lk, [this] { - switch (_state) { - case preStart: - return false; - case running: - return false; - case joinRequired: - return true; - case joining: - case shutdownComplete: - LOGV2_FATAL(28700, - "Attempted to join pool {poolName} more than once", - "Attempted to join pool more than once", - "poolName"_attr = _options.poolName); - } - MONGO_UNREACHABLE; - }); +void ThreadPool::Impl::_join_inlock(stdx::unique_lock* lk) { + _stateChange.wait(*lk, [this] { return _state != preStart && _state != running; }); + if (_state != joinRequired) { + LOGV2_FATAL(28700, + "Attempted to join pool {poolName} more than once", + "Attempted to join pool more than once", + "poolName"_attr = _options.poolName); + } + _setState_inlock(joining); ++_numIdleThreads; if (!_pendingTasks.empty()) { @@ -184,8 +305,7 @@ void ThreadPool::_join_inlock(stdx::unique_lock* lk) { } --_numIdleThreads; _joinRetired_inlock(); - ThreadList threadsToJoin; - swap(threadsToJoin, _threads); + auto threadsToJoin = std::exchange(_threads, {}); lk->unlock(); for (auto& t : threadsToJoin) { t.join(); @@ -195,14 +315,14 @@ void ThreadPool::_join_inlock(stdx::unique_lock* lk) { _setState_inlock(shutdownComplete); } -void ThreadPool::_drainPendingTasks() { +void ThreadPool::Impl::_drainPendingTasks() { // Tasks cannot be run inline because they can create OperationContexts and the join() caller // may already have one associated with the thread. stdx::thread cleanThread = stdx::thread([&] { - const std::string threadName = str::stream() - << _options.threadNamePrefix << _nextThreadId++; + const std::string threadName = "{}{}"_format(_options.threadNamePrefix, _nextThreadId++); setThreadName(threadName); - _options.onCreateThread(threadName); + if (_options.onCreateThread) + _options.onCreateThread(threadName); stdx::unique_lock lock(_mutex); while (!_pendingTasks.empty()) { _doOneTask(&lock); @@ -211,16 +331,16 @@ void ThreadPool::_drainPendingTasks() { cleanThread.join(); } -void ThreadPool::schedule(Task task) { +void ThreadPool::Impl::schedule(Task task) { stdx::unique_lock lk(_mutex); switch (_state) { case joinRequired: case joining: case shutdownComplete: { - auto status = Status(ErrorCodes::ShutdownInProgress, - str::stream() << "Shutdown of thread pool " << _options.poolName - << " in progress"); + auto status = + Status(ErrorCodes::ShutdownInProgress, + "Shutdown of thread pool {} in progress"_format(_options.poolName)); lk.unlock(); task(status); @@ -246,15 +366,14 @@ void ThreadPool::schedule(Task task) { _workAvailable.notify_one(); } -void ThreadPool::waitForIdle() { +void ThreadPool::Impl::waitForIdle() { stdx::unique_lock lk(_mutex); - // If there are any pending tasks, or non-idle threads, the pool is not idle. - while (!_pendingTasks.empty() || _numIdleThreads < _threads.size()) { - _poolIsIdle.wait(lk); - } + // True when there are no `_pendingTasks` and all `_threads` are idle. + auto isIdle = [this] { return _pendingTasks.empty() && _numIdleThreads >= _threads.size(); }; + _poolIsIdle.wait(lk, isIdle); } -ThreadPool::Stats ThreadPool::getStats() const { +ThreadPool::Stats ThreadPool::Impl::getStats() const { stdx::lock_guard lk(_mutex); Stats result; result.options = _options; @@ -265,95 +384,91 @@ ThreadPool::Stats ThreadPool::getStats() const { return result; } -void ThreadPool::_workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept { +void ThreadPool::Impl::_workerThreadBody(const std::string& threadName) noexcept { setThreadName(threadName); - pool->_options.onCreateThread(threadName); - const auto poolName = pool->_options.poolName; + if (_options.onCreateThread) + _options.onCreateThread(threadName); LOGV2_DEBUG(23104, 1, "Starting thread {threadName} in pool {poolName}", "Starting thread", "threadName"_attr = threadName, - "poolName"_attr = poolName); - pool->_consumeTasks(); - - // At this point, another thread may have destroyed "pool", if this thread chose to detach - // itself and remove itself from pool->_threads before releasing pool->_mutex. Do not access - // member variables of "pool" from here, on. - // - // This can happen if this thread decided to retire, got descheduled after removing itself - // from _threads and calling detach(), and then the pool was deleted. When this thread resumes, - // it is no longer safe to access "pool". + "poolName"_attr = _options.poolName); + _consumeTasks(); LOGV2_DEBUG(23105, 1, "Shutting down thread {threadName} in pool {poolName}", "Shutting down thread", "threadName"_attr = threadName, - "poolName"_attr = poolName); + "poolName"_attr = _options.poolName); } -void ThreadPool::_consumeTasks() { +void ThreadPool::Impl::_consumeTasks() { stdx::unique_lock lk(_mutex); while (_state == running) { - if (_pendingTasks.empty()) { - /** - * Help with garbage collecting retired threads to: - * * Reduce the memory overhead of _retiredThreads - * * Expedite the shutdown process - */ - _joinRetired_inlock(); - - if (_threads.size() > _options.minThreads) { - // Since there are more than minThreads threads, this thread may be eligible for - // retirement. If it isn't now, it may be later, so it must put a time limit on how - // long it waits on _workAvailable. - const auto now = Date_t::now(); - const auto nextThreadRetirementDate = - _lastFullUtilizationDate + _options.maxIdleThreadAge; - if (now >= nextThreadRetirementDate) { - _lastFullUtilizationDate = now; - LOGV2_DEBUG(23106, - 1, - "Reaping this thread; next thread reaped no earlier than " - "{nextThreadRetirementDate}", - "Reaping this thread", - "nextThreadRetirementDate"_attr = - _lastFullUtilizationDate + _options.maxIdleThreadAge); - break; - } - - LOGV2_DEBUG(23107, - 3, - "Not reaping this thread because the earliest retirement date is " + if (!_pendingTasks.empty()) { + _doOneTask(&lk); + continue; + } + + // Help with garbage collecting retired threads to reduce the + // memory overhead of _retiredThreads and expedite the shutdown + // process. + _joinRetired_inlock(); + + boost::optional waitDeadline; + + if (_threads.size() > _options.minThreads) { + // Since there are more than minThreads threads, this thread may be eligible for + // retirement. If it isn't now, it may be later, so it must put a time limit on how + // long it waits on _workAvailable. + const auto now = Date_t::now(); + const auto nextRetirement = _lastFullUtilizationDate + _options.maxIdleThreadAge; + if (now >= nextRetirement) { + _lastFullUtilizationDate = now; + LOGV2_DEBUG(23106, + 1, + "Reaping this thread; next thread reaped no earlier than " "{nextThreadRetirementDate}", - "Not reaping this thread", - "nextThreadRetirementDate"_attr = nextThreadRetirementDate); - MONGO_IDLE_THREAD_BLOCK; - _workAvailable.wait_until(lk, nextThreadRetirementDate.toSystemTimePoint()); - } else { - // Since the number of threads is not more than minThreads, this thread is not - // eligible for retirement. It is OK to sleep until _workAvailable is signaled, - // because any new threads that put the number of total threads above minThreads - // would be eligible for retirement once they had no work left to do. - LOGV2_DEBUG(23108, - 3, - "Waiting for work; the thread pool size is {numThreads}; the minimum " - "number of threads is {minThreads}", - "Waiting for work", - "numThreads"_attr = _threads.size(), - "minThreads"_attr = _options.minThreads); - MONGO_IDLE_THREAD_BLOCK; - _workAvailable.wait(lk); + "Reaping this thread", + "nextThreadRetirementDate"_attr = + _lastFullUtilizationDate + _options.maxIdleThreadAge); + break; } - continue; + + LOGV2_DEBUG(23107, + 3, + "Not reaping this thread because the earliest retirement date is " + "{nextThreadRetirementDate}", + "Not reaping this thread", + "nextThreadRetirementDate"_attr = nextRetirement); + waitDeadline = nextRetirement; + } else { + // Since the number of threads is not more than minThreads, this thread is not + // eligible for retirement. It is OK to sleep until _workAvailable is signaled, + // because any new threads that put the number of total threads above minThreads + // would be eligible for retirement once they had no work left to do. + LOGV2_DEBUG(23108, + 3, + "Waiting for work; the thread pool size is {numThreads}; the minimum " + "number of threads is {minThreads}", + "Waiting for work", + "numThreads"_attr = _threads.size(), + "minThreads"_attr = _options.minThreads); } - _doOneTask(&lk); + auto wake = [&] { return _state != running || !_pendingTasks.empty(); }; + MONGO_IDLE_THREAD_BLOCK; + if (waitDeadline) { + _workAvailable.wait_until(lk, waitDeadline->toSystemTimePoint(), wake); + } else { + _workAvailable.wait(lk, wake); + } } // We still hold the lock, but this thread is retiring. If the whole pool is shutting down, this // thread lends a hand in draining the work pool and returns so it can be joined. Otherwise, it - // falls through to the detach code, below. + // falls through to the thread retirement code, below. if (_state == joinRequired || _state == joining) { // Drain the leftover pending tasks. @@ -375,29 +490,22 @@ void ThreadPool::_consumeTasks() { "expectedState"_attr = static_cast(running)); } - // This thread is ending because it was idle for too long. Find self in _threads, remove self - // from _threads, and add self to the list of retired threads. - for (size_t i = 0; i < _threads.size(); ++i) { - auto& t = _threads[i]; - if (t.get_id() != stdx::this_thread::get_id()) { - continue; - } - std::swap(t, _threads.back()); - _retiredThreads.push_back(std::move(_threads.back())); - _threads.pop_back(); - return; + // This thread is ending because it was idle for too long. + // Move self from _threads to _retiredThreads. + auto selfId = stdx::this_thread::get_id(); + auto pos = std::find_if( + _threads.begin(), _threads.end(), [&](auto&& t) { return t.get_id() == selfId; }); + if (pos == _threads.end()) { + LOGV2_FATAL_NOTRACE(28703, + "Could not find thread with id {threadId} in pool {poolName}", + "Could not find thread", + "threadId"_attr = threadIdToString(selfId), + "poolName"_attr = _options.poolName); } - - std::ostringstream threadId; - threadId << stdx::this_thread::get_id(); - LOGV2_FATAL_NOTRACE(28703, - "Could not find thread with id {threadId} in pool {poolName}", - "Could not find thread", - "threadId"_attr = threadId.str(), - "poolName"_attr = _options.poolName); + _retiredThreads.splice(_retiredThreads.end(), _threads, pos); } -void ThreadPool::_doOneTask(stdx::unique_lock* lk) noexcept { +void ThreadPool::Impl::_doOneTask(stdx::unique_lock* lk) noexcept { invariant(!_pendingTasks.empty()); LOGV2_DEBUG(23109, 3, @@ -416,7 +524,7 @@ void ThreadPool::_doOneTask(stdx::unique_lock* lk) noexcept { } } -void ThreadPool::_startWorkerThread_inlock() { +void ThreadPool::Impl::_startWorkerThread_inlock() { switch (_state) { case preStart: LOGV2_DEBUG( @@ -452,9 +560,9 @@ void ThreadPool::_startWorkerThread_inlock() { return; } invariant(_threads.size() < _options.maxThreads); - const std::string threadName = str::stream() << _options.threadNamePrefix << _nextThreadId++; + std::string threadName = "{}{}"_format(_options.threadNamePrefix, _nextThreadId++); try { - _threads.emplace_back([this, threadName] { _workerThreadBody(this, threadName); }); + _threads.emplace_back([this, threadName] { _workerThreadBody(threadName); }); ++_numIdleThreads; } catch (const std::exception& ex) { LOGV2_ERROR(23113, @@ -468,7 +576,7 @@ void ThreadPool::_startWorkerThread_inlock() { } } -void ThreadPool::_setState_inlock(const LifecycleState newState) { +void ThreadPool::Impl::_setState_inlock(const LifecycleState newState) { if (newState == _state) { return; } @@ -476,4 +584,35 @@ void ThreadPool::_setState_inlock(const LifecycleState newState) { _stateChange.notify_all(); } +// ======================================== +// ThreadPool public functions that simply forward to the `_impl`. + +ThreadPool::ThreadPool(Options options) : _impl{std::make_unique(std::move(options))} {} + +ThreadPool::~ThreadPool() = default; + +void ThreadPool::startup() { + _impl->startup(); +} + +void ThreadPool::shutdown() { + _impl->shutdown(); +} + +void ThreadPool::join() { + _impl->join(); +} + +void ThreadPool::schedule(Task task) { + _impl->schedule(std::move(task)); +} + +void ThreadPool::waitForIdle() { + _impl->waitForIdle(); +} + +ThreadPool::Stats ThreadPool::getStats() const { + return _impl->getStats(); +} + } // namespace mongo diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index a6e56f8c9bf..29acd9e09c0 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -29,47 +29,52 @@ #pragma once -#include #include +#include #include -#include -#include "mongo/platform/mutex.h" -#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" #include "mongo/util/concurrency/thread_pool_interface.h" -#include "mongo/util/hierarchical_acquisition.h" +#include "mongo/util/duration.h" #include "mongo/util/time_support.h" namespace mongo { -class Status; - /** * A configurable thread pool, for general use. * * See the Options struct for information about how to configure an instance. */ class ThreadPool final : public ThreadPoolInterface { - ThreadPool(const ThreadPool&) = delete; - ThreadPool& operator=(const ThreadPool&) = delete; - public: - struct Limits; + /** + * Contains a subset of the fields from Options related to limiting the number of concurrent + * threads in the pool. Used in places where we want a way to specify limits to the size of a + * ThreadPool without overriding the other behaviors of the pool such thread names or onCreate + * behaviors. Each field of Limits maps directly to the same-named field in Options. + */ + struct Limits { + size_t minThreads = 1; + size_t maxThreads = 8; + Milliseconds maxIdleThreadAge = Seconds{30}; + }; /** * Structure used to configure an instance of ThreadPool. */ struct Options { - - Options() = default; - explicit Options(const Limits& limits); - // Set maxThreads to this if you don't want to limit the number of threads in the pool. // Note: the value used here is high enough that it will never be reached, but low enough // that it won't cause overflows if mixed with signed ints or math. static constexpr size_t kUnlimited = 1'000'000'000; + Options() = default; + + explicit Options(const Limits& limits) + : minThreads(limits.minThreads), + maxThreads(limits.maxThreads), + maxIdleThreadAge(limits.maxIdleThreadAge) {} + // Name of the thread pool. If this string is empty, the pool will be assigned a // name unique to the current process. std::string poolName; @@ -95,29 +100,15 @@ public: // a thread. Milliseconds maxIdleThreadAge = Seconds{30}; - // This function is run before each worker thread begins consuming tasks. - using OnCreateThreadFn = std::function; - OnCreateThreadFn onCreateThread = [](const std::string&) {}; + /** If callable, called before each worker thread begins consuming tasks. */ + std::function onCreateThread; /** - * This function is called after joining each retired thread. + * If callable, called after joining each retired thread. * Since there could be multiple calls to this function in a single critical section, * avoid complex logic in the callback. */ - using OnJoinRetiredThreadFn = std::function; - OnJoinRetiredThreadFn onJoinRetiredThread = [](const stdx::thread&) {}; - }; - - /** - * Contains a subset of the fields from Options related to limiting the number of concurrent - * threads in the pool. Used in places where we want a way to specify limits to the size of a - * ThreadPool without overriding the other behaviors of the pool such thread names or onCreate - * behaviors. Each field of Limits maps directly to the same-named field in Options. - */ - struct Limits { - size_t minThreads = 1; - size_t maxThreads = 8; - Milliseconds maxIdleThreadAge = Seconds{30}; + std::function onJoinRetiredThread; }; /** @@ -145,12 +136,18 @@ public: */ explicit ThreadPool(Options options); + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + ~ThreadPool() override; + // from OutOfLineExecutor (base of ThreadPoolInterface) + void schedule(Task task) override; + + // from ThreadPoolInterface void startup() override; void shutdown() override; void join() override; - void schedule(Task task) override; /** * Blocks the caller until there are no pending tasks on this pool. @@ -170,120 +167,8 @@ public: Stats getStats() const; private: - using TaskList = std::deque; - using ThreadList = std::vector; - using RetiredThreadList = std::list; - - /** - * Representation of the stage of life of a thread pool. - * - * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work - * may only be scheduled in the preStart and running states. Threads may only be started in the - * running state. In shutdownComplete, there are no remaining threads or pending tasks to - * execute. - * - * Diagram of legal transitions: - * - * preStart -> running -> joinRequired -> joining -> shutdownComplete - * \ ^ - * \_____________/ - */ - enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete }; - - /** - * This is the thread body for worker threads. It is a static member function, - * because late in its execution it is possible for the pool to have been destroyed. - * As such, it is advisable to pass the pool pointer as an explicit argument, rather - * than as the implicit "this" argument. - */ - static void _workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept; - - /** - * Starts a worker thread, unless _options.maxThreads threads are already running or - * _state is not running. - */ - void _startWorkerThread_inlock(); - - /** - * This is the run loop of a worker thread, invoked by _workerThreadBody. - */ - void _consumeTasks(); - - /** - * Implementation of shutdown once _mutex is locked. - */ - void _shutdown_inlock(); - - /** - * Implementation of join once _mutex is owned by "lk". - */ - void _join_inlock(stdx::unique_lock* lk); - - /** - * Runs the remaining tasks on a new thread as part of the join process, blocking until - * complete. Caller must not hold the mutex! - */ - void _drainPendingTasks(); - - /** - * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at - * least one entry. - */ - void _doOneTask(stdx::unique_lock* lk) noexcept; - - /** - * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state - * change. Has no effect if _state == newState. - */ - void _setState_inlock(LifecycleState newState); - - /** - * Waits for all remaining retired threads to join. - * If a thread's _workerThreadBody() were ever to attempt to reacquire - * ThreadPool::_mutex after that thread had been added to _retiredThreads, - * it could cause a deadlock. - */ - void _joinRetired_inlock(); - - // These are the options with which the pool was configured at construction time. - const Options _options; - - // Mutex guarding all non-const member variables. - mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ThreadPool::_mutex"); - - // This variable represents the lifecycle state of the pool. - // - // Work may only be scheduled in states preStart and running, and only executes in states - // running and shuttingDown. - LifecycleState _state = preStart; - - // Condition signaled to indicate that there is work in the _pendingTasks queue, or - // that the system is shutting down. - stdx::condition_variable _workAvailable; - - // Condition signaled to indicate that there is no work in the _pendingTasks queue. - stdx::condition_variable _poolIsIdle; - - // Condition variable signaled whenever _state changes. - stdx::condition_variable _stateChange; - - // Queue of yet-to-be-executed tasks. - TaskList _pendingTasks; - - // List of threads serving as the worker pool. - ThreadList _threads; - - // List of threads that are retired and pending join - RetiredThreadList _retiredThreads; - - // Count of idle threads. - size_t _numIdleThreads = 0; - - // Id counter for assigning thread names - size_t _nextThreadId = 0; - - // The last time that _pendingTasks.size() grew to be at least _threads.size(). - Date_t _lastFullUtilizationDate; + class Impl; + std::unique_ptr _impl; }; } // namespace mongo -- cgit v1.2.1