diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2015-06-12 19:09:04 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2015-06-23 15:01:57 -0400 |
commit | 444b1859d1311fd5b775e1d4a0dba32c14d3c4f5 (patch) | |
tree | 5fc2a9e88457be20eab60055c7144dabd13d9ed9 | |
parent | b8f0f273d8f2db1aed1974b94c9b18a63656ad48 (diff) | |
download | mongo-444b1859d1311fd5b775e1d4a0dba32c14d3c4f5.tar.gz |
SERVER-19000 Implement a ThreadPool that dynamically adjusts the number of threads based on demand.
This pool is derived from the one built into executor::NetworkInterfaceImpl.
-rw-r--r-- | src/mongo/executor/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/util/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/util/concurrency/SConscript | 8 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.cpp | 373 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.h | 266 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool_test.cpp | 283 |
6 files changed, 931 insertions, 3 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index b3d112d743f..09f1efacadb 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -17,6 +17,7 @@ env.Library(target='network_interface_impl', # TODO: rename to thread_pool_netwo source=['network_interface_impl.cpp',], LIBDEPS=[ '$BUILD_DIR/mongo/client/remote_command_runner_impl', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', 'network_interface', # TODO: add dependency on the task executor *interface* once available. ]) @@ -25,4 +26,4 @@ env.Library('network_interface_mock', 'network_interface_mock.cpp', LIBDEPS=[ 'network_interface', - ])
\ No newline at end of file + ]) diff --git a/src/mongo/util/SConscript b/src/mongo/util/SConscript index 08f66f615b0..ec2ac71ac8d 100644 --- a/src/mongo/util/SConscript +++ b/src/mongo/util/SConscript @@ -118,7 +118,6 @@ env.Library( '$BUILD_DIR/mongo/util/stacktrace', '$BUILD_DIR/mongo/util/concurrency/synchronization', '$BUILD_DIR/mongo/util/concurrency/thread_name', - '$BUILD_DIR/mongo/util/concurrency/thread_pool', '$BUILD_DIR/mongo/util/debugger', '$BUILD_DIR/third_party/shim_allocator', '$BUILD_DIR/third_party/shim_boost', diff --git a/src/mongo/util/concurrency/SConscript b/src/mongo/util/concurrency/SConscript index 505ff9fc98d..0dbc9c232d9 100644 --- a/src/mongo/util/concurrency/SConscript +++ b/src/mongo/util/concurrency/SConscript @@ -11,12 +11,18 @@ env.Library( target='thread_pool', source=[ 'old_thread_pool.cpp', + 'thread_pool.cpp', ], LIBDEPS=[ - '$BUILD_DIR/third_party/shim_boost', + '$BUILD_DIR/mongo/util/foundation', ], ) +env.CppUnitTest( + target='thread_pool_test', + source=['thread_pool_test.cpp'], + LIBDEPS=['thread_pool']) + env.Library('ticketholder', ['ticketholder.cpp'], LIBDEPS=['$BUILD_DIR/mongo/base/base', diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp new file mode 100644 index 00000000000..3de84bef6b3 --- /dev/null +++ b/src/mongo/util/concurrency/thread_pool.cpp @@ -0,0 +1,373 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor + +#include "mongo/platform/basic.h" + +#include "mongo/util/concurrency/thread_pool.h" + +#include "mongo/base/status.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/concurrency/thread_name.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +namespace { + +// Counter used to assign unique names to otherwise-unnamed thread pools. +AtomicInt32 nextUnnamedThreadPoolId{1}; + +/** + * Sets defaults and checks bounds limits on "options", and returns it. + * + * This method is just a helper for the ThreadPool constructor. + */ +ThreadPool::Options cleanUpOptions(ThreadPool::Options&& options) { + if (options.poolName.empty()) { + options.poolName = str::stream() << "ThreadPool" << nextUnnamedThreadPoolId.fetchAndAdd(1); + } + if (options.threadNamePrefix.empty()) { + options.threadNamePrefix = str::stream() << options.poolName << '-'; + } + if (options.maxThreads < 1) { + severe() << "Tried to create pool " << options.poolName << " with a maximum of " << + options.maxThreads << " but the maximum must be at least 1"; + fassertFailed(28696); + } + if (options.minThreads > options.maxThreads) { + severe() << "Tried to create pool " << options.poolName << " with a minimum of " << + options.minThreads << " which is more than the configured maximum of " << + options.maxThreads; + fassertFailed(28686); + } + return options; +} + +} // namespace + +ThreadPool::ThreadPool(Options options) : _options(cleanUpOptions(std::move(options))) {} + +ThreadPool::~ThreadPool() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _shutdown_inlock(); + if (shutdownComplete != _state) { + _join_inlock(&lk); + } + + if (shutdownComplete != _state) { + severe() << "Failed to shutdown pool during destruction"; + fassertFailed(28695); + } + invariant(_threads.empty()); + invariant(_pendingTasks.empty()); +} + +void ThreadPool::startup() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_state != preStart) { + severe() << "Attempting to start pool " << _options.poolName << + ", but it has already started"; + fassertFailed(28698); + } + _setState_inlock(running); + invariant(_threads.empty()); + const size_t numToStart = + std::min(_options.maxThreads, std::max(_options.minThreads, _pendingTasks.size())); + for (size_t i = 0; i < numToStart; ++i) { + _startWorkerThread_inlock(); + } +} + +void ThreadPool::shutdown() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _shutdown_inlock(); +} + +void ThreadPool::_shutdown_inlock() { + switch (_state) { + case preStart: + case running: + _setState_inlock(joinRequired); + _workAvailable.notify_all(); + return; + case joinRequired: + case joining: + case shutdownComplete: + return; + } +} + +void ThreadPool::join() { + try { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _join_inlock(&lk); + } + catch (...) { + std::terminate(); + } +} + +void ThreadPool::_join_inlock(stdx::unique_lock<stdx::mutex>* lk) { + _stateChange.wait(*lk, [this] { + switch (_state) { + case preStart: + return false; + case running: + return false; + case joinRequired: + return true; + case joining: + case shutdownComplete: + severe() << "Attempted to join pool " << _options.poolName << " more than once"; + fassertFailed(28700); + } + }); + _setState_inlock(joining); + ++_numIdleThreads; + while (!_pendingTasks.empty()) { + _doOneTask(lk); + } + --_numIdleThreads; + ThreadList threadsToJoin; + swap(threadsToJoin, _threads); + lk->unlock(); + for (auto& t : threadsToJoin) { + t.join(); + } + lk->lock(); + invariant(_state == joining); + _setState_inlock(shutdownComplete); +} + +Status ThreadPool::schedule(Task task) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + switch (_state) { + case joinRequired: + case joining: + case shutdownComplete: + return Status(ErrorCodes::ShutdownInProgress, str::stream() << + "Shutdown of thread pool " << _options.poolName << " in progress"); + case preStart: + case running: + break; + } + _pendingTasks.emplace_back(std::move(task)); + if (_state == preStart) { + return Status::OK(); + } + if (_numIdleThreads < _pendingTasks.size()) { + _startWorkerThread_inlock(); + } + if (_numIdleThreads <= _pendingTasks.size()) { + _lastFullUtilizationDate = Date_t::now(); + } + _workAvailable.notify_one(); + return Status::OK(); +} + +void ThreadPool::waitForIdle() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + // If there are any pending tasks, or non-idle threads, the pool is not idle. + while (!_pendingTasks.empty() || _numIdleThreads < _threads.size()) { + _poolIsIdle.wait(lk); + } +} + +ThreadPool::Stats ThreadPool::getStats() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + Stats result; + result.options = _options; + result.numThreads = _threads.size(); + result.numIdleThreads = _numIdleThreads; + result.numPendingTasks = _pendingTasks.size(); + result.lastFullUtilizationDate = _lastFullUtilizationDate; + return result; +} + +void ThreadPool::_workerThreadBody(ThreadPool* pool, const std::string& threadName) { + std::string poolName = pool->_options.poolName; + setThreadName(threadName); + LOG(1) << "starting thread in pool " << poolName; + try { + pool->_consumeTasks(); + } + catch (...) { + severe() << "Exception reached top of stack in thread pool " << poolName; + std::terminate(); + } + + // At this point, another thread may have destroyed "pool", if this thread chose to detach + // itself and remove itself from pool->_threads before releasing pool->_mutex. Do not access + // member variables of "pool" from here, on. + // + // This can happen if this thread decided to retire, got descheduled after removing itself + // from _threads and calling detach(), and then the pool was deleted. When this thread resumes, + // it is no longer safe to access "pool". + LOG(1) << "shutting down thread in pool " << poolName; +} + +void ThreadPool::_consumeTasks() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (_state == running) { + if (_pendingTasks.empty()) { + if (_threads.size() > _options.minThreads) { + // Since there are more than minThreads threads, this thread may be eligible for + // retirement. If it isn't now, it may be later, so it must put a time limit on how + // long it waits on _workAvailable. + const auto now = Date_t::now(); + const auto nextThreadRetirementDate = + _lastFullUtilizationDate + _options.maxIdleThreadAge; + if (now >= nextThreadRetirementDate) { + _lastFullUtilizationDate = now; + LOG(1) << "Reaping this thread; next thread reaped no earlier than " << + _lastFullUtilizationDate + _options.maxIdleThreadAge; + break; + } + + LOG(3) << "Not reaping because the earliest retirement date is " << + nextThreadRetirementDate; + _workAvailable.wait_until(lk, nextThreadRetirementDate); + } + else { + // Since the number of threads is not more than minThreads, this thread is not + // eligible for retirement. It is OK to sleep until _workAvailable is signaled, + // because any new threads that put the number of total threads above minThreads + // would be eligible for retirement once they had no work left to do. + LOG(3) << "waiting for work; I am one of " << _threads.size() << " thread(s);" << + " the minimum number of threads is " << _options.minThreads; + _workAvailable.wait(lk); + } + continue; + } + + _doOneTask(&lk); + } + + // We still hold the lock, but this thread is retiring. If the whole pool is shutting down, this + // thread lends a hand in draining the work pool and returns so it can be joined. Otherwise, it + // falls through to the detach code, below. + + if (_state == joinRequired || _state == joining) { + // Drain the leftover pending tasks. + while (!_pendingTasks.empty()) { + _doOneTask(&lk); + } + --_numIdleThreads; + return; + } + --_numIdleThreads; + + if (_state != running) { + severe() << "State of pool " << _options.poolName << " is " << static_cast<int32_t>(_state) + << ", but expected " << static_cast<int32_t>(running); + fassertFailedNoTrace(28693); + } + + // This thread is ending because it was idle for too long. Find self in _threads, remove self + // from _threads, detach self. + for (size_t i = 0; i < _threads.size(); ++i) { + auto& t = _threads[i]; + if (t.get_id() != stdx::this_thread::get_id()) { + continue; + } + t.detach(); + t.swap(_threads.back()); + _threads.pop_back(); + return; + } + severe().stream() << "Could not find this thread, with id " << + stdx::this_thread::get_id() << " in pool " << _options.poolName; + fassertFailedNoTrace(28697); +} + +void ThreadPool::_doOneTask(stdx::unique_lock<stdx::mutex>* lk) { + invariant(!_pendingTasks.empty()); + try { + LOG(3) << "Executing a task on behalf of pool " << _options.poolName; + Task task = std::move(_pendingTasks.front()); + _pendingTasks.pop_front(); + --_numIdleThreads; + lk->unlock(); + task(); + lk->lock(); + ++_numIdleThreads; + if (_pendingTasks.empty() && _threads.size() == _numIdleThreads) { + _poolIsIdle.notify_all(); + } + } + catch (...) { + severe() << "Exception escaped task in thread pool " << _options.poolName; + std::terminate(); + } +} + +void ThreadPool::_startWorkerThread_inlock() { + switch (_state) { + case preStart: + LOG(1) << "Not starting new thread in pool " << _options.poolName << + ", yet; waiting for startup() call"; + return; + case joinRequired: + case joining: + case shutdownComplete: + LOG(1) << "Not starting new thread in pool " << _options.poolName << " while shutting down"; + return; + case running: + break; + } + if (_threads.size() == _options.maxThreads) { + LOG(2) << "Not starting new thread in pool " << _options.poolName << + " because it already has " << _options.maxThreads << ", its maximum"; + return; + } + invariant(_threads.size() < _options.maxThreads); + const std::string threadName = str::stream() << _options.threadNamePrefix << _nextThreadId++; + try { + _threads.emplace_back(stdx::bind(&ThreadPool::_workerThreadBody, this, threadName)); + ++_numIdleThreads; + } + catch (const std::exception& ex) { + error() << "Failed to start " << threadName << "; " << _threads.size() << + " other thread(s) still running in pool " << _options.poolName << + "; caught exception: " << ex.what(); + } +} + +void ThreadPool::_setState_inlock(const LifecycleState newState) { + if (newState == _state) { + return; + } + _state = newState; + _stateChange.notify_all(); +} + +} // namespace mongo diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h new file mode 100644 index 00000000000..58d1fac945b --- /dev/null +++ b/src/mongo/util/concurrency/thread_pool.h @@ -0,0 +1,266 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/list.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class Status; + +/** + * A configurable thread pool, for general use. + * + * See the Options struct for information about how to configure an instance. + */ +class ThreadPool { + MONGO_DISALLOW_COPYING(ThreadPool); +public: + using Task = stdx::function<void ()>; + + /** + * Structure used to configure an instance of ThreadPool. + */ + struct Options { + // Name of the thread pool. If this string is empty, the pool will be assigned a + // name unique to the current process. + std::string poolName; + + // Prefix used to name threads for logging purposes. + // + // An integer will be appended to this string to create the thread name for each thread in + // the pool. Warning, if you create two pools and give them the same threadNamePrefix, you + // could have multiple threads that report the same name. If you leave this empty, the + // prefix will be the pool name followed by a hyphen. + std::string threadNamePrefix; + + // Minimum number of threads that must be in the pool. + // + // At least this many threads will be created at startup, and the pool will not reduce the + // total number of threads below this threshold before shutdown. + size_t minThreads = 1; + + // The pool will never grow to contain more than this many threads. + size_t maxThreads = 8; + + // If the pool has had at least one idle thread for this much time, it may consider reaping + // a thread. + Milliseconds maxIdleThreadAge = Seconds{30}; + }; + + /** + * Structure used to return information about the thread pool via getStats(). + */ + struct Stats { + // The options for the instance of the pool returning these stats. + Options options; + + // The number of threads currently in the pool, idle or active. + size_t numThreads; + + // The number of idle threads currently in the pool. + size_t numIdleThreads; + + // The number of tasks waiting to be executed by the pool. + size_t numPendingTasks; + + // The last time that no threads in the pool were idle. + Date_t lastFullUtilizationDate; + }; + + /** + * Constructs a thread pool, configured with the given "options". + */ + explicit ThreadPool(Options options); + + /** + * Destroys a thread pool. + * + * The destructor may block if join() has not previously been called and returned. + * It is fatal to destroy the pool while another thread is blocked on join(). + */ + ~ThreadPool(); + + /** + * Starts the thread pool. May be called at most once. + */ + void startup(); + + /** + * Signals the thread pool to shut down. Returns promptly. + * + * After this call, the thread will return an error for subsequent calls to schedule(). + * + * May be called by a task executing in the thread pool. Call join() after calling shutdown() + * to block until all tasks scheduled on the pool complete. + */ + void shutdown(); + + /** + * Blocks until the thread pool has fully shut down. Call at most once, and never from a task + * inside the pool. + */ + void join(); + + /** + * Schedules "task" to run in the thread pool. + * + * Returns OK on success, ShutdownInProgress if shutdown() has already executed. + * + * It is safe to call this before startup(), but the scheduled task will not execute + * until after startup() is called. + */ + Status schedule(Task task); + + /** + * Blocks the caller until there are no pending tasks on this pool. + * + * It is legal to call this whether or not shutdown has been called, but if it is called + * *before* shutdown() is called, there is no guarantee that there will still be no pending + * tasks when the function returns. + * + * May be called multiple times, by multiple threads. May not be called by a task in the thread + * pool. + */ + void waitForIdle(); + + /** + * Returns statistics about the thread pool's utilization. + */ + Stats getStats(); + +private: + using TaskList = stdx::list<Task>; + using ThreadList = std::vector<stdx::thread>; + + /** + * Representation of the stage of life of a thread pool. + * + * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work + * may only be scheduled in the preStart and running states. Threads may only be started in the + * running state. In shutdownComplete, there are no remaining threads or pending tasks to + * execute. + * + * Diagram of legal transitions: + * + * preStart -> running -> joinRequired -> joining -> shutdownComplete + * \ ^ + * \_____________/ + */ + enum LifecycleState { preStart, running, joinRequired, joining, shutdownComplete }; + + /** + * This is the thread body for worker threads. It is a static member function, + * because late in its execution it is possible for the pool to have been destroyed. + * As such, it is advisable to pass the pool pointer as an explicit argument, rather + * than as the implicit "this" argument. + */ + static void _workerThreadBody(ThreadPool* pool, const std::string& threadName); + + /** + * Starts a worker thread, unless _options.maxThreads threads are already running or + * _state is not running. + */ + void _startWorkerThread_inlock(); + + /** + * This is the run loop of a worker thread, invoked by _workerThreadBody. + */ + void _consumeTasks(); + + /** + * Implementation of shutdown once _mutex is locked. + */ + void _shutdown_inlock(); + + /** + * Implementation of join once _mutex is owned by "lk". + */ + void _join_inlock(stdx::unique_lock<stdx::mutex>* lk); + + /** + * Executes one task from _pendingTasks. "lk" must own _mutex, and _pendingTasks must have at + * least one entry. + */ + void _doOneTask(stdx::unique_lock<stdx::mutex>* lk); + + /** + * Changes the lifecycle state (_state) of the pool and wakes up any threads waiting for a state + * change. Has no effect if _state == newState. + */ + void _setState_inlock(LifecycleState newState); + + // These are the options with which the pool was configured at construction time. + const Options _options; + + // Mutex guarding all non-const member variables. + stdx::mutex _mutex; + + // This variable represents the lifecycle state of the pool. + // + // Work may only be scheduled in states preStart and running, and only executes in states + // running and shuttingDown. + LifecycleState _state = preStart; + + // Condition signaled to indicate that there is work in the _pendingTasks queue, or + // that the system is shutting down. + stdx::condition_variable _workAvailable; + + // Condition signaled to indicate that there is no work in the _pendingTasks queue. + stdx::condition_variable _poolIsIdle; + + // Condition variable signaled whenever _state changes. + stdx::condition_variable _stateChange; + + // Queue of yet-to-be-executed tasks. + TaskList _pendingTasks; + + // List of threads serving as the worker pool. + ThreadList _threads; + + // Count of idle threads. + size_t _numIdleThreads = 0; + + // Id counter for assigning thread names + size_t _nextThreadId = 0; + + // The last time that _pendingTasks.size() grew to be at least _threads.size(). + Date_t _lastFullUtilizationDate; +}; + +} // namespace mongo diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp new file mode 100644 index 00000000000..3eff7910613 --- /dev/null +++ b/src/mongo/util/concurrency/thread_pool_test.cpp @@ -0,0 +1,283 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include <boost/optional.hpp> + +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/log.h" +#include "mongo/util/time_support.h" +#include "mongo/util/timer.h" + +namespace { +using namespace mongo; + +class ThreadPoolTest : public unittest::Test { +protected: + ThreadPool& makePool(ThreadPool::Options options) { + ASSERT(!_pool); + _pool.emplace(std::move(options)); + return *_pool; + } + + ThreadPool& pool() { + ASSERT(_pool); + return *_pool; + } + + void blockingWork() { + stdx::unique_lock<stdx::mutex> lk(mutex); + ++count1; + cv1.notify_all(); + while (!flag2) { + cv2.wait(lk); + } + } + + stdx::mutex mutex; + stdx::condition_variable cv1; + stdx::condition_variable cv2; + size_t count1 = 0U; + bool flag2 = false; + +private: + void tearDown() override { + stdx::unique_lock<stdx::mutex> lk(mutex); + flag2 = true; + cv2.notify_all(); + lk.unlock(); + } + + boost::optional<ThreadPool> _pool; +}; + +TEST(ThreadPoolTest, UnusedPool) { + ThreadPool pool((ThreadPool::Options())); +} + +TEST(ThreadPoolTest, CannotScheduleAfterShutdown) { + ThreadPool pool((ThreadPool::Options())); + pool.shutdown(); + ASSERT_EQ(ErrorCodes::ShutdownInProgress, pool.schedule([]{})); +} + +TEST_F(ThreadPoolTest, MinPoolSize0) { + ThreadPool::Options options; + options.minThreads = 0; + options.maxThreads = 1; + options.maxIdleThreadAge = Milliseconds(100); + auto& pool = makePool(options); + pool.startup(); + ASSERT_EQ(0U, pool.getStats().numThreads); + stdx::unique_lock<stdx::mutex> lk(mutex); + ASSERT_OK(pool.schedule([this] { blockingWork(); })); + while (count1 != 1U) { + cv1.wait(lk); + } + auto stats = pool.getStats(); + ASSERT_EQUALS(1U, stats.numThreads); + ASSERT_EQUALS(0U, stats.numPendingTasks); + ASSERT_OK(pool.schedule([] {})); + stats = pool.getStats(); + ASSERT_EQUALS(1U, stats.numThreads); + ASSERT_EQUALS(0U, stats.numIdleThreads); + ASSERT_EQUALS(1U, stats.numPendingTasks); + flag2 = true; + cv2.notify_all(); + lk.unlock(); + Timer reapTimer; + for (size_t i = 0; i < 100 && (stats = pool.getStats()).numThreads > options.minThreads; ++i) { + sleepmillis(100); + } + const Microseconds reapTime(reapTimer.micros()); + ASSERT_EQ(options.minThreads, stats.numThreads) << "Failed to reap all threads after " << + durationCount<Milliseconds>(reapTime) << "ms"; + lk.lock(); + flag2 = false; + count1 = 0; + ASSERT_OK(pool.schedule([this] { blockingWork(); })); + while (count1 == 0) { + cv1.wait(lk); + } + stats = pool.getStats(); + ASSERT_EQUALS(1U, stats.numThreads); + ASSERT_EQUALS(0U, stats.numIdleThreads); + ASSERT_EQUALS(0U, stats.numPendingTasks); + flag2 = true; + cv2.notify_all(); + lk.unlock(); +} + +TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) { + ThreadPool::Options options; + options.minThreads = 15; + options.maxThreads = 20; + options.maxIdleThreadAge = Milliseconds(100); + auto& pool = makePool(options); + pool.startup(); + stdx::unique_lock<stdx::mutex> lk(mutex); + for (size_t i = 0U; i < 30U; ++i) { + ASSERT_OK(pool.schedule([this] { blockingWork(); })) << i; + } + while (count1 < 20U) { + cv1.wait(lk); + } + ASSERT_EQ(20U, count1); + auto stats = pool.getStats(); + ASSERT_EQ(20U, stats.numThreads); + ASSERT_EQ(0U, stats.numIdleThreads); + ASSERT_EQ(10U, stats.numPendingTasks); + flag2 = true; + cv2.notify_all(); + while (count1 < 30U) { + cv1.wait(lk); + } + lk.unlock(); + stats = pool.getStats(); + ASSERT_EQ(0U, stats.numPendingTasks); + Timer reapTimer; + for (size_t i = 0; i < 100 && (stats = pool.getStats()).numThreads > options.minThreads; ++i) { + sleepmillis(50); + } + const Microseconds reapTime(reapTimer.micros()); + ASSERT_EQ(options.minThreads, stats.numThreads) << "Failed to reap excess threads after " << + durationCount<Milliseconds>(reapTime) << "ms"; +} + +DEATH_TEST(ThreadPoolTest, MaxThreadsTooFewDies, "but the maximum must be at least 1") { + ThreadPool::Options options; + options.maxThreads = 0; + ThreadPool pool(options); +} + +DEATH_TEST(ThreadPoolTest, + MinThreadsTooManyDies, + "6 which is more than the configured maximum of 5") { + ThreadPool::Options options; + options.maxThreads = 5; + options.minThreads = 6; + ThreadPool pool(options); +} + +TEST(ThreadPoolTest, LivePoolCleanedByDestructor) { + ThreadPool pool((ThreadPool::Options())); + pool.startup(); + while (pool.getStats().numThreads == 0) { + sleepmillis(50); + } + // Destructor should reap leftover threads. +} + +TEST(ThreadPoolTest, PoolDestructorExecutesRemainingTasks) { + ThreadPool::Options options; + options.minThreads = options.maxThreads = 1; + ThreadPool pool(options); + ASSERT_OK(pool.schedule([] { return; })); +} + +TEST(ThreadPoolTest, PoolJoinExecutesRemainingTasks) { + ThreadPool::Options options; + options.minThreads = options.maxThreads = 1; + ThreadPool pool(options); + ASSERT_OK(pool.schedule([] { return; })); + pool.shutdown(); + pool.join(); +} + +DEATH_TEST(ThreadPoolTest, DieOnDoubleStartUp, "it has already started") { + ThreadPool pool((ThreadPool::Options())); + pool.startup(); + pool.startup(); +} + +DEATH_TEST(ThreadPoolTest, + DieWhenExceptionBubblesUp, + "Exception escaped task in thread pool") { + ThreadPool pool((ThreadPool::Options())); + pool.startup(); + ASSERT_OK(pool.schedule([] { uassertStatusOK(Status({ErrorCodes::BadValue, "No good"})); })); + pool.shutdown(); + pool.join(); +} + +DEATH_TEST(ThreadPoolTest, + DieOnDoubleJoin, + "Attempted to join pool DoubleJoinPool more than once") { + ThreadPool::Options options; + options.poolName = "DoubleJoinPool"; + ThreadPool pool(options); + pool.shutdown(); + pool.join(); + pool.join(); +} + +DEATH_TEST(ThreadPoolTest, + DestructionDuringJoinDies, + "Attempted to join pool DoubleJoinPool more than once") { + // This test is a little complicated. We need to ensure that the ThreadPool destructor runs + // while some thread is blocked running ThreadPool::join, to see that double-join is fatal in + // the pool destructor. To do this, we first wait for minThreads threads to have started. Then, + // we create and lock a mutex in the test thread, schedule a work item in the pool to lock that + // mutex, schedule an independent thread to call join, and wait for numIdleThreads to hit 0 + // inside the test thread. When that happens, we know that the thread in the pool executing our + // mutex-lock is blocked waiting for the mutex, so the independent thread must be blocked inside + // of join(), until the pool thread finishes. At this point, if we destroy the pool, its + // destructor should trigger a fatal error due to double-join. + stdx::mutex mutex; + ThreadPool::Options options; + options.minThreads = 2; + options.poolName = "DoubleJoinPool"; + boost::optional<ThreadPool> pool; + pool.emplace(options); + pool->startup(); + while (pool->getStats().numThreads < 2U) { + sleepmillis(50); + } + stdx::unique_lock<stdx::mutex> lk(mutex); + ASSERT_OK(pool->schedule([&mutex] { stdx::lock_guard<stdx::mutex> lk(mutex); })); + stdx::thread t([&pool] { pool->shutdown(); pool->join(); }); + ThreadPool::Stats stats; + while ((stats = pool->getStats()).numIdleThreads != 0U) { + sleepmillis(50); + } + ASSERT_EQ(1U, stats.numThreads); + ASSERT_EQ(0U, stats.numPendingTasks); + pool.reset(); + lk.unlock(); + t.join(); +} + +} // namespace |