summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@10gen.com>2020-01-29 23:42:42 +0000
committerevergreen <evergreen@mongodb.com>2020-01-29 23:42:42 +0000
commit84dc6fcc49eb679c7fe7a5614f496c65e95576fb (patch)
tree625e889f904a9521f8cbdcbeaf77af11a8f6833c /src/mongo/util/concurrency
parentc1275b80192e67bc5676b8df5519a74709629f2e (diff)
downloadmongo-84dc6fcc49eb679c7fe7a5614f496c65e95576fb.tar.gz
SERVER-44800 Wait for retired threads to join in ThreadPool
Diffstat (limited to 'src/mongo/util/concurrency')
-rw-r--r--src/mongo/util/concurrency/thread_pool.cpp25
-rw-r--r--src/mongo/util/concurrency/thread_pool.h22
-rw-r--r--src/mongo/util/concurrency/thread_pool_test.cpp32
3 files changed, 74 insertions, 5 deletions
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp
index ceaf9fcaf7e..0688293adfa 100644
--- a/src/mongo/util/concurrency/thread_pool.cpp
+++ b/src/mongo/util/concurrency/thread_pool.cpp
@@ -134,6 +134,15 @@ void ThreadPool::join() {
_join_inlock(&lk);
}
+void ThreadPool::_joinRetired_inlock() {
+ while (!_retiredThreads.empty()) {
+ auto& t = _retiredThreads.front();
+ t.join();
+ _options.onJoinRetiredThread(t);
+ _retiredThreads.pop_front();
+ }
+}
+
void ThreadPool::_join_inlock(stdx::unique_lock<Latch>* lk) {
_stateChange.wait(*lk, [this] {
switch (_state) {
@@ -158,6 +167,7 @@ void ThreadPool::_join_inlock(stdx::unique_lock<Latch>* lk) {
lk->lock();
}
--_numIdleThreads;
+ _joinRetired_inlock();
ThreadList threadsToJoin;
swap(threadsToJoin, _threads);
lk->unlock();
@@ -239,7 +249,7 @@ ThreadPool::Stats ThreadPool::getStats() const {
return result;
}
-void ThreadPool::_workerThreadBody(ThreadPool* pool, const std::string& threadName) {
+void ThreadPool::_workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept {
setThreadName(threadName);
pool->_options.onCreateThread(threadName);
const auto poolName = pool->_options.poolName;
@@ -260,6 +270,13 @@ void ThreadPool::_consumeTasks() {
stdx::unique_lock<Latch> lk(_mutex);
while (_state == running) {
if (_pendingTasks.empty()) {
+ /**
+ * Help with garbage collecting retired threads to:
+ * * Reduce the memory overhead of _retiredThreads
+ * * Expedite the shutdown process
+ */
+ _joinRetired_inlock();
+
if (_threads.size() > _options.minThreads) {
// Since there are more than minThreads threads, this thread may be eligible for
// retirement. If it isn't now, it may be later, so it must put a time limit on how
@@ -315,14 +332,14 @@ void ThreadPool::_consumeTasks() {
}
// This thread is ending because it was idle for too long. Find self in _threads, remove self
- // from _threads, detach self.
+ // from _threads, and add self to the list of retired threads.
for (size_t i = 0; i < _threads.size(); ++i) {
auto& t = _threads[i];
if (t.get_id() != stdx::this_thread::get_id()) {
continue;
}
- t.detach();
- t.swap(_threads.back());
+ std::swap(t, _threads.back());
+ _retiredThreads.push_back(std::move(_threads.back()));
_threads.pop_back();
return;
}
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index c178f1d1a8d..336ff248a76 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -92,6 +92,14 @@ public:
// This function is run before each worker thread begins consuming tasks.
using OnCreateThreadFn = std::function<void(const std::string& threadName)>;
OnCreateThreadFn onCreateThread = [](const std::string&) {};
+
+ /**
+ * This function is called after joining each retired thread.
+ * Since there could be multiple calls to this function in a single critical section,
+ * avoid complex logic in the callback.
+ */
+ using OnJoinRetiredThreadFn = std::function<void(const stdx::thread&)>;
+ OnJoinRetiredThreadFn onJoinRetiredThread = [](const stdx::thread&) {};
};
/**
@@ -146,6 +154,7 @@ public:
private:
using TaskList = std::deque<Task>;
using ThreadList = std::vector<stdx::thread>;
+ using RetiredThreadList = std::list<stdx::thread>;
/**
* Representation of the stage of life of a thread pool.
@@ -169,7 +178,7 @@ private:
* As such, it is advisable to pass the pool pointer as an explicit argument, rather
* than as the implicit "this" argument.
*/
- static void _workerThreadBody(ThreadPool* pool, const std::string& threadName);
+ static void _workerThreadBody(ThreadPool* pool, const std::string& threadName) noexcept;
/**
* Starts a worker thread, unless _options.maxThreads threads are already running or
@@ -210,6 +219,14 @@ private:
*/
void _setState_inlock(LifecycleState newState);
+ /**
+ * Waits for all remaining retired threads to join.
+ * If a thread's _workerThreadBody() were ever to attempt to reacquire
+ * ThreadPool::_mutex after that thread had been added to _retiredThreads,
+ * it could cause a deadlock.
+ */
+ void _joinRetired_inlock();
+
// These are the options with which the pool was configured at construction time.
const Options _options;
@@ -238,6 +255,9 @@ private:
// List of threads serving as the worker pool.
ThreadList _threads;
+ // List of threads that are retired and pending join
+ RetiredThreadList _retiredThreads;
+
// Count of idle threads.
size_t _numIdleThreads = 0;
diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp
index 9545ce99cdb..fc246470bf1 100644
--- a/src/mongo/util/concurrency/thread_pool_test.cpp
+++ b/src/mongo/util/concurrency/thread_pool_test.cpp
@@ -279,4 +279,36 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks)
ASSERT_EQUALS(options.threadNamePrefix + "0", taskThreadName);
}
+TEST(ThreadPoolTest, JoinAllRetiredThreads) {
+ AtomicWord<unsigned long> retiredThreads(0);
+ ThreadPool::Options options;
+ options.minThreads = 4;
+ options.maxThreads = 8;
+ options.maxIdleThreadAge = Milliseconds(100);
+ options.onJoinRetiredThread = [&](const stdx::thread& t) { retiredThreads.addAndFetch(1); };
+ unittest::Barrier barrier(options.maxThreads + 1);
+
+ ThreadPool pool(options);
+ for (auto i = options.maxThreads; i > 0; i--) {
+ pool.schedule([&](auto status) {
+ ASSERT_OK(status);
+ barrier.countDownAndWait();
+ });
+ }
+ ASSERT_EQ(pool.getStats().numThreads, 0);
+ pool.startup();
+ barrier.countDownAndWait();
+
+ while (pool.getStats().numThreads > options.minThreads) {
+ sleepmillis(100);
+ }
+
+ pool.shutdown();
+ pool.join();
+
+ const auto expectedRetiredThreads = options.maxThreads - options.minThreads;
+ ASSERT_EQ(retiredThreads.load(), expectedRetiredThreads);
+ ASSERT_EQ(pool.getStats().numIdleThreads, 0);
+}
+
} // namespace