From 84dc6fcc49eb679c7fe7a5614f496c65e95576fb Mon Sep 17 00:00:00 2001 From: Amirsaman Memaripour Date: Wed, 29 Jan 2020 23:42:42 +0000 Subject: SERVER-44800 Wait for retired threads to join in ThreadPool --- src/mongo/util/concurrency/thread_pool.cpp | 25 +++++++++++++++---- src/mongo/util/concurrency/thread_pool.h | 22 ++++++++++++++++- src/mongo/util/concurrency/thread_pool_test.cpp | 32 +++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 5 deletions(-) (limited to 'src/mongo/util/concurrency') diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp index ceaf9fcaf7e..0688293adfa 100644 --- a/src/mongo/util/concurrency/thread_pool.cpp +++ b/src/mongo/util/concurrency/thread_pool.cpp @@ -134,6 +134,15 @@ void ThreadPool::join() { _join_inlock(&lk); } +void ThreadPool::_joinRetired_inlock() { + while (!_retiredThreads.empty()) { + auto& t = _retiredThreads.front(); + t.join(); + _options.onJoinRetiredThread(t); + _retiredThreads.pop_front(); + } +} + void ThreadPool::_join_inlock(stdx::unique_lock* lk) { _stateChange.wait(*lk, [this] { switch (_state) { @@ -158,6 +167,7 @@ void ThreadPool::_join_inlock(stdx::unique_lock* lk) { lk->lock(); } --_numIdleThreads; + _joinRetired_inlock(); ThreadList threadsToJoin; swap(threadsToJoin, _threads); lk->unlock(); @@ -239,7 +249,7 @@ ThreadPool::Stats ThreadPool::getStats() const { return result; } -void ThreadPool::_workerThreadBody(ThreadPool* pool, const std::string& threadName) { +void ThreadPool::_workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept { setThreadName(threadName); pool->_options.onCreateThread(threadName); const auto poolName = pool->_options.poolName; @@ -260,6 +270,13 @@ void ThreadPool::_consumeTasks() { stdx::unique_lock lk(_mutex); while (_state == running) { if (_pendingTasks.empty()) { + /** + * Help with garbage collecting retired threads to: + * * Reduce the memory overhead of _retiredThreads + * * Expedite the shutdown process + */ + _joinRetired_inlock(); + if (_threads.size() > _options.minThreads) { // Since there are more than minThreads threads, this thread may be eligible for // retirement. If it isn't now, it may be later, so it must put a time limit on how @@ -315,14 +332,14 @@ void ThreadPool::_consumeTasks() { } // This thread is ending because it was idle for too long. Find self in _threads, remove self - // from _threads, detach self. + // from _threads, and add self to the list of retired threads. for (size_t i = 0; i < _threads.size(); ++i) { auto& t = _threads[i]; if (t.get_id() != stdx::this_thread::get_id()) { continue; } - t.detach(); - t.swap(_threads.back()); + std::swap(t, _threads.back()); + _retiredThreads.push_back(std::move(_threads.back())); _threads.pop_back(); return; } diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index c178f1d1a8d..336ff248a76 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -92,6 +92,14 @@ public: // This function is run before each worker thread begins consuming tasks. using OnCreateThreadFn = std::function; OnCreateThreadFn onCreateThread = [](const std::string&) {}; + + /** + * This function is called after joining each retired thread. + * Since there could be multiple calls to this function in a single critical section, + * avoid complex logic in the callback. + */ + using OnJoinRetiredThreadFn = std::function; + OnJoinRetiredThreadFn onJoinRetiredThread = [](const stdx::thread&) {}; }; /** @@ -146,6 +154,7 @@ public: private: using TaskList = std::deque; using ThreadList = std::vector; + using RetiredThreadList = std::list; /** * Representation of the stage of life of a thread pool. @@ -169,7 +178,7 @@ private: * As such, it is advisable to pass the pool pointer as an explicit argument, rather * than as the implicit "this" argument. */ - static void _workerThreadBody(ThreadPool* pool, const std::string& threadName); + static void _workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept; /** * Starts a worker thread, unless _options.maxThreads threads are already running or @@ -210,6 +219,14 @@ private: */ void _setState_inlock(LifecycleState newState); + /** + * Waits for all remaining retired threads to join. + * If a thread's _workerThreadBody() were ever to attempt to reacquire + * ThreadPool::_mutex after that thread had been added to _retiredThreads, + * it could cause a deadlock. + */ + void _joinRetired_inlock(); + // These are the options with which the pool was configured at construction time. const Options _options; @@ -238,6 +255,9 @@ private: // List of threads serving as the worker pool. ThreadList _threads; + // List of threads that are retired and pending join + RetiredThreadList _retiredThreads; + // Count of idle threads. size_t _numIdleThreads = 0; diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp index 9545ce99cdb..fc246470bf1 100644 --- a/src/mongo/util/concurrency/thread_pool_test.cpp +++ b/src/mongo/util/concurrency/thread_pool_test.cpp @@ -279,4 +279,36 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks) ASSERT_EQUALS(options.threadNamePrefix + "0", taskThreadName); } +TEST(ThreadPoolTest, JoinAllRetiredThreads) { + AtomicWord retiredThreads(0); + ThreadPool::Options options; + options.minThreads = 4; + options.maxThreads = 8; + options.maxIdleThreadAge = Milliseconds(100); + options.onJoinRetiredThread = [&](const stdx::thread& t) { retiredThreads.addAndFetch(1); }; + unittest::Barrier barrier(options.maxThreads + 1); + + ThreadPool pool(options); + for (auto i = options.maxThreads; i > 0; i--) { + pool.schedule([&](auto status) { + ASSERT_OK(status); + barrier.countDownAndWait(); + }); + } + ASSERT_EQ(pool.getStats().numThreads, 0); + pool.startup(); + barrier.countDownAndWait(); + + while (pool.getStats().numThreads > options.minThreads) { + sleepmillis(100); + } + + pool.shutdown(); + pool.join(); + + const auto expectedRetiredThreads = options.maxThreads - options.minThreads; + ASSERT_EQ(retiredThreads.load(), expectedRetiredThreads); + ASSERT_EQ(pool.getStats().numIdleThreads, 0); +} + } // namespace -- cgit v1.2.1