summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBilly Donahue <billy.donahue@mongodb.com>2020-09-01 07:18:04 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-10 07:09:32 +0000
commit0ce948778cc5e0d7d8a9c76abd37e8ac875d9ef4 (patch)
treefe8946877ac2b2a0ec87ba7ad6afabd95c7da9fa
parentc7efb504fd6fe60580d95fba4b914c85abfc1fd2 (diff)
downloadmongo-0ce948778cc5e0d7d8a9c76abd37e8ac875d9ef4.tar.gz
SERVER-50228 ThreadPool predicate condvar wait
- Switch std::vector to std::list to enable node splicing. - fmt::format - ThreadPool::Impl - check for uncallable onCreate in ServiceExecutorFixed - _workerThreadBody can be member of ThreadPool. Threads don't detach anymore
-rw-r--r--src/mongo/transport/service_executor_fixed.cpp4
-rw-r--r--src/mongo/util/concurrency/thread_pool.cpp429
-rw-r--r--src/mongo/util/concurrency/thread_pool.h181
3 files changed, 320 insertions, 294 deletions
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index bdf75660dce..f48a9d7a170 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -64,7 +64,9 @@ ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
_options.onCreateThread =
[this, onCreate = std::move(_options.onCreateThread)](const std::string& name) mutable {
_executorContext = std::make_unique<ExecutorThreadContext>(this->weak_from_this());
- onCreate(name);
+ if (onCreate) {
+ onCreate(name);
+ }
};
_threadPool = std::make_unique<ThreadPool>(_options);
}
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp
index 680d397946f..0e8eda183b4 100644
--- a/src/mongo/util/concurrency/thread_pool.cpp
+++ b/src/mongo/util/concurrency/thread_pool.cpp
@@ -33,23 +33,37 @@
#include "mongo/util/concurrency/thread_pool.h"
+#include <deque>
+#include <fmt/format.h>
+#include <list>
+#include <sstream>
+#include <vector>
+
#include "mongo/base/status.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/concurrency/thread_name.h"
-#include "mongo/util/str.h"
-
-#include <sstream>
+#include "mongo/util/hierarchical_acquisition.h"
namespace mongo {
namespace {
+using namespace fmt::literals;
+
// Counter used to assign unique names to otherwise-unnamed thread pools.
AtomicWord<int> nextUnnamedThreadPoolId{1};
+std::string threadIdToString(stdx::thread::id id) {
+ std::ostringstream oss;
+ oss << id;
+ return oss.str();
+}
+
/**
* Sets defaults and checks bounds limits on "options", and returns it.
*
@@ -57,10 +71,10 @@ AtomicWord<int> nextUnnamedThreadPoolId{1};
*/
ThreadPool::Options cleanUpOptions(ThreadPool::Options&& options) {
if (options.poolName.empty()) {
- options.poolName = str::stream() << "ThreadPool" << nextUnnamedThreadPoolId.fetchAndAdd(1);
+ options.poolName = "ThreadPool{}"_format(nextUnnamedThreadPoolId.fetchAndAdd(1));
}
if (options.threadNamePrefix.empty()) {
- options.threadNamePrefix = str::stream() << options.poolName << '-';
+ options.threadNamePrefix = "{}-"_format(options.poolName);
}
if (options.maxThreads < 1) {
LOGV2_FATAL(28702,
@@ -85,28 +99,144 @@ ThreadPool::Options cleanUpOptions(ThreadPool::Options&& options) {
} // namespace
-ThreadPool::Options::Options(const ThreadPool::Limits& limits)
- : minThreads(limits.minThreads),
- maxThreads(limits.maxThreads),
- maxIdleThreadAge(limits.maxIdleThreadAge) {}
-ThreadPool::ThreadPool(Options options) : _options(cleanUpOptions(std::move(options))) {}
+// Public functions forwarded from ThreadPool.
+class ThreadPool::Impl {
+public:
+ explicit Impl(Options options);
+ ~Impl();
+ void startup();
+ void shutdown();
+ void join();
+ void schedule(Task task);
+ void waitForIdle();
+ Stats getStats() const;
+
+private:
+ /**
+ * Representation of the stage of life of a thread pool.
+ *
+ * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work
+ * may only be scheduled in the preStart and running states. Threads may only be started in the
+ * running state. In shutdownComplete, there are no remaining threads or pending tasks to
+ * execute.
+ *
+ * Diagram of legal transitions:
+ *
+ * preStart -> running -> joinRequired -> joining -> shutdownComplete
+ * \ ^
+ * \_____________/
+ */
+ enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete };
+
+ /** The thread body for worker threads. */
+ void _workerThreadBody(const std::string& threadName) noexcept;
+
+ /**
+ * Starts a worker thread, unless _options.maxThreads threads are already running or
+ * _state is not running.
+ */
+ void _startWorkerThread_inlock();
+
+ /**
+ * This is the run loop of a worker thread, invoked by _workerThreadBody.
+ */
+ void _consumeTasks();
+
+ /**
+ * Implementation of shutdown once _mutex is locked.
+ */
+ void _shutdown_inlock();
+
+ /**
+ * Implementation of join once _mutex is owned by "lk".
+ */
+ void _join_inlock(stdx::unique_lock<Latch>* lk);
+
+ /**
+ * Runs the remaining tasks on a new thread as part of the join process, blocking until
+ * complete. Caller must not hold the mutex!
+ */
+ void _drainPendingTasks();
+
+ /**
+ * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at
+ * least one entry.
+ */
+ void _doOneTask(stdx::unique_lock<Latch>* lk) noexcept;
+
+ /**
+ * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state
+ * change. Has no effect if _state == newState.
+ */
+ void _setState_inlock(LifecycleState newState);
+
+ /**
+ * Waits for all remaining retired threads to join.
+ * If a thread's _workerThreadBody() were ever to attempt to reacquire
+ * ThreadPool::_mutex after that thread had been added to _retiredThreads,
+ * it could cause a deadlock.
+ */
+ void _joinRetired_inlock();
+
+ // These are the options with which the pool was configured at construction time.
+ const Options _options;
+
+ // Mutex guarding all non-const member variables.
+ mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ThreadPool::_mutex");
+
+ // This variable represents the lifecycle state of the pool.
+ //
+ // Work may only be scheduled in states preStart and running, and only executes in states
+ // running and shuttingDown.
+ LifecycleState _state = preStart;
+
+ // Condition signaled to indicate that there is work in the _pendingTasks queue, or
+ // that the system is shutting down.
+ stdx::condition_variable _workAvailable;
+
+ // Condition signaled to indicate that there is no work in the _pendingTasks queue.
+ stdx::condition_variable _poolIsIdle;
+
+ // Condition variable signaled whenever _state changes.
+ stdx::condition_variable _stateChange;
+
+ // Queue of yet-to-be-executed tasks.
+ std::deque<Task> _pendingTasks;
+
+ // List of threads serving as the worker pool.
+ std::list<stdx::thread> _threads;
+
+ // List of threads that are retired and pending join
+ std::list<stdx::thread> _retiredThreads;
+
+ // Count of idle threads.
+ size_t _numIdleThreads = 0;
+
+ // Id counter for assigning thread names
+ size_t _nextThreadId = 0;
+
+ // The last time that _pendingTasks.size() grew to be at least _threads.size().
+ Date_t _lastFullUtilizationDate;
+};
+
+ThreadPool::Impl::Impl(Options options) : _options(cleanUpOptions(std::move(options))) {}
-ThreadPool::~ThreadPool() {
+ThreadPool::Impl::~Impl() {
stdx::unique_lock<Latch> lk(_mutex);
_shutdown_inlock();
- if (shutdownComplete != _state) {
+ if (_state != shutdownComplete) {
_join_inlock(&lk);
}
- if (shutdownComplete != _state) {
+ if (_state != shutdownComplete) {
LOGV2_FATAL(28704, "Failed to shutdown pool during destruction");
}
invariant(_threads.empty());
invariant(_pendingTasks.empty());
}
-void ThreadPool::startup() {
+void ThreadPool::Impl::startup() {
stdx::lock_guard<Latch> lk(_mutex);
if (_state != preStart) {
LOGV2_FATAL(28698,
@@ -116,19 +246,18 @@ void ThreadPool::startup() {
}
_setState_inlock(running);
invariant(_threads.empty());
- const size_t numToStart =
- std::min(_options.maxThreads, std::max(_options.minThreads, _pendingTasks.size()));
+ size_t numToStart = std::clamp(_pendingTasks.size(), _options.minThreads, _options.maxThreads);
for (size_t i = 0; i < numToStart; ++i) {
_startWorkerThread_inlock();
}
}
-void ThreadPool::shutdown() {
+void ThreadPool::Impl::shutdown() {
stdx::lock_guard<Latch> lk(_mutex);
_shutdown_inlock();
}
-void ThreadPool::_shutdown_inlock() {
+void ThreadPool::Impl::_shutdown_inlock() {
switch (_state) {
case preStart:
case running:
@@ -143,38 +272,30 @@ void ThreadPool::_shutdown_inlock() {
MONGO_UNREACHABLE;
}
-void ThreadPool::join() {
+void ThreadPool::Impl::join() {
stdx::unique_lock<Latch> lk(_mutex);
_join_inlock(&lk);
}
-void ThreadPool::_joinRetired_inlock() {
+void ThreadPool::Impl::_joinRetired_inlock() {
while (!_retiredThreads.empty()) {
auto& t = _retiredThreads.front();
t.join();
- _options.onJoinRetiredThread(t);
+ if (_options.onJoinRetiredThread)
+ _options.onJoinRetiredThread(t);
_retiredThreads.pop_front();
}
}
-void ThreadPool::_join_inlock(stdx::unique_lock<Latch>* lk) {
- _stateChange.wait(*lk, [this] {
- switch (_state) {
- case preStart:
- return false;
- case running:
- return false;
- case joinRequired:
- return true;
- case joining:
- case shutdownComplete:
- LOGV2_FATAL(28700,
- "Attempted to join pool {poolName} more than once",
- "Attempted to join pool more than once",
- "poolName"_attr = _options.poolName);
- }
- MONGO_UNREACHABLE;
- });
+void ThreadPool::Impl::_join_inlock(stdx::unique_lock<Latch>* lk) {
+ _stateChange.wait(*lk, [this] { return _state != preStart && _state != running; });
+ if (_state != joinRequired) {
+ LOGV2_FATAL(28700,
+ "Attempted to join pool {poolName} more than once",
+ "Attempted to join pool more than once",
+ "poolName"_attr = _options.poolName);
+ }
+
_setState_inlock(joining);
++_numIdleThreads;
if (!_pendingTasks.empty()) {
@@ -184,8 +305,7 @@ void ThreadPool::_join_inlock(stdx::unique_lock<Latch>* lk) {
}
--_numIdleThreads;
_joinRetired_inlock();
- ThreadList threadsToJoin;
- swap(threadsToJoin, _threads);
+ auto threadsToJoin = std::exchange(_threads, {});
lk->unlock();
for (auto& t : threadsToJoin) {
t.join();
@@ -195,14 +315,14 @@ void ThreadPool::_join_inlock(stdx::unique_lock<Latch>* lk) {
_setState_inlock(shutdownComplete);
}
-void ThreadPool::_drainPendingTasks() {
+void ThreadPool::Impl::_drainPendingTasks() {
// Tasks cannot be run inline because they can create OperationContexts and the join() caller
// may already have one associated with the thread.
stdx::thread cleanThread = stdx::thread([&] {
- const std::string threadName = str::stream()
- << _options.threadNamePrefix << _nextThreadId++;
+ const std::string threadName = "{}{}"_format(_options.threadNamePrefix, _nextThreadId++);
setThreadName(threadName);
- _options.onCreateThread(threadName);
+ if (_options.onCreateThread)
+ _options.onCreateThread(threadName);
stdx::unique_lock<Latch> lock(_mutex);
while (!_pendingTasks.empty()) {
_doOneTask(&lock);
@@ -211,16 +331,16 @@ void ThreadPool::_drainPendingTasks() {
cleanThread.join();
}
-void ThreadPool::schedule(Task task) {
+void ThreadPool::Impl::schedule(Task task) {
stdx::unique_lock<Latch> lk(_mutex);
switch (_state) {
case joinRequired:
case joining:
case shutdownComplete: {
- auto status = Status(ErrorCodes::ShutdownInProgress,
- str::stream() << "Shutdown of thread pool " << _options.poolName
- << " in progress");
+ auto status =
+ Status(ErrorCodes::ShutdownInProgress,
+ "Shutdown of thread pool {} in progress"_format(_options.poolName));
lk.unlock();
task(status);
@@ -246,15 +366,14 @@ void ThreadPool::schedule(Task task) {
_workAvailable.notify_one();
}
-void ThreadPool::waitForIdle() {
+void ThreadPool::Impl::waitForIdle() {
stdx::unique_lock<Latch> lk(_mutex);
- // If there are any pending tasks, or non-idle threads, the pool is not idle.
- while (!_pendingTasks.empty() || _numIdleThreads < _threads.size()) {
- _poolIsIdle.wait(lk);
- }
+ // True when there are no `_pendingTasks` and all `_threads` are idle.
+ auto isIdle = [this] { return _pendingTasks.empty() && _numIdleThreads >= _threads.size(); };
+ _poolIsIdle.wait(lk, isIdle);
}
-ThreadPool::Stats ThreadPool::getStats() const {
+ThreadPool::Stats ThreadPool::Impl::getStats() const {
stdx::lock_guard<Latch> lk(_mutex);
Stats result;
result.options = _options;
@@ -265,95 +384,91 @@ ThreadPool::Stats ThreadPool::getStats() const {
return result;
}
-void ThreadPool::_workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept {
+void ThreadPool::Impl::_workerThreadBody(const std::string& threadName) noexcept {
setThreadName(threadName);
- pool->_options.onCreateThread(threadName);
- const auto poolName = pool->_options.poolName;
+ if (_options.onCreateThread)
+ _options.onCreateThread(threadName);
LOGV2_DEBUG(23104,
1,
"Starting thread {threadName} in pool {poolName}",
"Starting thread",
"threadName"_attr = threadName,
- "poolName"_attr = poolName);
- pool->_consumeTasks();
-
- // At this point, another thread may have destroyed "pool", if this thread chose to detach
- // itself and remove itself from pool->_threads before releasing pool->_mutex. Do not access
- // member variables of "pool" from here, on.
- //
- // This can happen if this thread decided to retire, got descheduled after removing itself
- // from _threads and calling detach(), and then the pool was deleted. When this thread resumes,
- // it is no longer safe to access "pool".
+ "poolName"_attr = _options.poolName);
+ _consumeTasks();
LOGV2_DEBUG(23105,
1,
"Shutting down thread {threadName} in pool {poolName}",
"Shutting down thread",
"threadName"_attr = threadName,
- "poolName"_attr = poolName);
+ "poolName"_attr = _options.poolName);
}
-void ThreadPool::_consumeTasks() {
+void ThreadPool::Impl::_consumeTasks() {
stdx::unique_lock<Latch> lk(_mutex);
while (_state == running) {
- if (_pendingTasks.empty()) {
- /**
- * Help with garbage collecting retired threads to:
- * * Reduce the memory overhead of _retiredThreads
- * * Expedite the shutdown process
- */
- _joinRetired_inlock();
-
- if (_threads.size() > _options.minThreads) {
- // Since there are more than minThreads threads, this thread may be eligible for
- // retirement. If it isn't now, it may be later, so it must put a time limit on how
- // long it waits on _workAvailable.
- const auto now = Date_t::now();
- const auto nextThreadRetirementDate =
- _lastFullUtilizationDate + _options.maxIdleThreadAge;
- if (now >= nextThreadRetirementDate) {
- _lastFullUtilizationDate = now;
- LOGV2_DEBUG(23106,
- 1,
- "Reaping this thread; next thread reaped no earlier than "
- "{nextThreadRetirementDate}",
- "Reaping this thread",
- "nextThreadRetirementDate"_attr =
- _lastFullUtilizationDate + _options.maxIdleThreadAge);
- break;
- }
-
- LOGV2_DEBUG(23107,
- 3,
- "Not reaping this thread because the earliest retirement date is "
+ if (!_pendingTasks.empty()) {
+ _doOneTask(&lk);
+ continue;
+ }
+
+ // Help with garbage collecting retired threads to reduce the
+ // memory overhead of _retiredThreads and expedite the shutdown
+ // process.
+ _joinRetired_inlock();
+
+ boost::optional<Date_t> waitDeadline;
+
+ if (_threads.size() > _options.minThreads) {
+ // Since there are more than minThreads threads, this thread may be eligible for
+ // retirement. If it isn't now, it may be later, so it must put a time limit on how
+ // long it waits on _workAvailable.
+ const auto now = Date_t::now();
+ const auto nextRetirement = _lastFullUtilizationDate + _options.maxIdleThreadAge;
+ if (now >= nextRetirement) {
+ _lastFullUtilizationDate = now;
+ LOGV2_DEBUG(23106,
+ 1,
+ "Reaping this thread; next thread reaped no earlier than "
"{nextThreadRetirementDate}",
- "Not reaping this thread",
- "nextThreadRetirementDate"_attr = nextThreadRetirementDate);
- MONGO_IDLE_THREAD_BLOCK;
- _workAvailable.wait_until(lk, nextThreadRetirementDate.toSystemTimePoint());
- } else {
- // Since the number of threads is not more than minThreads, this thread is not
- // eligible for retirement. It is OK to sleep until _workAvailable is signaled,
- // because any new threads that put the number of total threads above minThreads
- // would be eligible for retirement once they had no work left to do.
- LOGV2_DEBUG(23108,
- 3,
- "Waiting for work; the thread pool size is {numThreads}; the minimum "
- "number of threads is {minThreads}",
- "Waiting for work",
- "numThreads"_attr = _threads.size(),
- "minThreads"_attr = _options.minThreads);
- MONGO_IDLE_THREAD_BLOCK;
- _workAvailable.wait(lk);
+ "Reaping this thread",
+ "nextThreadRetirementDate"_attr =
+ _lastFullUtilizationDate + _options.maxIdleThreadAge);
+ break;
}
- continue;
+
+ LOGV2_DEBUG(23107,
+ 3,
+ "Not reaping this thread because the earliest retirement date is "
+ "{nextThreadRetirementDate}",
+ "Not reaping this thread",
+ "nextThreadRetirementDate"_attr = nextRetirement);
+ waitDeadline = nextRetirement;
+ } else {
+ // Since the number of threads is not more than minThreads, this thread is not
+ // eligible for retirement. It is OK to sleep until _workAvailable is signaled,
+ // because any new threads that put the number of total threads above minThreads
+ // would be eligible for retirement once they had no work left to do.
+ LOGV2_DEBUG(23108,
+ 3,
+ "Waiting for work; the thread pool size is {numThreads}; the minimum "
+ "number of threads is {minThreads}",
+ "Waiting for work",
+ "numThreads"_attr = _threads.size(),
+ "minThreads"_attr = _options.minThreads);
}
- _doOneTask(&lk);
+ auto wake = [&] { return _state != running || !_pendingTasks.empty(); };
+ MONGO_IDLE_THREAD_BLOCK;
+ if (waitDeadline) {
+ _workAvailable.wait_until(lk, waitDeadline->toSystemTimePoint(), wake);
+ } else {
+ _workAvailable.wait(lk, wake);
+ }
}
// We still hold the lock, but this thread is retiring. If the whole pool is shutting down, this
// thread lends a hand in draining the work pool and returns so it can be joined. Otherwise, it
- // falls through to the detach code, below.
+ // falls through to the thread retirement code, below.
if (_state == joinRequired || _state == joining) {
// Drain the leftover pending tasks.
@@ -375,29 +490,22 @@ void ThreadPool::_consumeTasks() {
"expectedState"_attr = static_cast<int32_t>(running));
}
- // This thread is ending because it was idle for too long. Find self in _threads, remove self
- // from _threads, and add self to the list of retired threads.
- for (size_t i = 0; i < _threads.size(); ++i) {
- auto& t = _threads[i];
- if (t.get_id() != stdx::this_thread::get_id()) {
- continue;
- }
- std::swap(t, _threads.back());
- _retiredThreads.push_back(std::move(_threads.back()));
- _threads.pop_back();
- return;
+ // This thread is ending because it was idle for too long.
+ // Move self from _threads to _retiredThreads.
+ auto selfId = stdx::this_thread::get_id();
+ auto pos = std::find_if(
+ _threads.begin(), _threads.end(), [&](auto&& t) { return t.get_id() == selfId; });
+ if (pos == _threads.end()) {
+ LOGV2_FATAL_NOTRACE(28703,
+ "Could not find thread with id {threadId} in pool {poolName}",
+ "Could not find thread",
+ "threadId"_attr = threadIdToString(selfId),
+ "poolName"_attr = _options.poolName);
}
-
- std::ostringstream threadId;
- threadId << stdx::this_thread::get_id();
- LOGV2_FATAL_NOTRACE(28703,
- "Could not find thread with id {threadId} in pool {poolName}",
- "Could not find thread",
- "threadId"_attr = threadId.str(),
- "poolName"_attr = _options.poolName);
+ _retiredThreads.splice(_retiredThreads.end(), _threads, pos);
}
-void ThreadPool::_doOneTask(stdx::unique_lock<Latch>* lk) noexcept {
+void ThreadPool::Impl::_doOneTask(stdx::unique_lock<Latch>* lk) noexcept {
invariant(!_pendingTasks.empty());
LOGV2_DEBUG(23109,
3,
@@ -416,7 +524,7 @@ void ThreadPool::_doOneTask(stdx::unique_lock<Latch>* lk) noexcept {
}
}
-void ThreadPool::_startWorkerThread_inlock() {
+void ThreadPool::Impl::_startWorkerThread_inlock() {
switch (_state) {
case preStart:
LOGV2_DEBUG(
@@ -452,9 +560,9 @@ void ThreadPool::_startWorkerThread_inlock() {
return;
}
invariant(_threads.size() < _options.maxThreads);
- const std::string threadName = str::stream() << _options.threadNamePrefix << _nextThreadId++;
+ std::string threadName = "{}{}"_format(_options.threadNamePrefix, _nextThreadId++);
try {
- _threads.emplace_back([this, threadName] { _workerThreadBody(this, threadName); });
+ _threads.emplace_back([this, threadName] { _workerThreadBody(threadName); });
++_numIdleThreads;
} catch (const std::exception& ex) {
LOGV2_ERROR(23113,
@@ -468,7 +576,7 @@ void ThreadPool::_startWorkerThread_inlock() {
}
}
-void ThreadPool::_setState_inlock(const LifecycleState newState) {
+void ThreadPool::Impl::_setState_inlock(const LifecycleState newState) {
if (newState == _state) {
return;
}
@@ -476,4 +584,35 @@ void ThreadPool::_setState_inlock(const LifecycleState newState) {
_stateChange.notify_all();
}
+// ========================================
+// ThreadPool public functions that simply forward to the `_impl`.
+
+ThreadPool::ThreadPool(Options options) : _impl{std::make_unique<Impl>(std::move(options))} {}
+
+ThreadPool::~ThreadPool() = default;
+
+void ThreadPool::startup() {
+ _impl->startup();
+}
+
+void ThreadPool::shutdown() {
+ _impl->shutdown();
+}
+
+void ThreadPool::join() {
+ _impl->join();
+}
+
+void ThreadPool::schedule(Task task) {
+ _impl->schedule(std::move(task));
+}
+
+void ThreadPool::waitForIdle() {
+ _impl->waitForIdle();
+}
+
+ThreadPool::Stats ThreadPool::getStats() const {
+ return _impl->getStats();
+}
+
} // namespace mongo
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index a6e56f8c9bf..29acd9e09c0 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -29,47 +29,52 @@
#pragma once
-#include <deque>
#include <functional>
+#include <memory>
#include <string>
-#include <vector>
-#include "mongo/platform/mutex.h"
-#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
-#include "mongo/util/hierarchical_acquisition.h"
+#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
namespace mongo {
-class Status;
-
/**
* A configurable thread pool, for general use.
*
* See the Options struct for information about how to configure an instance.
*/
class ThreadPool final : public ThreadPoolInterface {
- ThreadPool(const ThreadPool&) = delete;
- ThreadPool& operator=(const ThreadPool&) = delete;
-
public:
- struct Limits;
+ /**
+ * Contains a subset of the fields from Options related to limiting the number of concurrent
+ * threads in the pool. Used in places where we want a way to specify limits to the size of a
+ * ThreadPool without overriding the other behaviors of the pool such thread names or onCreate
+ * behaviors. Each field of Limits maps directly to the same-named field in Options.
+ */
+ struct Limits {
+ size_t minThreads = 1;
+ size_t maxThreads = 8;
+ Milliseconds maxIdleThreadAge = Seconds{30};
+ };
/**
* Structure used to configure an instance of ThreadPool.
*/
struct Options {
-
- Options() = default;
- explicit Options(const Limits& limits);
-
// Set maxThreads to this if you don't want to limit the number of threads in the pool.
// Note: the value used here is high enough that it will never be reached, but low enough
// that it won't cause overflows if mixed with signed ints or math.
static constexpr size_t kUnlimited = 1'000'000'000;
+ Options() = default;
+
+ explicit Options(const Limits& limits)
+ : minThreads(limits.minThreads),
+ maxThreads(limits.maxThreads),
+ maxIdleThreadAge(limits.maxIdleThreadAge) {}
+
// Name of the thread pool. If this string is empty, the pool will be assigned a
// name unique to the current process.
std::string poolName;
@@ -95,29 +100,15 @@ public:
// a thread.
Milliseconds maxIdleThreadAge = Seconds{30};
- // This function is run before each worker thread begins consuming tasks.
- using OnCreateThreadFn = std::function<void(const std::string& threadName)>;
- OnCreateThreadFn onCreateThread = [](const std::string&) {};
+ /** If callable, called before each worker thread begins consuming tasks. */
+ std::function<void(const std::string&)> onCreateThread;
/**
- * This function is called after joining each retired thread.
+ * If callable, called after joining each retired thread.
* Since there could be multiple calls to this function in a single critical section,
* avoid complex logic in the callback.
*/
- using OnJoinRetiredThreadFn = std::function<void(const stdx::thread&)>;
- OnJoinRetiredThreadFn onJoinRetiredThread = [](const stdx::thread&) {};
- };
-
- /**
- * Contains a subset of the fields from Options related to limiting the number of concurrent
- * threads in the pool. Used in places where we want a way to specify limits to the size of a
- * ThreadPool without overriding the other behaviors of the pool such thread names or onCreate
- * behaviors. Each field of Limits maps directly to the same-named field in Options.
- */
- struct Limits {
- size_t minThreads = 1;
- size_t maxThreads = 8;
- Milliseconds maxIdleThreadAge = Seconds{30};
+ std::function<void(const stdx::thread&)> onJoinRetiredThread;
};
/**
@@ -145,12 +136,18 @@ public:
*/
explicit ThreadPool(Options options);
+ ThreadPool(const ThreadPool&) = delete;
+ ThreadPool& operator=(const ThreadPool&) = delete;
+
~ThreadPool() override;
+ // from OutOfLineExecutor (base of ThreadPoolInterface)
+ void schedule(Task task) override;
+
+ // from ThreadPoolInterface
void startup() override;
void shutdown() override;
void join() override;
- void schedule(Task task) override;
/**
* Blocks the caller until there are no pending tasks on this pool.
@@ -170,120 +167,8 @@ public:
Stats getStats() const;
private:
- using TaskList = std::deque<Task>;
- using ThreadList = std::vector<stdx::thread>;
- using RetiredThreadList = std::list<stdx::thread>;
-
- /**
- * Representation of the stage of life of a thread pool.
- *
- * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work
- * may only be scheduled in the preStart and running states. Threads may only be started in the
- * running state. In shutdownComplete, there are no remaining threads or pending tasks to
- * execute.
- *
- * Diagram of legal transitions:
- *
- * preStart -> running -> joinRequired -> joining -> shutdownComplete
- * \ ^
- * \_____________/
- */
- enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete };
-
- /**
- * This is the thread body for worker threads. It is a static member function,
- * because late in its execution it is possible for the pool to have been destroyed.
- * As such, it is advisable to pass the pool pointer as an explicit argument, rather
- * than as the implicit "this" argument.
- */
- static void _workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept;
-
- /**
- * Starts a worker thread, unless _options.maxThreads threads are already running or
- * _state is not running.
- */
- void _startWorkerThread_inlock();
-
- /**
- * This is the run loop of a worker thread, invoked by _workerThreadBody.
- */
- void _consumeTasks();
-
- /**
- * Implementation of shutdown once _mutex is locked.
- */
- void _shutdown_inlock();
-
- /**
- * Implementation of join once _mutex is owned by "lk".
- */
- void _join_inlock(stdx::unique_lock<Latch>* lk);
-
- /**
- * Runs the remaining tasks on a new thread as part of the join process, blocking until
- * complete. Caller must not hold the mutex!
- */
- void _drainPendingTasks();
-
- /**
- * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at
- * least one entry.
- */
- void _doOneTask(stdx::unique_lock<Latch>* lk) noexcept;
-
- /**
- * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state
- * change. Has no effect if _state == newState.
- */
- void _setState_inlock(LifecycleState newState);
-
- /**
- * Waits for all remaining retired threads to join.
- * If a thread's _workerThreadBody() were ever to attempt to reacquire
- * ThreadPool::_mutex after that thread had been added to _retiredThreads,
- * it could cause a deadlock.
- */
- void _joinRetired_inlock();
-
- // These are the options with which the pool was configured at construction time.
- const Options _options;
-
- // Mutex guarding all non-const member variables.
- mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ThreadPool::_mutex");
-
- // This variable represents the lifecycle state of the pool.
- //
- // Work may only be scheduled in states preStart and running, and only executes in states
- // running and shuttingDown.
- LifecycleState _state = preStart;
-
- // Condition signaled to indicate that there is work in the _pendingTasks queue, or
- // that the system is shutting down.
- stdx::condition_variable _workAvailable;
-
- // Condition signaled to indicate that there is no work in the _pendingTasks queue.
- stdx::condition_variable _poolIsIdle;
-
- // Condition variable signaled whenever _state changes.
- stdx::condition_variable _stateChange;
-
- // Queue of yet-to-be-executed tasks.
- TaskList _pendingTasks;
-
- // List of threads serving as the worker pool.
- ThreadList _threads;
-
- // List of threads that are retired and pending join
- RetiredThreadList _retiredThreads;
-
- // Count of idle threads.
- size_t _numIdleThreads = 0;
-
- // Id counter for assigning thread names
- size_t _nextThreadId = 0;
-
- // The last time that _pendingTasks.size() grew to be at least _threads.size().
- Date_t _lastFullUtilizationDate;
+ class Impl;
+ std::unique_ptr<Impl> _impl;
};
} // namespace mongo