path: root/src/mongo/transport/service_executor_test.cpp
diff options
authorAmirsaman Memaripour <>2020-06-30 20:12:29 +0000
committerEvergreen Agent <>2020-07-13 18:43:43 +0000
commit5893ced59bd4f54626d92b72a6a776a2f2ec2ccc (patch)
tree92ea718a0267b0c228a2a2b1fd1a70eb7a6efa47 /src/mongo/transport/service_executor_test.cpp
parent5679ec1dc597df81cbf7e2bf153b5859489521c9 (diff)
SERVER-49105 Create fixed thread pool ServiceExecutor
Diffstat (limited to 'src/mongo/transport/service_executor_test.cpp')
1 files changed, 187 insertions, 0 deletions
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 4838789df47..99ee7f3ddb5 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -32,13 +32,21 @@
#include "mongo/platform/basic.h"
#include "boost/optional.hpp"
+#include <algorithm>
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
+#include "mongo/transport/service_executor_fixed.h"
+#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/future.h"
#include "mongo/util/scopeguard.h"
#include <asio.hpp>
@@ -158,6 +166,185 @@ TEST_F(ServiceExecutorSynchronousFixture, ScheduleFailsBeforeStartup) {
scheduleBasicTask(executor.get(), false);
+class ServiceExecutorFixedFixture : public unittest::Test {
+ static constexpr auto kNumExecutorThreads = 2;
+ void setUp() override {
+ ThreadPool::Options options;
+ options.minThreads = options.maxThreads = kNumExecutorThreads;
+ options.poolName = "Test";
+ _executor = std::make_shared<ServiceExecutorFixed>(std::move(options));
+ }
+ void tearDown() override {
+ if (_skipShutdown)
+ return;
+ ASSERT_OK(_executor->shutdown(kShutdownTime));
+ }
+ void skipShutdown(bool skip) {
+ _skipShutdown = skip;
+ }
+ auto getServiceExecutor() const {
+ return _executor;
+ }
+ auto startAndGetServiceExecutor() {
+ ASSERT_OK(_executor->start());
+ return getServiceExecutor();
+ }
+ auto makeRecursionGuard() {
+ _recursionDepth.fetchAndAdd(1);
+ return makeGuard([this] { _recursionDepth.fetchAndSubtract(1); });
+ }
+ auto getRecursionDepth() const {
+ return _recursionDepth.load();
+ }
+ AtomicWord<int> _recursionDepth{0};
+ bool _skipShutdown = false;
+ std::shared_ptr<ServiceExecutorFixed> _executor;
+TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) {
+ auto executor = getServiceExecutor();
+ ASSERT_NOT_OK(executor->schedule([] {}, ServiceExecutor::kEmptyFlags));
+DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") {
+ startAndGetServiceExecutor();
+ skipShutdown(true);
+TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) {
+ auto executor = startAndGetServiceExecutor();
+ auto barrier = std::make_shared<unittest::Barrier>(2);
+ ASSERT_OK(executor->schedule([barrier]() mutable { barrier->countDownAndWait(); },
+ ServiceExecutor::kEmptyFlags));
+ barrier->countDownAndWait();
+TEST_F(ServiceExecutorFixedFixture, RecursiveTask) {
+ auto executor = startAndGetServiceExecutor();
+ auto barrier = std::make_shared<unittest::Barrier>(2);
+ ServiceExecutor::Task recursiveTask;
+ recursiveTask = [this, executor, barrier, &recursiveTask] {
+ auto recursionGuard = makeRecursionGuard();
+ if (getRecursionDepth() < fixedServiceExecutorRecursionLimit.load()) {
+ ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse));
+ } else {
+ // This test never returns unless the service executor can satisfy the recursion depth.
+ barrier->countDownAndWait();
+ }
+ };
+ // Schedule recursive task and wait for the recursion to stop
+ ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse));
+ barrier->countDownAndWait();
+TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) {
+ auto executor = startAndGetServiceExecutor();
+ auto barrier = std::make_shared<unittest::Barrier>(2);
+ AtomicWord<int> tasksToSchedule{fixedServiceExecutorRecursionLimit.load() * 3};
+ // A recursive task that expects to be executed non-recursively. The task recursively schedules
+ // "tasksToSchedule" tasks to the service executor, and each scheduled task verifies that the
+ // recursion depth remains zero during its execution.
+ ServiceExecutor::Task recursiveTask;
+ recursiveTask = [this, executor, barrier, &recursiveTask, &tasksToSchedule] {
+ auto recursionGuard = makeRecursionGuard();
+ ASSERT_EQ(getRecursionDepth(), 1);
+ if (tasksToSchedule.fetchAndSubtract(1) > 0) {
+ ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kEmptyFlags));
+ } else {
+ // Once there are no more tasks to schedule, notify the main thread to proceed.
+ barrier->countDownAndWait();
+ }
+ };
+ // Schedule the recursive task and wait for the execution to finish.
+ ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayYieldBeforeSchedule));
+ barrier->countDownAndWait();
+TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) {
+ auto executor = startAndGetServiceExecutor();
+ auto invoked = std::make_shared<SharedPromise<void>>();
+ auto mayReturn = std::make_shared<SharedPromise<void>>();
+ ASSERT_OK(executor->schedule(
+ [executor, invoked, mayReturn]() mutable {
+ invoked->emplaceValue();
+ mayReturn->getFuture().get();
+ },
+ ServiceExecutor::kEmptyFlags));
+ invoked->getFuture().get();
+ ASSERT_NOT_OK(executor->shutdown(kShutdownTime));
+ // Ensure the service executor is stopped before leaving the test.
+ mayReturn->emplaceValue();
+TEST_F(ServiceExecutorFixedFixture, Stats) {
+ auto executor = startAndGetServiceExecutor();
+ auto barrier = std::make_shared<unittest::Barrier>(kNumExecutorThreads + 1);
+ stdx::condition_variable cond;
+ auto task = [barrier, &cond]() mutable {
+ cond.notify_one();
+ barrier->countDownAndWait();
+ };
+ for (auto i = 0; i < kNumExecutorThreads; i++) {
+ ASSERT_OK(executor->schedule(task, ServiceExecutor::kEmptyFlags));
+ }
+ // The main thread waits for the executor threads to bump up "threadsRunning" while picking up a
+ // task to execute. Once all executor threads are running, the main thread will unblock them
+ // through the barrier.
+ auto mutex = MONGO_MAKE_LATCH();
+ stdx::unique_lock<Latch> lk(mutex);
+ cond.wait(lk, [&executor]() {
+ BSONObjBuilder bob;
+ executor->appendStats(&bob);
+ auto obj = bob.obj();
+ ASSERT(obj.hasField("threadsRunning"));
+ auto threadsRunning = obj.getIntField("threadsRunning");
+ 4910503, 1, "Checked number of executor threads", "threads"_attr = threadsRunning);
+ return threadsRunning == static_cast<int>(ServiceExecutorFixedFixture::kNumExecutorThreads);
+ });
+ barrier->countDownAndWait();
+TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
+ auto executor = startAndGetServiceExecutor();
+ std::unique_ptr<stdx::thread> schedulerThread;
+ {
+ // Spawn a thread to schedule a task, and block it before it can schedule the task with the
+ // underlying thread-pool. Then shutdown the service executor and unblock the scheduler
+ // thread. This order of events must cause "schedule()" to return a non-okay status.
+ FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask");
+ schedulerThread = std::make_unique<stdx::thread>([executor] {
+ executor->schedule([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags));
+ });
+ failpoint->waitForTimesEntered(1);
+ ASSERT_OK(executor->shutdown(kShutdownTime));
+ }
+ schedulerThread->join();
} // namespace
} // namespace mongo