author     Andy Schwerin <schwerin@mongodb.com>    2015-06-12 19:09:04 -0400
committer  Andy Schwerin <schwerin@mongodb.com>    2015-06-23 15:20:04 -0400
commit     891e0b850200725dc73e1ac1cd803436cd6099bc (patch)
tree       9661c3079d5c6cecb8555d3e0abc4224ce867df0 /src/mongo/util/concurrency
parent     b6057b08dc313ef372db05f5945dd23b9b1ed7cf (diff)
download   mongo-891e0b850200725dc73e1ac1cd803436cd6099bc.tar.gz
SERVER-19000 Implement a ThreadPool that dynamically adjusts the number of threads based on demand.
This pool is derived from the one built into executor::NetworkInterfaceImpl.
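For reference, a minimal usage sketch of the interface this commit introduces (declared in thread_pool.h below). The function name, pool name, option values, and work items are illustrative assumptions, not part of the commit:

    #include "mongo/base/status.h"
    #include "mongo/util/concurrency/thread_pool.h"

    namespace mongo {

    // Hypothetical caller showing the intended lifecycle: configure, start, schedule, drain, join.
    void runExampleWork() {
        ThreadPool::Options options;
        options.poolName = "ExamplePool";        // assumed name; defaults to "ThreadPool<N>" if empty
        options.minThreads = 0;                  // allow the pool to shrink to zero idle threads
        options.maxThreads = 4;                  // grow on demand, but never beyond four threads
        options.maxIdleThreadAge = Seconds{30};  // reap threads idle for roughly this long

        ThreadPool pool(options);
        pool.startup();

        // Tasks may also be scheduled before startup(); they run once the pool starts.
        for (int i = 0; i < 10; ++i) {
            Status status = pool.schedule([] { /* perform one unit of work */ });
            if (!status.isOK()) {
                // schedule() returns ShutdownInProgress once shutdown() has been called.
                return;
            }
        }

        pool.waitForIdle();  // block until no tasks are pending and all threads are idle
        pool.shutdown();     // signal shutdown; returns promptly
        pool.join();         // block until remaining tasks finish and worker threads exit
    }

    }  // namespace mongo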
Diffstat (limited to 'src/mongo/util/concurrency')
-rw-r--r--  src/mongo/util/concurrency/SConscript              8
-rw-r--r--  src/mongo/util/concurrency/thread_pool.cpp       378
-rw-r--r--  src/mongo/util/concurrency/thread_pool.h         267
-rw-r--r--  src/mongo/util/concurrency/thread_pool_test.cpp  284
4 files changed, 936 insertions, 1 deletion
diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript
index 505ff9fc98d..0dbc9c232d9 100644
--- a/src/mongo/util/concurrency/SConscript
+++ b/src/mongo/util/concurrency/SConscript
@@ -11,12 +11,18 @@ env.Library(
target='thread_pool',
source=[
'old_thread_pool.cpp',
+ 'thread_pool.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/third_party/shim_boost',
+ '$BUILD_DIR/mongo/util/foundation',
],
)
+env.CppUnitTest(
+ target='thread_pool_test',
+ source=['thread_pool_test.cpp'],
+ LIBDEPS=['thread_pool'])
+
env.Library('ticketholder',
['ticketholder.cpp'],
LIBDEPS=['$BUILD_DIR/mongo/base/base',
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp
new file mode 100644
index 00000000000..2be5983c535
--- /dev/null
+++ b/src/mongo/util/concurrency/thread_pool.cpp
@@ -0,0 +1,378 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/util/concurrency/thread_pool.h"
+
+#include "mongo/base/status.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/concurrency/thread_name.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+namespace {
+
+// Counter used to assign unique names to otherwise-unnamed thread pools.
+AtomicInt32 nextUnnamedThreadPoolId{1};
+
+/**
+ * Sets defaults and checks bounds limits on "options", and returns it.
+ *
+ * This method is just a helper for the ThreadPool constructor.
+ */
+ThreadPool::Options cleanUpOptions(ThreadPool::Options&& options) {
+ if (options.poolName.empty()) {
+ options.poolName = str::stream() << "ThreadPool" << nextUnnamedThreadPoolId.fetchAndAdd(1);
+ }
+ if (options.threadNamePrefix.empty()) {
+ options.threadNamePrefix = str::stream() << options.poolName << '-';
+ }
+ if (options.maxThreads < 1) {
+ severe() << "Tried to create pool " << options.poolName << " with a maximum of "
+ << options.maxThreads << " but the maximum must be at least 1";
+ fassertFailed(28702);
+ }
+ if (options.minThreads > options.maxThreads) {
+ severe() << "Tried to create pool " << options.poolName << " with a minimum of "
+ << options.minThreads << " which is more than the configured maximum of "
+ << options.maxThreads;
+ fassertFailed(28686);
+ }
+ return options;
+}
+
+} // namespace
+
+ThreadPool::ThreadPool(Options options) : _options(cleanUpOptions(std::move(options))) {}
+
+ThreadPool::~ThreadPool() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _shutdown_inlock();
+ if (shutdownComplete != _state) {
+ _join_inlock(&lk);
+ }
+
+ if (shutdownComplete != _state) {
+ severe() << "Failed to shutdown pool during destruction";
+ fassertFailed(28704);
+ }
+ invariant(_threads.empty());
+ invariant(_pendingTasks.empty());
+}
+
+void ThreadPool::startup() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_state != preStart) {
+ severe() << "Attempting to start pool " << _options.poolName
+ << ", but it has already started";
+ fassertFailed(28698);
+ }
+ _setState_inlock(running);
+ invariant(_threads.empty());
+ const size_t numToStart =
+ std::min(_options.maxThreads, std::max(_options.minThreads, _pendingTasks.size()));
+ for (size_t i = 0; i < numToStart; ++i) {
+ _startWorkerThread_inlock();
+ }
+}
+
+void ThreadPool::shutdown() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _shutdown_inlock();
+}
+
+void ThreadPool::_shutdown_inlock() {
+ switch (_state) {
+ case preStart:
+ case running:
+ _setState_inlock(joinRequired);
+ _workAvailable.notify_all();
+ return;
+ case joinRequired:
+ case joining:
+ case shutdownComplete:
+ return;
+ }
+ MONGO_UNREACHABLE;
+}
+
+void ThreadPool::join() {
+ try {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _join_inlock(&lk);
+ } catch (...) {
+ std::terminate();
+ }
+}
+
+void ThreadPool::_join_inlock(stdx::unique_lock<stdx::mutex>* lk) {
+ _stateChange.wait(*lk,
+ [this] {
+ switch (_state) {
+ case preStart:
+ return false;
+ case running:
+ return false;
+ case joinRequired:
+ return true;
+ case joining:
+ case shutdownComplete:
+ severe() << "Attempted to join pool " << _options.poolName
+ << " more than once";
+ fassertFailed(28700);
+ }
+ MONGO_UNREACHABLE;
+ });
+ _setState_inlock(joining);
+ ++_numIdleThreads;
+ while (!_pendingTasks.empty()) {
+ _doOneTask(lk);
+ }
+ --_numIdleThreads;
+ ThreadList threadsToJoin;
+ swap(threadsToJoin, _threads);
+ lk->unlock();
+ for (auto& t : threadsToJoin) {
+ t.join();
+ }
+ lk->lock();
+ invariant(_state == joining);
+ _setState_inlock(shutdownComplete);
+}
+
+Status ThreadPool::schedule(Task task) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ switch (_state) {
+ case joinRequired:
+ case joining:
+ case shutdownComplete:
+ return Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Shutdown of thread pool " << _options.poolName
+ << " in progress");
+ case preStart:
+ case running:
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ _pendingTasks.emplace_back(std::move(task));
+ if (_state == preStart) {
+ return Status::OK();
+ }
+ if (_numIdleThreads < _pendingTasks.size()) {
+ _startWorkerThread_inlock();
+ }
+ if (_numIdleThreads <= _pendingTasks.size()) {
+ _lastFullUtilizationDate = Date_t::now();
+ }
+ _workAvailable.notify_one();
+ return Status::OK();
+}
+
+void ThreadPool::waitForIdle() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ // If there are any pending tasks, or non-idle threads, the pool is not idle.
+ while (!_pendingTasks.empty() || _numIdleThreads < _threads.size()) {
+ _poolIsIdle.wait(lk);
+ }
+}
+
+ThreadPool::Stats ThreadPool::getStats() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ Stats result;
+ result.options = _options;
+ result.numThreads = _threads.size();
+ result.numIdleThreads = _numIdleThreads;
+ result.numPendingTasks = _pendingTasks.size();
+ result.lastFullUtilizationDate = _lastFullUtilizationDate;
+ return result;
+}
+
+void ThreadPool::_workerThreadBody(ThreadPool* pool, const std::string& threadName) {
+ std::string poolName = pool->_options.poolName;
+ setThreadName(threadName);
+ LOG(1) << "starting thread in pool " << poolName;
+ try {
+ pool->_consumeTasks();
+ } catch (...) {
+ severe() << "Exception reached top of stack in thread pool " << poolName;
+ std::terminate();
+ }
+
+ // At this point, another thread may have destroyed "pool", if this thread chose to detach
+ // itself and remove itself from pool->_threads before releasing pool->_mutex. Do not access
+ // member variables of "pool" from here, on.
+ //
+ // This can happen if this thread decided to retire, got descheduled after removing itself
+ // from _threads and calling detach(), and then the pool was deleted. When this thread resumes,
+ // it is no longer safe to access "pool".
+ LOG(1) << "shutting down thread in pool " << poolName;
+}
+
+void ThreadPool::_consumeTasks() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (_state == running) {
+ if (_pendingTasks.empty()) {
+ if (_threads.size() > _options.minThreads) {
+ // Since there are more than minThreads threads, this thread may be eligible for
+ // retirement. If it isn't now, it may be later, so it must put a time limit on how
+ // long it waits on _workAvailable.
+ const auto now = Date_t::now();
+ const auto nextThreadRetirementDate =
+ _lastFullUtilizationDate + _options.maxIdleThreadAge;
+ if (now >= nextThreadRetirementDate) {
+ _lastFullUtilizationDate = now;
+ LOG(1) << "Reaping this thread; next thread reaped no earlier than "
+ << _lastFullUtilizationDate + _options.maxIdleThreadAge;
+ break;
+ }
+
+ LOG(3) << "Not reaping because the earliest retirement date is "
+ << nextThreadRetirementDate;
+ _workAvailable.wait_until(lk, nextThreadRetirementDate.toSystemTimePoint());
+ } else {
+ // Since the number of threads is not more than minThreads, this thread is not
+ // eligible for retirement. It is OK to sleep until _workAvailable is signaled,
+ // because any new threads that put the number of total threads above minThreads
+ // would be eligible for retirement once they had no work left to do.
+ LOG(3) << "waiting for work; I am one of " << _threads.size() << " thread(s);"
+ << " the minimum number of threads is " << _options.minThreads;
+ _workAvailable.wait(lk);
+ }
+ continue;
+ }
+
+ _doOneTask(&lk);
+ }
+
+ // We still hold the lock, but this thread is retiring. If the whole pool is shutting down, this
+ // thread lends a hand in draining the work pool and returns so it can be joined. Otherwise, it
+ // falls through to the detach code, below.
+
+ if (_state == joinRequired || _state == joining) {
+ // Drain the leftover pending tasks.
+ while (!_pendingTasks.empty()) {
+ _doOneTask(&lk);
+ }
+ --_numIdleThreads;
+ return;
+ }
+ --_numIdleThreads;
+
+ if (_state != running) {
+ severe() << "State of pool " << _options.poolName << " is " << static_cast<int32_t>(_state)
+ << ", but expected " << static_cast<int32_t>(running);
+ fassertFailedNoTrace(28701);
+ }
+
+ // This thread is ending because it was idle for too long. Find self in _threads, remove self
+ // from _threads, detach self.
+ for (size_t i = 0; i < _threads.size(); ++i) {
+ auto& t = _threads[i];
+ if (t.get_id() != stdx::this_thread::get_id()) {
+ continue;
+ }
+ t.detach();
+ t.swap(_threads.back());
+ _threads.pop_back();
+ return;
+ }
+ severe().stream() << "Could not find this thread, with id " << stdx::this_thread::get_id()
+ << " in pool " << _options.poolName;
+ fassertFailedNoTrace(28703);
+}
+
+void ThreadPool::_doOneTask(stdx::unique_lock<stdx::mutex>* lk) {
+ invariant(!_pendingTasks.empty());
+ try {
+ LOG(3) << "Executing a task on behalf of pool " << _options.poolName;
+ Task task = std::move(_pendingTasks.front());
+ _pendingTasks.pop_front();
+ --_numIdleThreads;
+ lk->unlock();
+ task();
+ lk->lock();
+ ++_numIdleThreads;
+ if (_pendingTasks.empty() && _threads.size() == _numIdleThreads) {
+ _poolIsIdle.notify_all();
+ }
+ } catch (...) {
+ severe() << "Exception escaped task in thread pool " << _options.poolName;
+ std::terminate();
+ }
+}
+
+void ThreadPool::_startWorkerThread_inlock() {
+ switch (_state) {
+ case preStart:
+ LOG(1) << "Not starting new thread in pool " << _options.poolName
+ << ", yet; waiting for startup() call";
+ return;
+ case joinRequired:
+ case joining:
+ case shutdownComplete:
+ LOG(1) << "Not starting new thread in pool " << _options.poolName
+ << " while shutting down";
+ return;
+ case running:
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ if (_threads.size() == _options.maxThreads) {
+ LOG(2) << "Not starting new thread in pool " << _options.poolName
+ << " because it already has " << _options.maxThreads << ", its maximum";
+ return;
+ }
+ invariant(_threads.size() < _options.maxThreads);
+ const std::string threadName = str::stream() << _options.threadNamePrefix << _nextThreadId++;
+ try {
+ _threads.emplace_back(stdx::bind(&ThreadPool::_workerThreadBody, this, threadName));
+ ++_numIdleThreads;
+ } catch (const std::exception& ex) {
+ error() << "Failed to start " << threadName << "; " << _threads.size()
+ << " other thread(s) still running in pool " << _options.poolName
+ << "; caught exception: " << ex.what();
+ }
+}
+
+void ThreadPool::_setState_inlock(const LifecycleState newState) {
+ if (newState == _state) {
+ return;
+ }
+ _state = newState;
+ _stateChange.notify_all();
+}
+
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
new file mode 100644
index 00000000000..c186e87d731
--- /dev/null
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -0,0 +1,267 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/stdx/list.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class Status;
+
+/**
+ * A configurable thread pool, for general use.
+ *
+ * See the Options struct for information about how to configure an instance.
+ */
+class ThreadPool {
+ MONGO_DISALLOW_COPYING(ThreadPool);
+
+public:
+ using Task = stdx::function<void()>;
+
+ /**
+ * Structure used to configure an instance of ThreadPool.
+ */
+ struct Options {
+ // Name of the thread pool. If this string is empty, the pool will be assigned a
+ // name unique to the current process.
+ std::string poolName;
+
+ // Prefix used to name threads for logging purposes.
+ //
+ // An integer will be appended to this string to create the thread name for each thread in
+ // the pool. Warning, if you create two pools and give them the same threadNamePrefix, you
+ // could have multiple threads that report the same name. If you leave this empty, the
+ // prefix will be the pool name followed by a hyphen.
+ std::string threadNamePrefix;
+
+ // Minimum number of threads that must be in the pool.
+ //
+ // At least this many threads will be created at startup, and the pool will not reduce the
+ // total number of threads below this threshold before shutdown.
+ size_t minThreads = 1;
+
+ // The pool will never grow to contain more than this many threads.
+ size_t maxThreads = 8;
+
+ // If the pool has had at least one idle thread for this much time, it may consider reaping
+ // a thread.
+ Milliseconds maxIdleThreadAge = Seconds{30};
+ };
+
+ /**
+ * Structure used to return information about the thread pool via getStats().
+ */
+ struct Stats {
+ // The options for the instance of the pool returning these stats.
+ Options options;
+
+ // The number of threads currently in the pool, idle or active.
+ size_t numThreads;
+
+ // The number of idle threads currently in the pool.
+ size_t numIdleThreads;
+
+ // The number of tasks waiting to be executed by the pool.
+ size_t numPendingTasks;
+
+ // The last time that no threads in the pool were idle.
+ Date_t lastFullUtilizationDate;
+ };
+
+ /**
+ * Constructs a thread pool, configured with the given "options".
+ */
+ explicit ThreadPool(Options options);
+
+ /**
+ * Destroys a thread pool.
+ *
+ * The destructor may block if join() has not previously been called and returned.
+ * It is fatal to destroy the pool while another thread is blocked on join().
+ */
+ ~ThreadPool();
+
+ /**
+ * Starts the thread pool. May be called at most once.
+ */
+ void startup();
+
+ /**
+ * Signals the thread pool to shut down. Returns promptly.
+ *
+ * After this call, the pool will return an error for subsequent calls to schedule().
+ *
+ * May be called by a task executing in the thread pool. Call join() after calling shutdown()
+ * to block until all tasks scheduled on the pool complete.
+ */
+ void shutdown();
+
+ /**
+ * Blocks until the thread pool has fully shut down. Call at most once, and never from a task
+ * inside the pool.
+ */
+ void join();
+
+ /**
+ * Schedules "task" to run in the thread pool.
+ *
+ * Returns OK on success, ShutdownInProgress if shutdown() has already executed.
+ *
+ * It is safe to call this before startup(), but the scheduled task will not execute
+ * until after startup() is called.
+ */
+ Status schedule(Task task);
+
+ /**
+ * Blocks the caller until there are no pending tasks on this pool.
+ *
+ * It is legal to call this whether or not shutdown has been called, but if it is called
+ * *before* shutdown() is called, there is no guarantee that there will still be no pending
+ * tasks when the function returns.
+ *
+ * May be called multiple times, by multiple threads. May not be called by a task in the thread
+ * pool.
+ */
+ void waitForIdle();
+
+ /**
+ * Returns statistics about the thread pool's utilization.
+ */
+ Stats getStats();
+
+private:
+ using TaskList = stdx::list<Task>;
+ using ThreadList = std::vector<stdx::thread>;
+
+ /**
+ * Representation of the stage of life of a thread pool.
+ *
+ * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work
+ * may only be scheduled in the preStart and running states. Threads may only be started in the
+ * running state. In shutdownComplete, there are no remaining threads or pending tasks to
+ * execute.
+ *
+ * Diagram of legal transitions:
+ *
+ * preStart -> running -> joinRequired -> joining -> shutdownComplete
+ * \ ^
+ * \_____________/
+ */
+ enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete };
+
+ /**
+ * This is the thread body for worker threads. It is a static member function,
+ * because late in its execution it is possible for the pool to have been destroyed.
+ * As such, it is advisable to pass the pool pointer as an explicit argument, rather
+ * than as the implicit "this" argument.
+ */
+ static void _workerThreadBody(ThreadPool* pool, const std::string& threadName);
+
+ /**
+ * Starts a worker thread, unless _options.maxThreads threads are already running or
+ * _state is not running.
+ */
+ void _startWorkerThread_inlock();
+
+ /**
+ * This is the run loop of a worker thread, invoked by _workerThreadBody.
+ */
+ void _consumeTasks();
+
+ /**
+ * Implementation of shutdown once _mutex is locked.
+ */
+ void _shutdown_inlock();
+
+ /**
+ * Implementation of join once _mutex is owned by "lk".
+ */
+ void _join_inlock(stdx::unique_lock<stdx::mutex>* lk);
+
+ /**
+ * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at
+ * least one entry.
+ */
+ void _doOneTask(stdx::unique_lock<stdx::mutex>* lk);
+
+ /**
+ * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state
+ * change. Has no effect if _state == newState.
+ */
+ void _setState_inlock(LifecycleState newState);
+
+ // These are the options with which the pool was configured at construction time.
+ const Options _options;
+
+ // Mutex guarding all non-const member variables.
+ stdx::mutex _mutex;
+
+ // This variable represents the lifecycle state of the pool.
+ //
+ // Work may only be scheduled in states preStart and running, and only executes in states
+ // running, joinRequired, and joining.
+ LifecycleState _state = preStart;
+
+ // Condition signaled to indicate that there is work in the _pendingTasks queue, or
+ // that the system is shutting down.
+ stdx::condition_variable _workAvailable;
+
+ // Condition signaled to indicate that there is no work in the _pendingTasks queue.
+ stdx::condition_variable _poolIsIdle;
+
+ // Condition variable signaled whenever _state changes.
+ stdx::condition_variable _stateChange;
+
+ // Queue of yet-to-be-executed tasks.
+ TaskList _pendingTasks;
+
+ // List of threads serving as the worker pool.
+ ThreadList _threads;
+
+ // Count of idle threads.
+ size_t _numIdleThreads = 0;
+
+ // Id counter for assigning thread names
+ size_t _nextThreadId = 0;
+
+ // The last time that _pendingTasks.size() grew to be at least _threads.size().
+ Date_t _lastFullUtilizationDate;
+};
+
+} // namespace mongo
diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp
new file mode 100644
index 00000000000..74881f0508e
--- /dev/null
+++ b/src/mongo/util/concurrency/thread_pool_test.cpp
@@ -0,0 +1,284 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include <boost/optional.hpp>
+
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
+#include "mongo/util/timer.h"
+
+namespace {
+using namespace mongo;
+
+class ThreadPoolTest : public unittest::Test {
+protected:
+ ThreadPool& makePool(ThreadPool::Options options) {
+ ASSERT(!_pool);
+ _pool.emplace(std::move(options));
+ return *_pool;
+ }
+
+ ThreadPool& pool() {
+ ASSERT(_pool);
+ return *_pool;
+ }
+
+ void blockingWork() {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ ++count1;
+ cv1.notify_all();
+ while (!flag2) {
+ cv2.wait(lk);
+ }
+ }
+
+ stdx::mutex mutex;
+ stdx::condition_variable cv1;
+ stdx::condition_variable cv2;
+ size_t count1 = 0U;
+ bool flag2 = false;
+
+private:
+ void tearDown() override {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ flag2 = true;
+ cv2.notify_all();
+ lk.unlock();
+ }
+
+ boost::optional<ThreadPool> _pool;
+};
+
+TEST(ThreadPoolTest, UnusedPool) {
+ ThreadPool pool((ThreadPool::Options()));
+}
+
+TEST(ThreadPoolTest, CannotScheduleAfterShutdown) {
+ ThreadPool pool((ThreadPool::Options()));
+ pool.shutdown();
+ ASSERT_EQ(ErrorCodes::ShutdownInProgress, pool.schedule([] {}));
+}
+
+TEST_F(ThreadPoolTest, MinPoolSize0) {
+ ThreadPool::Options options;
+ options.minThreads = 0;
+ options.maxThreads = 1;
+ options.maxIdleThreadAge = Milliseconds(100);
+ auto& pool = makePool(options);
+ pool.startup();
+ ASSERT_EQ(0U, pool.getStats().numThreads);
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ ASSERT_OK(pool.schedule([this] { blockingWork(); }));
+ while (count1 != 1U) {
+ cv1.wait(lk);
+ }
+ auto stats = pool.getStats();
+ ASSERT_EQUALS(1U, stats.numThreads);
+ ASSERT_EQUALS(0U, stats.numPendingTasks);
+ ASSERT_OK(pool.schedule([] {}));
+ stats = pool.getStats();
+ ASSERT_EQUALS(1U, stats.numThreads);
+ ASSERT_EQUALS(0U, stats.numIdleThreads);
+ ASSERT_EQUALS(1U, stats.numPendingTasks);
+ flag2 = true;
+ cv2.notify_all();
+ lk.unlock();
+ Timer reapTimer;
+ for (size_t i = 0; i < 100 && (stats = pool.getStats()).numThreads > options.minThreads; ++i) {
+ sleepmillis(100);
+ }
+ const Microseconds reapTime(reapTimer.micros());
+ ASSERT_EQ(options.minThreads, stats.numThreads)
+ << "Failed to reap all threads after " << durationCount<Milliseconds>(reapTime) << "ms";
+ lk.lock();
+ flag2 = false;
+ count1 = 0;
+ ASSERT_OK(pool.schedule([this] { blockingWork(); }));
+ while (count1 == 0) {
+ cv1.wait(lk);
+ }
+ stats = pool.getStats();
+ ASSERT_EQUALS(1U, stats.numThreads);
+ ASSERT_EQUALS(0U, stats.numIdleThreads);
+ ASSERT_EQUALS(0U, stats.numPendingTasks);
+ flag2 = true;
+ cv2.notify_all();
+ lk.unlock();
+}
+
+TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) {
+ ThreadPool::Options options;
+ options.minThreads = 15;
+ options.maxThreads = 20;
+ options.maxIdleThreadAge = Milliseconds(100);
+ auto& pool = makePool(options);
+ pool.startup();
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ for (size_t i = 0U; i < 30U; ++i) {
+ ASSERT_OK(pool.schedule([this] { blockingWork(); })) << i;
+ }
+ while (count1 < 20U) {
+ cv1.wait(lk);
+ }
+ ASSERT_EQ(20U, count1);
+ auto stats = pool.getStats();
+ ASSERT_EQ(20U, stats.numThreads);
+ ASSERT_EQ(0U, stats.numIdleThreads);
+ ASSERT_EQ(10U, stats.numPendingTasks);
+ flag2 = true;
+ cv2.notify_all();
+ while (count1 < 30U) {
+ cv1.wait(lk);
+ }
+ lk.unlock();
+ stats = pool.getStats();
+ ASSERT_EQ(0U, stats.numPendingTasks);
+ Timer reapTimer;
+ for (size_t i = 0; i < 100 && (stats = pool.getStats()).numThreads > options.minThreads; ++i) {
+ sleepmillis(50);
+ }
+ const Microseconds reapTime(reapTimer.micros());
+ ASSERT_EQ(options.minThreads, stats.numThreads)
+ << "Failed to reap excess threads after " << durationCount<Milliseconds>(reapTime) << "ms";
+}
+
+DEATH_TEST(ThreadPoolTest, MaxThreadsTooFewDies, "but the maximum must be at least 1") {
+ ThreadPool::Options options;
+ options.maxThreads = 0;
+ ThreadPool pool(options);
+}
+
+DEATH_TEST(ThreadPoolTest,
+ MinThreadsTooManyDies,
+ "6 which is more than the configured maximum of 5") {
+ ThreadPool::Options options;
+ options.maxThreads = 5;
+ options.minThreads = 6;
+ ThreadPool pool(options);
+}
+
+TEST(ThreadPoolTest, LivePoolCleanedByDestructor) {
+ ThreadPool pool((ThreadPool::Options()));
+ pool.startup();
+ while (pool.getStats().numThreads == 0) {
+ sleepmillis(50);
+ }
+ // Destructor should reap leftover threads.
+}
+
+TEST(ThreadPoolTest, PoolDestructorExecutesRemainingTasks) {
+ ThreadPool::Options options;
+ options.minThreads = options.maxThreads = 1;
+ ThreadPool pool(options);
+ ASSERT_OK(pool.schedule([] { return; }));
+}
+
+TEST(ThreadPoolTest, PoolJoinExecutesRemainingTasks) {
+ ThreadPool::Options options;
+ options.minThreads = options.maxThreads = 1;
+ ThreadPool pool(options);
+ ASSERT_OK(pool.schedule([] { return; }));
+ pool.shutdown();
+ pool.join();
+}
+
+DEATH_TEST(ThreadPoolTest, DieOnDoubleStartUp, "it has already started") {
+ ThreadPool pool((ThreadPool::Options()));
+ pool.startup();
+ pool.startup();
+}
+
+DEATH_TEST(ThreadPoolTest, DieWhenExceptionBubblesUp, "Exception escaped task in thread pool") {
+ ThreadPool pool((ThreadPool::Options()));
+ pool.startup();
+ ASSERT_OK(pool.schedule([] { uassertStatusOK(Status({ErrorCodes::BadValue, "No good"})); }));
+ pool.shutdown();
+ pool.join();
+}
+
+DEATH_TEST(ThreadPoolTest,
+ DieOnDoubleJoin,
+ "Attempted to join pool DoubleJoinPool more than once") {
+ ThreadPool::Options options;
+ options.poolName = "DoubleJoinPool";
+ ThreadPool pool(options);
+ pool.shutdown();
+ pool.join();
+ pool.join();
+}
+
+DEATH_TEST(ThreadPoolTest,
+ DestructionDuringJoinDies,
+ "Attempted to join pool DoubleJoinPool more than once") {
+ // This test is a little complicated. We need to ensure that the ThreadPool destructor runs
+ // while some thread is blocked running ThreadPool::join, to see that double-join is fatal in
+ // the pool destructor. To do this, we first wait for minThreads threads to have started. Then,
+ // we create and lock a mutex in the test thread, schedule a work item in the pool to lock that
+ // mutex, schedule an independent thread to call join, and wait for numIdleThreads to hit 0
+ // inside the test thread. When that happens, we know that the thread in the pool executing our
+ // mutex-lock is blocked waiting for the mutex, so the independent thread must be blocked inside
+ // of join(), until the pool thread finishes. At this point, if we destroy the pool, its
+ // destructor should trigger a fatal error due to double-join.
+ stdx::mutex mutex;
+ ThreadPool::Options options;
+ options.minThreads = 2;
+ options.poolName = "DoubleJoinPool";
+ boost::optional<ThreadPool> pool;
+ pool.emplace(options);
+ pool->startup();
+ while (pool->getStats().numThreads < 2U) {
+ sleepmillis(50);
+ }
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ ASSERT_OK(pool->schedule([&mutex] { stdx::lock_guard<stdx::mutex> lk(mutex); }));
+ stdx::thread t([&pool] {
+ pool->shutdown();
+ pool->join();
+ });
+ ThreadPool::Stats stats;
+ while ((stats = pool->getStats()).numIdleThreads != 0U) {
+ sleepmillis(50);
+ }
+ ASSERT_EQ(1U, stats.numThreads);
+ ASSERT_EQ(0U, stats.numPendingTasks);
+ pool.reset();
+ lk.unlock();
+ t.join();
+}
+
+} // namespace