diff options
Diffstat (limited to 'src/mongo/util/concurrency')
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.cpp | 24 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool.h | 2 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool_interface.h | 7 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool_test.cpp | 27 | ||||
-rw-r--r-- | src/mongo/util/concurrency/thread_pool_test_common.cpp | 23 |
5 files changed, 27 insertions, 56 deletions
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp index 5f732c1a1ed..81f1e3c2285 100644 --- a/src/mongo/util/concurrency/thread_pool.cpp +++ b/src/mongo/util/concurrency/thread_pool.cpp @@ -185,22 +185,15 @@ void ThreadPool::_drainPendingTasks() { cleanThread.join(); } -void ThreadPool::schedule(Task task) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - +Status ThreadPool::schedule(Task task) { + stdx::lock_guard<stdx::mutex> lk(_mutex); switch (_state) { case joinRequired: case joining: - case shutdownComplete: { - auto status = Status(ErrorCodes::ShutdownInProgress, - str::stream() << "Shutdown of thread pool " << _options.poolName - << " in progress"); - - lk.unlock(); - task(status); - return; - } break; - + case shutdownComplete: + return Status(ErrorCodes::ShutdownInProgress, + str::stream() << "Shutdown of thread pool " << _options.poolName + << " in progress"); case preStart: case running: break; @@ -209,7 +202,7 @@ void ThreadPool::schedule(Task task) { } _pendingTasks.emplace_back(std::move(task)); if (_state == preStart) { - return; + return Status::OK(); } if (_numIdleThreads < _pendingTasks.size()) { _startWorkerThread_inlock(); @@ -218,6 +211,7 @@ void ThreadPool::schedule(Task task) { _lastFullUtilizationDate = Date_t::now(); } _workAvailable.notify_one(); + return Status::OK(); } void ThreadPool::waitForIdle() { @@ -338,7 +332,7 @@ void ThreadPool::_doOneTask(stdx::unique_lock<stdx::mutex>* lk) noexcept { _pendingTasks.pop_front(); --_numIdleThreads; lk->unlock(); - task(Status::OK()); + task(); lk->lock(); ++_numIdleThreads; if (_pendingTasks.empty() && _threads.size() == _numIdleThreads) { diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h index c4873f84dff..370de65056e 100644 --- a/src/mongo/util/concurrency/thread_pool.h +++ b/src/mongo/util/concurrency/thread_pool.h @@ -123,7 +123,7 @@ public: void startup() override; void shutdown() override; void join() override; - void schedule(Task task) override; + Status schedule(Task task) override; /** * Blocks the caller until there are no pending tasks on this pool. diff --git a/src/mongo/util/concurrency/thread_pool_interface.h b/src/mongo/util/concurrency/thread_pool_interface.h index bb59ac8e9f8..847dfe0506b 100644 --- a/src/mongo/util/concurrency/thread_pool_interface.h +++ b/src/mongo/util/concurrency/thread_pool_interface.h @@ -30,7 +30,6 @@ #pragma once #include "mongo/util/functional.h" -#include "mongo/util/out_of_line_executor.h" namespace mongo { @@ -39,11 +38,13 @@ class Status; /** * Interface for a thread pool. */ -class ThreadPoolInterface : public OutOfLineExecutor { +class ThreadPoolInterface { ThreadPoolInterface(const ThreadPoolInterface&) = delete; ThreadPoolInterface& operator=(const ThreadPoolInterface&) = delete; public: + using Task = unique_function<void()>; + /** * Destroys a thread pool. * @@ -81,7 +82,7 @@ public: * It is safe to call this before startup(), but the scheduled task will not execute * until after startup() is called. */ - virtual void schedule(Task task) = 0; + virtual Status schedule(Task task) = 0; protected: ThreadPoolInterface() = default; diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp index b4a650c54bb..012b5ca3ca7 100644 --- a/src/mongo/util/concurrency/thread_pool_test.cpp +++ b/src/mongo/util/concurrency/thread_pool_test.cpp @@ -104,17 +104,14 @@ TEST_F(ThreadPoolTest, MinPoolSize0) { pool.startup(); ASSERT_EQ(0U, pool.getStats().numThreads); stdx::unique_lock<stdx::mutex> lk(mutex); - pool.schedule([this](auto status) { - ASSERT_OK(status); - blockingWork(); - }); + ASSERT_OK(pool.schedule([this] { blockingWork(); })); while (count1 != 1U) { cv1.wait(lk); } auto stats = pool.getStats(); ASSERT_EQUALS(1U, stats.numThreads); ASSERT_EQUALS(0U, stats.numPendingTasks); - pool.schedule([](auto status) { ASSERT_OK(status); }); + ASSERT_OK(pool.schedule([] {})); stats = pool.getStats(); ASSERT_EQUALS(1U, stats.numThreads); ASSERT_EQUALS(0U, stats.numIdleThreads); @@ -132,10 +129,7 @@ TEST_F(ThreadPoolTest, MinPoolSize0) { lk.lock(); flag2 = false; count1 = 0; - pool.schedule([this](auto status) { - ASSERT_OK(status); - blockingWork(); - }); + ASSERT_OK(pool.schedule([this] { blockingWork(); })); while (count1 == 0) { cv1.wait(lk); } @@ -157,10 +151,7 @@ TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) { pool.startup(); stdx::unique_lock<stdx::mutex> lk(mutex); for (size_t i = 0U; i < 30U; ++i) { - pool.schedule([this, i](auto status) { - ASSERT_OK(status) << i; - blockingWork(); - }); + ASSERT_OK(pool.schedule([this] { blockingWork(); })) << i; } while (count1 < 20U) { cv1.wait(lk); @@ -234,10 +225,7 @@ DEATH_TEST(ThreadPoolTest, sleepmillis(50); } stdx::unique_lock<stdx::mutex> lk(mutex); - pool->schedule([&mutex](auto status) { - ASSERT_OK(status); - stdx::lock_guard<stdx::mutex> lk(mutex); - }); + ASSERT_OK(pool->schedule([&mutex] { stdx::lock_guard<stdx::mutex> lk(mutex); })); stdx::thread t([&pool] { pool->shutdown(); pool->join(); @@ -269,10 +257,7 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks) ThreadPool pool(options); pool.startup(); - pool.schedule([&barrier](auto status) { - ASSERT_OK(status); - barrier.countDownAndWait(); - }); + ASSERT_OK(pool.schedule([&barrier] { barrier.countDownAndWait(); })); barrier.countDownAndWait(); ASSERT_TRUE(onCreateThreadCalled); diff --git a/src/mongo/util/concurrency/thread_pool_test_common.cpp b/src/mongo/util/concurrency/thread_pool_test_common.cpp index 2c2113bb890..a03da59cd49 100644 --- a/src/mongo/util/concurrency/thread_pool_test_common.cpp +++ b/src/mongo/util/concurrency/thread_pool_test_common.cpp @@ -144,7 +144,7 @@ COMMON_THREAD_POOL_TEST(UnusedPool) { COMMON_THREAD_POOL_TEST(CannotScheduleAfterShutdown) { auto& pool = getThreadPool(); pool.shutdown(); - pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); }); + ASSERT_EQ(ErrorCodes::ShutdownInProgress, pool.schedule([] {})); } COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleStartUp, "it has already started") { @@ -160,9 +160,9 @@ constexpr auto kExceptionMessage = "No good very bad exception"; COMMON_THREAD_POOL_DEATH_TEST(DieWhenExceptionBubblesUp, kExceptionMessage) { auto& pool = getThreadPool(); pool.startup(); - pool.schedule([](auto status) { + ASSERT_OK(pool.schedule([] { uassertStatusOK(Status({ErrorCodes::BadValue, kExceptionMessage})); - }); + })); pool.shutdown(); pool.join(); } @@ -177,10 +177,7 @@ COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleJoin, "Attempted to join pool") { COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) { auto& pool = getThreadPool(); bool executed = false; - pool.schedule([&executed](auto status) { - ASSERT_OK(status); - executed = true; - }); + ASSERT_OK(pool.schedule([&executed] { executed = true; })); deleteThreadPool(); ASSERT_EQ(executed, true); } @@ -188,10 +185,7 @@ COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) { COMMON_THREAD_POOL_TEST(PoolJoinExecutesRemainingTasks) { auto& pool = getThreadPool(); bool executed = false; - pool.schedule([&executed](auto status) { - ASSERT_OK(status); - executed = true; - }); + ASSERT_OK(pool.schedule([&executed] { executed = true; })); pool.shutdown(); pool.join(); ASSERT_EQ(executed, true); @@ -204,15 +198,12 @@ COMMON_THREAD_POOL_TEST(RepeatedScheduleDoesntSmashStack) { std::size_t n = 0; stdx::mutex mutex; stdx::condition_variable condvar; - func = [&pool, &n, &func, &condvar, &mutex, depth]() { + func = [&pool, &n, &func, &condvar, &mutex, depth] { stdx::unique_lock<stdx::mutex> lk(mutex); if (n < depth) { n++; lk.unlock(); - pool.schedule([&](auto status) { - ASSERT_OK(status); - func(); - }); + ASSERT_OK(pool.schedule(func)); } else { pool.shutdown(); condvar.notify_one(); |