author     Amirsaman Memaripour <amirsaman.memaripour@mongodb.com>    2020-06-30 20:12:29 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>           2020-07-13 18:43:43 +0000
commit     5893ced59bd4f54626d92b72a6a776a2f2ec2ccc (patch)
tree       92ea718a0267b0c228a2a2b1fd1a70eb7a6efa47
parent     5679ec1dc597df81cbf7e2bf153b5859489521c9 (diff)
download   mongo-5893ced59bd4f54626d92b72a6a776a2f2ec2ccc.tar.gz
SERVER-49105 Create fixed thread pool ServiceExecutor
-rw-r--r--  src/mongo/transport/SConscript                    2
-rw-r--r--  src/mongo/transport/service_executor.idl         52
-rw-r--r--  src/mongo/transport/service_executor_fixed.cpp  158
-rw-r--r--  src/mongo/transport/service_executor_fixed.h    128
-rw-r--r--  src/mongo/transport/service_executor_test.cpp   187
5 files changed, 478 insertions(+), 49 deletions(-)
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index b5431d1db2a..813401c3c94 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -86,6 +86,7 @@ env.Library(
tlEnv.Library(
target='service_executor',
source=[
+ 'service_executor_fixed.cpp',
'service_executor_reserved.cpp',
'service_executor_synchronous.cpp',
env.Idlc('service_executor.idl')[0],
@@ -95,6 +96,7 @@ tlEnv.Library(
],
LIBDEPS_PRIVATE=[
"$BUILD_DIR/mongo/idl/server_parameter",
+ "$BUILD_DIR/mongo/util/concurrency/thread_pool",
"$BUILD_DIR/mongo/util/processinfo",
'$BUILD_DIR/third_party/shim_asio',
'transport_layer_common',
diff --git a/src/mongo/transport/service_executor.idl b/src/mongo/transport/service_executor.idl
index 6e9e897bf40..d380492ec62 100644
--- a/src/mongo/transport/service_executor.idl
+++ b/src/mongo/transport/service_executor.idl
@@ -37,59 +37,13 @@ server_parameters:
cpp_vartype: "AtomicWord<int>"
cpp_varname: "synchronousServiceExecutorRecursionLimit"
default: 8
- adaptiveServiceExecutorReservedThreads:
- description: >-
- The executor will always keep this many threads around.
- If the value is -1, then it will be set to number of cores / 2.
- set_at: [ startup, runtime ]
- cpp_vartype: "AtomicWord<int>"
- cpp_varname: "adaptiveServiceExecutorReservedThreads"
- default: -1
- adaptiveServiceExecutorRunTimeMillis:
- description: >-
- Each worker thread will allow ASIO to run for this many milliseconds
- before checking whether it should exit.
- set_at: [ startup, runtime ]
- cpp_vartype: "AtomicWord<int>"
- cpp_varname: "adaptiveServiceExecutorRunTimeMillis"
- default: 5000
- adaptiveServiceExecutorRunTimeJitterMillis:
- description: >-
- The period that a worker thread runs will be offset by at most this value
- so that not all threads are scheduled at the same time.
- set_at: [ startup, runtime ]
- cpp_vartype: "AtomicWord<int>"
- cpp_varname: "adaptiveServiceExecutorRunTimeJitterMillis"
- default: 500
- adaptiveServiceExecutorStuckThreadTimeoutMillis:
- description: >-
- The maximum amount of time the controller thread will sleep
- before doing any stuck-thread detection.
- set_at: [ startup, runtime ]
- cpp_vartype: "AtomicWord<int>"
- cpp_varname: "adaptiveServiceExecutorStuckThreadTimeoutMillis"
- default: 250
- adaptiveServiceExecutorMaxQueueLatencyMicros:
- description: >-
- The maximum allowed latency between when a task is scheduled
- and a thread is started to service it.
- set_at: [ startup, runtime ]
- cpp_vartype: "AtomicWord<int>"
- cpp_varname: "adaptiveServiceExecutorMaxQueueLatencyMicros"
- default: 500
- adaptiveServiceExecutorIdlePctThreshold:
- description: >-
- The percent boundary of work time over run time below which worker threads will exit.
- set_at: [ startup, runtime ]
- cpp_vartype: "AtomicWord<int>"
- cpp_varname: "adaptiveServiceExecutorIdlePctThreshold"
- default: 60
- adaptiveServiceExecutorRecursionLimit:
+
+ fixedServiceExecutorRecursionLimit:
description: >-
Tasks may recurse further if their recursion depth is less than this value.
set_at: [ startup, runtime ]
cpp_vartype: "AtomicWord<int>"
- cpp_varname: "adaptiveServiceExecutorRecursionLimit"
+ cpp_varname: "fixedServiceExecutorRecursionLimit"
default: 8
reservedServiceExecutorRecursionLimit:
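
Note: the parameter block above is compiled by the IDL toolchain (env.Idlc('service_executor.idl') in the SConscript hunk) into service_executor_gen.h, which declares the runtime-settable AtomicWord<int> named fixedServiceExecutorRecursionLimit. Below is a minimal sketch of how such a generated parameter is read, mirroring the loadRelaxed() call in service_executor_fixed.cpp later in this diff; the helper function is hypothetical and the namespace is assumed to match that file.

    // Sketch only: consuming the IDL-generated server parameter. The helper
    // function is illustrative and not part of this commit.
    #include "mongo/transport/service_executor_gen.h"

    namespace mongo {
    namespace transport {

    // Returns true while one more level of inline (recursive) task execution is
    // still allowed at the given depth.
    bool belowRecursionLimit(int currentDepth) {
        // A relaxed load is sufficient: the limit is advisory and may be changed
        // at runtime via setParameter.
        return currentDepth < fixedServiceExecutorRecursionLimit.loadRelaxed();
    }

    }  // namespace transport
    }  // namespace mongo
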
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
new file mode 100644
index 00000000000..48a94cd57cd
--- /dev/null
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
+
+#include "mongo/transport/service_executor_fixed.h"
+
+#include "mongo/base/error_codes.h"
+#include "mongo/logv2/log.h"
+#include "mongo/transport/service_executor_gen.h"
+#include "mongo/util/fail_point.h"
+
+namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeSchedulingServiceExecutorFixedTask);
+
+namespace transport {
+namespace {
+constexpr auto kThreadsRunning = "threadsRunning"_sd;
+constexpr auto kExecutorLabel = "executor"_sd;
+constexpr auto kExecutorName = "fixed"_sd;
+} // namespace
+
+ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
+ : _options(std::move(options)) {
+ _options.onCreateThread =
+ [this, onCreate = std::move(_options.onCreateThread)](const std::string& name) mutable {
+ _executorContext = std::make_unique<ExecutorThreadContext>(this->weak_from_this());
+ onCreate(name);
+ };
+ _threadPool = std::make_unique<ThreadPool>(_options);
+}
+
+ServiceExecutorFixed::~ServiceExecutorFixed() {
+ invariant(!_canScheduleWork.load());
+ if (_state == State::kNotStarted)
+ return;
+
+ // Ensures we always call "shutdown" after starting the service executor
+ invariant(_state == State::kStopped);
+ _threadPool->shutdown();
+ _threadPool->join();
+ invariant(_numRunningExecutorThreads.load() == 0);
+}
+
+Status ServiceExecutorFixed::start() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ auto oldState = std::exchange(_state, State::kRunning);
+ invariant(oldState == State::kNotStarted);
+ _threadPool->startup();
+ _canScheduleWork.store(true);
+ LOGV2_DEBUG(
+ 4910501, 3, "Started fixed thread-pool service executor", "name"_attr = _options.poolName);
+ return Status::OK();
+}
+
+Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
+ auto waitForShutdown = [&]() mutable -> Status {
+ stdx::unique_lock<Latch> lk(_mutex);
+ bool success = _shutdownCondition.wait_for(lk, timeout.toSystemDuration(), [this] {
+ return _numRunningExecutorThreads.load() == 0;
+ });
+ return success ? Status::OK()
+ : Status(ErrorCodes::ExceededTimeLimit,
+ "Failed to shutdown all executor threads within the time limit");
+ };
+
+ LOGV2_DEBUG(4910502,
+ 3,
+ "Shutting down fixed thread-pool service executor",
+ "name"_attr = _options.poolName);
+
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _canScheduleWork.store(false);
+
+ auto oldState = std::exchange(_state, State::kStopped);
+ if (oldState != State::kStopped) {
+ _threadPool->shutdown();
+ }
+ }
+
+ return waitForShutdown();
+}
+
+Status ServiceExecutorFixed::schedule(Task task, ScheduleFlags flags) {
+ if (!_canScheduleWork.load()) {
+ return Status(ErrorCodes::ShutdownInProgress, "Executor is not running");
+ }
+
+ auto mayExecuteTaskInline = [&] {
+ if (!(flags & ScheduleFlags::kMayRecurse))
+ return false;
+ if (!_executorContext)
+ return false;
+ return true;
+ };
+
+
+ if (mayExecuteTaskInline()) {
+ invariant(_executorContext);
+ if (_executorContext->getRecursionDepth() <
+ fixedServiceExecutorRecursionLimit.loadRelaxed()) {
+ // Recursively executing the task on the executor thread.
+ _executorContext->run(task);
+ return Status::OK();
+ }
+ }
+
+ hangBeforeSchedulingServiceExecutorFixedTask.pauseWhileSet();
+
+ // May throw if an attempt is made to schedule after the thread pool is shutdown.
+ try {
+ _threadPool->schedule([task = std::move(task)](Status status) {
+ internalAssert(status);
+ invariant(_executorContext);
+ _executorContext->run(task);
+ });
+ } catch (DBException& e) {
+ return e.toStatus();
+ }
+
+ return Status::OK();
+}
+
+void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const {
+ *bob << kExecutorLabel << kExecutorName << kThreadsRunning
+ << static_cast<int>(_numRunningExecutorThreads.load());
+}
+
+} // namespace transport
+} // namespace mongo
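
Note: a hedged usage sketch of the inline-recursion path implemented by schedule() above. A task scheduled with ServiceExecutor::kMayRecurse from a task already running on an executor thread is executed immediately on that thread while the thread's recursion depth is below fixedServiceExecutorRecursionLimit, and is queued on the thread pool otherwise. The function name below is illustrative only; the RecursiveTask unit test later in this diff exercises the same behavior.

    // Illustrative, not part of the commit: scheduling a continuation that is
    // allowed to run inline (recursively) on the current executor thread.
    #include <memory>

    #include "mongo/base/status.h"
    #include "mongo/transport/service_executor_fixed.h"

    namespace mongo {

    Status scheduleFollowUp(std::shared_ptr<transport::ServiceExecutorFixed> executor) {
        return executor->schedule(
            [] {
                // Hypothetical continuation work; runs inline when the recursion
                // limit permits, otherwise on another fixed pool thread.
            },
            transport::ServiceExecutor::kMayRecurse);
    }

    }  // namespace mongo
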
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
new file mode 100644
index 00000000000..c418296db8b
--- /dev/null
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/base/status.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_executor.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/hierarchical_acquisition.h"
+
+namespace mongo {
+namespace transport {
+
+/**
+ * A service executor that uses a fixed (configurable) number of threads to execute tasks.
+ * This executor always yields before executing scheduled tasks, and never yields before scheduling
+ * new tasks (i.e., `ScheduleFlags::kMayYieldBeforeSchedule` is a no-op for this executor).
+ */
+class ServiceExecutorFixed : public ServiceExecutor,
+ public std::enable_shared_from_this<ServiceExecutorFixed> {
+public:
+ explicit ServiceExecutorFixed(ThreadPool::Options options);
+ virtual ~ServiceExecutorFixed();
+
+ Status start() override;
+ Status shutdown(Milliseconds timeout) override;
+ Status schedule(Task task, ScheduleFlags flags) override;
+
+ Mode transportMode() const override {
+ return Mode::kSynchronous;
+ }
+
+ void appendStats(BSONObjBuilder* bob) const override;
+
+private:
+ // Maintains the execution state (e.g., recursion depth) for executor threads
+ class ExecutorThreadContext {
+ public:
+ ExecutorThreadContext(std::weak_ptr<ServiceExecutorFixed> serviceExecutor)
+ : _executor(std::move(serviceExecutor)) {
+ _adjustRunningExecutorThreads(1);
+ }
+
+ ExecutorThreadContext(ExecutorThreadContext&&) = delete;
+ ExecutorThreadContext(const ExecutorThreadContext&) = delete;
+
+ ~ExecutorThreadContext() {
+ _adjustRunningExecutorThreads(-1);
+ }
+
+ void run(ServiceExecutor::Task task) {
+ // Yield here to improve concurrency, especially when there are more executor threads
+ // than CPU cores.
+ stdx::this_thread::yield();
+ _recursionDepth++;
+ task();
+ _recursionDepth--;
+ }
+
+ int getRecursionDepth() const {
+ return _recursionDepth;
+ }
+
+ private:
+ void _adjustRunningExecutorThreads(int adjustment) {
+ if (auto executor = _executor.lock()) {
+ executor->_numRunningExecutorThreads.fetchAndAdd(adjustment);
+ }
+ }
+
+ int _recursionDepth = 0;
+ std::weak_ptr<ServiceExecutorFixed> _executor;
+ };
+
+private:
+ AtomicWord<size_t> _numRunningExecutorThreads{0};
+ AtomicWord<bool> _canScheduleWork{false};
+
+ mutable Mutex _mutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ServiceExecutorFixed::_mutex");
+ stdx::condition_variable _shutdownCondition;
+
+ /**
+ * State transition diagram: kNotStarted ---> kRunning ---> kStopped
+ * The service executor cannot be in "kRunning" when its destructor is invoked.
+ */
+ enum State { kNotStarted, kRunning, kStopped } _state = kNotStarted;
+
+ ThreadPool::Options _options;
+ std::unique_ptr<ThreadPool> _threadPool;
+
+ static inline thread_local std::unique_ptr<ExecutorThreadContext> _executorContext;
+};
+
+} // namespace transport
+} // namespace mongo
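
Note: a minimal lifecycle sketch following the state diagram documented in the header (kNotStarted ---> kRunning ---> kStopped): build the executor with fixed-size ThreadPool::Options, start() it, schedule work, and shut it down before destruction so the invariants in ~ServiceExecutorFixed() hold. Pool size, names, and the timeout are placeholders; the test fixture below uses essentially the same sequence.

    // Lifecycle sketch (illustrative, not part of the commit).
    #include <memory>

    #include "mongo/transport/service_executor_fixed.h"
    #include "mongo/util/assert_util.h"
    #include "mongo/util/concurrency/thread_pool.h"
    #include "mongo/util/duration.h"

    namespace mongo {

    void runFixedExecutorExample() {
        ThreadPool::Options options;
        options.minThreads = options.maxThreads = 4;  // fixed number of worker threads
        options.poolName = "FixedExecutorExample";    // placeholder name

        auto executor = std::make_shared<transport::ServiceExecutorFixed>(std::move(options));
        invariant(executor->start().isOK());

        // Scheduling succeeds only between start() and shutdown(); the task runs
        // on one of the fixed worker threads.
        invariant(executor
                      ->schedule([] { /* request-handling work */ },
                                 transport::ServiceExecutor::kEmptyFlags)
                      .isOK());

        // shutdown() must reach kStopped before the destructor runs.
        invariant(executor->shutdown(Milliseconds(10000)).isOK());
    }

    }  // namespace mongo
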
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 4838789df47..99ee7f3ddb5 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -32,13 +32,21 @@
#include "mongo/platform/basic.h"
#include "boost/optional.hpp"
+#include <algorithm>
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
+#include "mongo/transport/service_executor_fixed.h"
+#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/unittest/barrier.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/future.h"
#include "mongo/util/scopeguard.h"
#include <asio.hpp>
@@ -158,6 +166,185 @@ TEST_F(ServiceExecutorSynchronousFixture, ScheduleFailsBeforeStartup) {
scheduleBasicTask(executor.get(), false);
}
+class ServiceExecutorFixedFixture : public unittest::Test {
+public:
+ static constexpr auto kNumExecutorThreads = 2;
+
+ void setUp() override {
+ ThreadPool::Options options;
+ options.minThreads = options.maxThreads = kNumExecutorThreads;
+ options.poolName = "Test";
+ _executor = std::make_shared<ServiceExecutorFixed>(std::move(options));
+ }
+
+ void tearDown() override {
+ if (_skipShutdown)
+ return;
+ ASSERT_OK(_executor->shutdown(kShutdownTime));
+ }
+
+ void skipShutdown(bool skip) {
+ _skipShutdown = skip;
+ }
+
+ auto getServiceExecutor() const {
+ return _executor;
+ }
+
+ auto startAndGetServiceExecutor() {
+ ASSERT_OK(_executor->start());
+ return getServiceExecutor();
+ }
+
+ auto makeRecursionGuard() {
+ _recursionDepth.fetchAndAdd(1);
+ return makeGuard([this] { _recursionDepth.fetchAndSubtract(1); });
+ }
+
+ auto getRecursionDepth() const {
+ return _recursionDepth.load();
+ }
+
+private:
+ AtomicWord<int> _recursionDepth{0};
+ bool _skipShutdown = false;
+ std::shared_ptr<ServiceExecutorFixed> _executor;
+};
+
+TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) {
+ auto executor = getServiceExecutor();
+ ASSERT_NOT_OK(executor->schedule([] {}, ServiceExecutor::kEmptyFlags));
+}
+
+DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") {
+ startAndGetServiceExecutor();
+ skipShutdown(true);
+}
+
+TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) {
+ auto executor = startAndGetServiceExecutor();
+ auto barrier = std::make_shared<unittest::Barrier>(2);
+ ASSERT_OK(executor->schedule([barrier]() mutable { barrier->countDownAndWait(); },
+ ServiceExecutor::kEmptyFlags));
+ barrier->countDownAndWait();
+}
+
+TEST_F(ServiceExecutorFixedFixture, RecursiveTask) {
+ auto executor = startAndGetServiceExecutor();
+ auto barrier = std::make_shared<unittest::Barrier>(2);
+
+ ServiceExecutor::Task recursiveTask;
+ recursiveTask = [this, executor, barrier, &recursiveTask] {
+ auto recursionGuard = makeRecursionGuard();
+ if (getRecursionDepth() < fixedServiceExecutorRecursionLimit.load()) {
+ ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse));
+ } else {
+ // This test never returns unless the service executor can satisfy the recursion depth.
+ barrier->countDownAndWait();
+ }
+ };
+
+ // Schedule recursive task and wait for the recursion to stop
+ ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse));
+ barrier->countDownAndWait();
+}
+
+TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) {
+ auto executor = startAndGetServiceExecutor();
+ auto barrier = std::make_shared<unittest::Barrier>(2);
+ AtomicWord<int> tasksToSchedule{fixedServiceExecutorRecursionLimit.load() * 3};
+
+ // A task that schedules itself but expects to always be executed non-recursively. The task
+ // schedules "tasksToSchedule" follow-ups on the service executor, and each scheduled task
+ // verifies that it never runs nested inside another task (its own depth stays at one).
+ ServiceExecutor::Task recursiveTask;
+ recursiveTask = [this, executor, barrier, &recursiveTask, &tasksToSchedule] {
+ auto recursionGuard = makeRecursionGuard();
+ ASSERT_EQ(getRecursionDepth(), 1);
+ if (tasksToSchedule.fetchAndSubtract(1) > 0) {
+ ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kEmptyFlags));
+ } else {
+ // Once there are no more tasks to schedule, notify the main thread to proceed.
+ barrier->countDownAndWait();
+ }
+ };
+
+ // Schedule the recursive task and wait for the execution to finish.
+ ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayYieldBeforeSchedule));
+ barrier->countDownAndWait();
+}
+
+TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) {
+ auto executor = startAndGetServiceExecutor();
+ auto invoked = std::make_shared<SharedPromise<void>>();
+ auto mayReturn = std::make_shared<SharedPromise<void>>();
+
+ ASSERT_OK(executor->schedule(
+ [executor, invoked, mayReturn]() mutable {
+ invoked->emplaceValue();
+ mayReturn->getFuture().get();
+ },
+ ServiceExecutor::kEmptyFlags));
+
+ invoked->getFuture().get();
+ ASSERT_NOT_OK(executor->shutdown(kShutdownTime));
+
+ // Ensure the service executor is stopped before leaving the test.
+ mayReturn->emplaceValue();
+}
+
+TEST_F(ServiceExecutorFixedFixture, Stats) {
+ auto executor = startAndGetServiceExecutor();
+ auto barrier = std::make_shared<unittest::Barrier>(kNumExecutorThreads + 1);
+ stdx::condition_variable cond;
+
+ auto task = [barrier, &cond]() mutable {
+ cond.notify_one();
+ barrier->countDownAndWait();
+ };
+
+ for (auto i = 0; i < kNumExecutorThreads; i++) {
+ ASSERT_OK(executor->schedule(task, ServiceExecutor::kEmptyFlags));
+ }
+
+ // The main thread waits for the executor threads to bump up "threadsRunning" while picking up a
+ // task to execute. Once all executor threads are running, the main thread will unblock them
+ // through the barrier.
+ auto mutex = MONGO_MAKE_LATCH();
+ stdx::unique_lock<Latch> lk(mutex);
+ cond.wait(lk, [&executor]() {
+ BSONObjBuilder bob;
+ executor->appendStats(&bob);
+ auto obj = bob.obj();
+ ASSERT(obj.hasField("threadsRunning"));
+ auto threadsRunning = obj.getIntField("threadsRunning");
+ LOGV2_DEBUG(
+ 4910503, 1, "Checked number of executor threads", "threads"_attr = threadsRunning);
+ return threadsRunning == static_cast<int>(ServiceExecutorFixedFixture::kNumExecutorThreads);
+ });
+
+ barrier->countDownAndWait();
+}
+
+TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
+ auto executor = startAndGetServiceExecutor();
+ std::unique_ptr<stdx::thread> schedulerThread;
+
+ {
+ // Spawn a thread to schedule a task, and block it before it can schedule the task with the
+ // underlying thread pool. Then shut down the service executor and unblock the scheduler
+ // thread. This order of events must cause "schedule()" to return a non-okay status.
+ FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask");
+ schedulerThread = std::make_unique<stdx::thread>([executor] {
+ ASSERT_NOT_OK(
+ executor->schedule([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags));
+ });
+ failpoint->waitForTimesEntered(1);
+ ASSERT_OK(executor->shutdown(kShutdownTime));
+ }
+
+ schedulerThread->join();
+}
} // namespace
} // namespace mongo