Diffstat (limited to 'src/mongo/util/concurrency')
-rw-r--r--  src/mongo/util/concurrency/thread_pool.cpp              24
-rw-r--r--  src/mongo/util/concurrency/thread_pool.h                  2
-rw-r--r--  src/mongo/util/concurrency/thread_pool_interface.h        7
-rw-r--r--  src/mongo/util/concurrency/thread_pool_test.cpp          27
-rw-r--r--  src/mongo/util/concurrency/thread_pool_test_common.cpp   23
5 files changed, 27 insertions, 56 deletions
diff --git a/src/mongo/util/concurrency/thread_pool.cpp b/src/mongo/util/concurrency/thread_pool.cpp
index 5f732c1a1ed..81f1e3c2285 100644
--- a/src/mongo/util/concurrency/thread_pool.cpp
+++ b/src/mongo/util/concurrency/thread_pool.cpp
@@ -185,22 +185,15 @@ void ThreadPool::_drainPendingTasks() {
cleanThread.join();
}
-void ThreadPool::schedule(Task task) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
+Status ThreadPool::schedule(Task task) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
switch (_state) {
case joinRequired:
case joining:
- case shutdownComplete: {
- auto status = Status(ErrorCodes::ShutdownInProgress,
- str::stream() << "Shutdown of thread pool " << _options.poolName
- << " in progress");
-
- lk.unlock();
- task(status);
- return;
- } break;
-
+ case shutdownComplete:
+ return Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Shutdown of thread pool " << _options.poolName
+ << " in progress");
case preStart:
case running:
break;
@@ -209,7 +202,7 @@ void ThreadPool::schedule(Task task) {
}
_pendingTasks.emplace_back(std::move(task));
if (_state == preStart) {
- return;
+ return Status::OK();
}
if (_numIdleThreads < _pendingTasks.size()) {
_startWorkerThread_inlock();
@@ -218,6 +211,7 @@ void ThreadPool::schedule(Task task) {
_lastFullUtilizationDate = Date_t::now();
}
_workAvailable.notify_one();
+ return Status::OK();
}
void ThreadPool::waitForIdle() {
@@ -338,7 +332,7 @@ void ThreadPool::_doOneTask(stdx::unique_lock<stdx::mutex>* lk) noexcept {
_pendingTasks.pop_front();
--_numIdleThreads;
lk->unlock();
- task(Status::OK());
+ task();
lk->lock();
++_numIdleThreads;
if (_pendingTasks.empty() && _threads.size() == _numIdleThreads) {
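In short, this thread_pool.cpp change moves error reporting out of the task: a pool that is shutting down now returns ErrorCodes::ShutdownInProgress from schedule() instead of invoking the task with that Status, and worker threads run the task with no arguments. A rough caller-side sketch of the old and new idioms (the caller and doWork() are hypothetical, not part of this diff):

    // Before: the task received a Status, even when scheduling failed.
    pool.schedule([](Status status) {
        if (!status.isOK())
            return;    // e.g. ShutdownInProgress was delivered to the task itself
        doWork();      // doWork() is a placeholder for real work
    });

    // After: the task takes no arguments; failures come back from schedule().
    Status status = pool.schedule([] { doWork(); });
    if (!status.isOK()) {
        // e.g. ErrorCodes::ShutdownInProgress when the pool is shutting down
    }
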
diff --git a/src/mongo/util/concurrency/thread_pool.h b/src/mongo/util/concurrency/thread_pool.h
index c4873f84dff..370de65056e 100644
--- a/src/mongo/util/concurrency/thread_pool.h
+++ b/src/mongo/util/concurrency/thread_pool.h
@@ -123,7 +123,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
- void schedule(Task task) override;
+ Status schedule(Task task) override;
/**
* Blocks the caller until there are no pending tasks on this pool.
diff --git a/src/mongo/util/concurrency/thread_pool_interface.h b/src/mongo/util/concurrency/thread_pool_interface.h
index bb59ac8e9f8..847dfe0506b 100644
--- a/src/mongo/util/concurrency/thread_pool_interface.h
+++ b/src/mongo/util/concurrency/thread_pool_interface.h
@@ -30,7 +30,6 @@
#pragma once
#include "mongo/util/functional.h"
-#include "mongo/util/out_of_line_executor.h"
namespace mongo {
@@ -39,11 +38,13 @@ class Status;
/**
* Interface for a thread pool.
*/
-class ThreadPoolInterface : public OutOfLineExecutor {
+class ThreadPoolInterface {
ThreadPoolInterface(const ThreadPoolInterface&) = delete;
ThreadPoolInterface& operator=(const ThreadPoolInterface&) = delete;
public:
+ using Task = unique_function<void()>;
+
/**
* Destroys a thread pool.
*
@@ -81,7 +82,7 @@ public:
* It is safe to call this before startup(), but the scheduled task will not execute
* until after startup() is called.
*/
- virtual void schedule(Task task) = 0;
+ virtual Status schedule(Task task) = 0;
protected:
ThreadPoolInterface() = default;
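
Because ThreadPoolInterface no longer inherits from OutOfLineExecutor, Task is now the interface's own unique_function<void()> alias, and every implementation must report scheduling failures through the returned Status rather than through the task. A minimal sketch of what the revised contract asks of an implementer (this InlineThreadPool class is illustrative only, not part of the tree):

    // Hypothetical implementation that runs each task inline on the calling thread.
    class InlineThreadPool final : public ThreadPoolInterface {
    public:
        void startup() override {}
        void shutdown() override {
            _shutdown = true;
        }
        void join() override {}
        Status schedule(Task task) override {
            if (_shutdown)
                return {ErrorCodes::ShutdownInProgress, "InlineThreadPool is shut down"};
            task();    // Task is unique_function<void()>; no Status is passed in.
            return Status::OK();
        }

    private:
        bool _shutdown = false;
    };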
diff --git a/src/mongo/util/concurrency/thread_pool_test.cpp b/src/mongo/util/concurrency/thread_pool_test.cpp
index b4a650c54bb..012b5ca3ca7 100644
--- a/src/mongo/util/concurrency/thread_pool_test.cpp
+++ b/src/mongo/util/concurrency/thread_pool_test.cpp
@@ -104,17 +104,14 @@ TEST_F(ThreadPoolTest, MinPoolSize0) {
pool.startup();
ASSERT_EQ(0U, pool.getStats().numThreads);
stdx::unique_lock<stdx::mutex> lk(mutex);
- pool.schedule([this](auto status) {
- ASSERT_OK(status);
- blockingWork();
- });
+ ASSERT_OK(pool.schedule([this] { blockingWork(); }));
while (count1 != 1U) {
cv1.wait(lk);
}
auto stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numPendingTasks);
- pool.schedule([](auto status) { ASSERT_OK(status); });
+ ASSERT_OK(pool.schedule([] {}));
stats = pool.getStats();
ASSERT_EQUALS(1U, stats.numThreads);
ASSERT_EQUALS(0U, stats.numIdleThreads);
@@ -132,10 +129,7 @@ TEST_F(ThreadPoolTest, MinPoolSize0) {
lk.lock();
flag2 = false;
count1 = 0;
- pool.schedule([this](auto status) {
- ASSERT_OK(status);
- blockingWork();
- });
+ ASSERT_OK(pool.schedule([this] { blockingWork(); }));
while (count1 == 0) {
cv1.wait(lk);
}
@@ -157,10 +151,7 @@ TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) {
pool.startup();
stdx::unique_lock<stdx::mutex> lk(mutex);
for (size_t i = 0U; i < 30U; ++i) {
- pool.schedule([this, i](auto status) {
- ASSERT_OK(status) << i;
- blockingWork();
- });
+ ASSERT_OK(pool.schedule([this] { blockingWork(); })) << i;
}
while (count1 < 20U) {
cv1.wait(lk);
@@ -234,10 +225,7 @@ DEATH_TEST(ThreadPoolTest,
sleepmillis(50);
}
stdx::unique_lock<stdx::mutex> lk(mutex);
- pool->schedule([&mutex](auto status) {
- ASSERT_OK(status);
- stdx::lock_guard<stdx::mutex> lk(mutex);
- });
+ ASSERT_OK(pool->schedule([&mutex] { stdx::lock_guard<stdx::mutex> lk(mutex); }));
stdx::thread t([&pool] {
pool->shutdown();
pool->join();
@@ -269,10 +257,7 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks)
ThreadPool pool(options);
pool.startup();
- pool.schedule([&barrier](auto status) {
- ASSERT_OK(status);
- barrier.countDownAndWait();
- });
+ ASSERT_OK(pool.schedule([&barrier] { barrier.countDownAndWait(); }));
barrier.countDownAndWait();
ASSERT_TRUE(onCreateThreadCalled);
diff --git a/src/mongo/util/concurrency/thread_pool_test_common.cpp b/src/mongo/util/concurrency/thread_pool_test_common.cpp
index 2c2113bb890..a03da59cd49 100644
--- a/src/mongo/util/concurrency/thread_pool_test_common.cpp
+++ b/src/mongo/util/concurrency/thread_pool_test_common.cpp
@@ -144,7 +144,7 @@ COMMON_THREAD_POOL_TEST(UnusedPool) {
COMMON_THREAD_POOL_TEST(CannotScheduleAfterShutdown) {
auto& pool = getThreadPool();
pool.shutdown();
- pool.schedule([](auto status) { ASSERT_EQ(status, ErrorCodes::ShutdownInProgress); });
+ ASSERT_EQ(ErrorCodes::ShutdownInProgress, pool.schedule([] {}));
}
COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleStartUp, "it has already started") {
@@ -160,9 +160,9 @@ constexpr auto kExceptionMessage = "No good very bad exception";
COMMON_THREAD_POOL_DEATH_TEST(DieWhenExceptionBubblesUp, kExceptionMessage) {
auto& pool = getThreadPool();
pool.startup();
- pool.schedule([](auto status) {
+ ASSERT_OK(pool.schedule([] {
uassertStatusOK(Status({ErrorCodes::BadValue, kExceptionMessage}));
- });
+ }));
pool.shutdown();
pool.join();
}
@@ -177,10 +177,7 @@ COMMON_THREAD_POOL_DEATH_TEST(DieOnDoubleJoin, "Attempted to join pool") {
COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) {
auto& pool = getThreadPool();
bool executed = false;
- pool.schedule([&executed](auto status) {
- ASSERT_OK(status);
- executed = true;
- });
+ ASSERT_OK(pool.schedule([&executed] { executed = true; }));
deleteThreadPool();
ASSERT_EQ(executed, true);
}
@@ -188,10 +185,7 @@ COMMON_THREAD_POOL_TEST(PoolDestructorExecutesRemainingTasks) {
COMMON_THREAD_POOL_TEST(PoolJoinExecutesRemainingTasks) {
auto& pool = getThreadPool();
bool executed = false;
- pool.schedule([&executed](auto status) {
- ASSERT_OK(status);
- executed = true;
- });
+ ASSERT_OK(pool.schedule([&executed] { executed = true; }));
pool.shutdown();
pool.join();
ASSERT_EQ(executed, true);
@@ -204,15 +198,12 @@ COMMON_THREAD_POOL_TEST(RepeatedScheduleDoesntSmashStack) {
std::size_t n = 0;
stdx::mutex mutex;
stdx::condition_variable condvar;
- func = [&pool, &n, &func, &condvar, &mutex, depth]() {
+ func = [&pool, &n, &func, &condvar, &mutex, depth] {
stdx::unique_lock<stdx::mutex> lk(mutex);
if (n < depth) {
n++;
lk.unlock();
- pool.schedule([&](auto status) {
- ASSERT_OK(status);
- func();
- });
+ ASSERT_OK(pool.schedule(func));
} else {
pool.shutdown();
condvar.notify_one();