diff options
author | Amirsaman Memaripour <amirsaman.memaripour@mongodb.com> | 2020-06-30 20:12:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-07-13 18:43:43 +0000 |
commit | 5893ced59bd4f54626d92b72a6a776a2f2ec2ccc (patch) | |
tree | 92ea718a0267b0c228a2a2b1fd1a70eb7a6efa47 /src/mongo | |
parent | 5679ec1dc597df81cbf7e2bf153b5859489521c9 (diff) | |
download | mongo-5893ced59bd4f54626d92b72a6a776a2f2ec2ccc.tar.gz |
SERVER-49105 Create fixed thread pool ServiceExecutor
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/transport/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/transport/service_executor.idl | 52 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.cpp | 158 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_fixed.h | 128 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_test.cpp | 187 |
5 files changed, 478 insertions, 49 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index b5431d1db2a..813401c3c94 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -86,6 +86,7 @@ env.Library( tlEnv.Library( target='service_executor', source=[ + 'service_executor_fixed.cpp', 'service_executor_reserved.cpp', 'service_executor_synchronous.cpp', env.Idlc('service_executor.idl')[0], @@ -95,6 +96,7 @@ tlEnv.Library( ], LIBDEPS_PRIVATE=[ "$BUILD_DIR/mongo/idl/server_parameter", + "$BUILD_DIR/mongo/util/concurrency/thread_pool", "$BUILD_DIR/mongo/util/processinfo", '$BUILD_DIR/third_party/shim_asio', 'transport_layer_common', diff --git a/src/mongo/transport/service_executor.idl b/src/mongo/transport/service_executor.idl index 6e9e897bf40..d380492ec62 100644 --- a/src/mongo/transport/service_executor.idl +++ b/src/mongo/transport/service_executor.idl @@ -37,59 +37,13 @@ server_parameters: cpp_vartype: "AtomicWord<int>" cpp_varname: "synchronousServiceExecutorRecursionLimit" default: 8 - adaptiveServiceExecutorReservedThreads: - description: >- - The executor will always keep this many threads around. - If the value is -1, then it will be set to number of cores / 2. - set_at: [ startup, runtime ] - cpp_vartype: "AtomicWord<int>" - cpp_varname: "adaptiveServiceExecutorReservedThreads" - default: -1 - adaptiveServiceExecutorRunTimeMillis: - description: >- - Each worker thread will allow ASIO to run for this many milliseconds - before checking whether it should exit. - set_at: [ startup, runtime ] - cpp_vartype: "AtomicWord<int>" - cpp_varname: "adaptiveServiceExecutorRunTimeMillis" - default: 5000 - adaptiveServiceExecutorRunTimeJitterMillis: - description: >- - The period that a worker thread runs will be offset by at most this value - so that not all threads are scheduled at the same time. - set_at: [ startup, runtime ] - cpp_vartype: "AtomicWord<int>" - cpp_varname: "adaptiveServiceExecutorRunTimeJitterMillis" - default: 500 - adaptiveServiceExecutorStuckThreadTimeoutMillis: - description: >- - The maximum amount of time the controller thread will sleep - before doing any stuck-thread detection. - set_at: [ startup, runtime ] - cpp_vartype: "AtomicWord<int>" - cpp_varname: "adaptiveServiceExecutorStuckThreadTimeoutMillis" - default: 250 - adaptiveServiceExecutorMaxQueueLatencyMicros: - description: >- - The maximum allowed latency between when a task is scheduled - and a thread is started to service it. - set_at: [ startup, runtime ] - cpp_vartype: "AtomicWord<int>" - cpp_varname: "adaptiveServiceExecutorMaxQueueLatencyMicros" - default: 500 - adaptiveServiceExecutorIdlePctThreshold: - description: >- - The percent boundary of work time over run time below which worker threads will exit. - set_at: [ startup, runtime ] - cpp_vartype: "AtomicWord<int>" - cpp_varname: "adaptiveServiceExecutorIdlePctThreshold" - default: 60 - adaptiveServiceExecutorRecursionLimit: + + fixedServiceExecutorRecursionLimit: description: >- Tasks may recurse further if their recursion depth is less than this value. set_at: [ startup, runtime ] cpp_vartype: "AtomicWord<int>" - cpp_varname: "adaptiveServiceExecutorRecursionLimit" + cpp_varname: "fixedServiceExecutorRecursionLimit" default: 8 reservedServiceExecutorRecursionLimit: diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp new file mode 100644 index 00000000000..48a94cd57cd --- /dev/null +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -0,0 +1,158 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor + +#include "mongo/transport/service_executor_fixed.h" + +#include "mongo/base/error_codes.h" +#include "mongo/logv2/log.h" +#include "mongo/transport/service_executor_gen.h" +#include "mongo/util/fail_point.h" + +namespace mongo { + +MONGO_FAIL_POINT_DEFINE(hangBeforeSchedulingServiceExecutorFixedTask); + +namespace transport { +namespace { +constexpr auto kThreadsRunning = "threadsRunning"_sd; +constexpr auto kExecutorLabel = "executor"_sd; +constexpr auto kExecutorName = "fixed"_sd; +} // namespace + +ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options) + : _options(std::move(options)) { + _options.onCreateThread = + [this, onCreate = std::move(_options.onCreateThread)](const std::string& name) mutable { + _executorContext = std::make_unique<ExecutorThreadContext>(this->weak_from_this()); + onCreate(name); + }; + _threadPool = std::make_unique<ThreadPool>(_options); +} + +ServiceExecutorFixed::~ServiceExecutorFixed() { + invariant(!_canScheduleWork.load()); + if (_state == State::kNotStarted) + return; + + // Ensures we always call "shutdown" after staring the service executor + invariant(_state == State::kStopped); + _threadPool->shutdown(); + _threadPool->join(); + invariant(_numRunningExecutorThreads.load() == 0); +} + +Status ServiceExecutorFixed::start() { + stdx::lock_guard<Latch> lk(_mutex); + auto oldState = std::exchange(_state, State::kRunning); + invariant(oldState == State::kNotStarted); + _threadPool->startup(); + _canScheduleWork.store(true); + LOGV2_DEBUG( + 4910501, 3, "Started fixed thread-pool service executor", "name"_attr = _options.poolName); + return Status::OK(); +} + +Status ServiceExecutorFixed::shutdown(Milliseconds timeout) { + auto waitForShutdown = [&]() mutable -> Status { + stdx::unique_lock<Latch> lk(_mutex); + bool success = _shutdownCondition.wait_for(lk, timeout.toSystemDuration(), [this] { + return _numRunningExecutorThreads.load() == 0; + }); + return success ? Status::OK() + : Status(ErrorCodes::ExceededTimeLimit, + "Failed to shutdown all executor threads within the time limit"); + }; + + LOGV2_DEBUG(4910502, + 3, + "Shutting down fixed thread-pool service executor", + "name"_attr = _options.poolName); + + { + stdx::lock_guard<Latch> lk(_mutex); + _canScheduleWork.store(false); + + auto oldState = std::exchange(_state, State::kStopped); + if (oldState != State::kStopped) { + _threadPool->shutdown(); + } + } + + return waitForShutdown(); +} + +Status ServiceExecutorFixed::schedule(Task task, ScheduleFlags flags) { + if (!_canScheduleWork.load()) { + return Status(ErrorCodes::ShutdownInProgress, "Executor is not running"); + } + + auto mayExecuteTaskInline = [&] { + if (!(flags & ScheduleFlags::kMayRecurse)) + return false; + if (!_executorContext) + return false; + return true; + }; + + + if (mayExecuteTaskInline()) { + invariant(_executorContext); + if (_executorContext->getRecursionDepth() < + fixedServiceExecutorRecursionLimit.loadRelaxed()) { + // Recursively executing the task on the executor thread. + _executorContext->run(task); + return Status::OK(); + } + } + + hangBeforeSchedulingServiceExecutorFixedTask.pauseWhileSet(); + + // May throw if an attempt is made to schedule after the thread pool is shutdown. + try { + _threadPool->schedule([task = std::move(task)](Status status) { + internalAssert(status); + invariant(_executorContext); + _executorContext->run(task); + }); + } catch (DBException& e) { + return e.toStatus(); + } + + return Status::OK(); +} + +void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const { + *bob << kExecutorLabel << kExecutorName << kThreadsRunning + << static_cast<int>(_numRunningExecutorThreads.load()); +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h new file mode 100644 index 00000000000..c418296db8b --- /dev/null +++ b/src/mongo/transport/service_executor_fixed.h @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/base/status.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_executor.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/hierarchical_acquisition.h" + +namespace mongo { +namespace transport { + +/** + * A service executor that uses a fixed (configurable) number of threads to execute tasks. + * This executor always yields before executing scheduled tasks, and never yields before scheduling + * new tasks (i.e., `ScheduleFlags::kMayYieldBeforeSchedule` is a no-op for this executor). + */ +class ServiceExecutorFixed : public ServiceExecutor, + public std::enable_shared_from_this<ServiceExecutorFixed> { +public: + explicit ServiceExecutorFixed(ThreadPool::Options options); + virtual ~ServiceExecutorFixed(); + + Status start() override; + Status shutdown(Milliseconds timeout) override; + Status schedule(Task task, ScheduleFlags flags) override; + + Mode transportMode() const override { + return Mode::kSynchronous; + } + + void appendStats(BSONObjBuilder* bob) const override; + +private: + // Maintains the execution state (e.g., recursion depth) for executor threads + class ExecutorThreadContext { + public: + ExecutorThreadContext(std::weak_ptr<ServiceExecutorFixed> serviceExecutor) + : _executor(std::move(serviceExecutor)) { + _adjustRunningExecutorThreads(1); + } + + ExecutorThreadContext(ExecutorThreadContext&&) = delete; + ExecutorThreadContext(const ExecutorThreadContext&) = delete; + + ~ExecutorThreadContext() { + _adjustRunningExecutorThreads(-1); + } + + void run(ServiceExecutor::Task task) { + // Yield here to improve concurrency, especially when there are more executor threads + // than CPU cores. + stdx::this_thread::yield(); + _recursionDepth++; + task(); + _recursionDepth--; + } + + int getRecursionDepth() const { + return _recursionDepth; + } + + private: + void _adjustRunningExecutorThreads(int adjustment) { + if (auto executor = _executor.lock()) { + executor->_numRunningExecutorThreads.fetchAndAdd(adjustment); + } + } + + int _recursionDepth = 0; + std::weak_ptr<ServiceExecutorFixed> _executor; + }; + +private: + AtomicWord<size_t> _numRunningExecutorThreads{0}; + AtomicWord<bool> _canScheduleWork{false}; + + mutable Mutex _mutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ServiceExecutorFixed::_mutex"); + stdx::condition_variable _shutdownCondition; + + /** + * State transition diagram: kNotStarted ---> kRunning ---> kStopped + * The service executor cannot be in "kRunning" when its destructor is invoked. + */ + enum State { kNotStarted, kRunning, kStopped } _state = kNotStarted; + + ThreadPool::Options _options; + std::unique_ptr<ThreadPool> _threadPool; + + static inline thread_local std::unique_ptr<ExecutorThreadContext> _executorContext; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 4838789df47..99ee7f3ddb5 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -32,13 +32,21 @@ #include "mongo/platform/basic.h" #include "boost/optional.hpp" +#include <algorithm> +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" +#include "mongo/transport/service_executor_fixed.h" +#include "mongo/transport/service_executor_gen.h" #include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/transport_layer.h" #include "mongo/unittest/barrier.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/future.h" #include "mongo/util/scopeguard.h" #include <asio.hpp> @@ -158,6 +166,185 @@ TEST_F(ServiceExecutorSynchronousFixture, ScheduleFailsBeforeStartup) { scheduleBasicTask(executor.get(), false); } +class ServiceExecutorFixedFixture : public unittest::Test { +public: + static constexpr auto kNumExecutorThreads = 2; + + void setUp() override { + ThreadPool::Options options; + options.minThreads = options.maxThreads = kNumExecutorThreads; + options.poolName = "Test"; + _executor = std::make_shared<ServiceExecutorFixed>(std::move(options)); + } + + void tearDown() override { + if (_skipShutdown) + return; + ASSERT_OK(_executor->shutdown(kShutdownTime)); + } + + void skipShutdown(bool skip) { + _skipShutdown = skip; + } + + auto getServiceExecutor() const { + return _executor; + } + + auto startAndGetServiceExecutor() { + ASSERT_OK(_executor->start()); + return getServiceExecutor(); + } + + auto makeRecursionGuard() { + _recursionDepth.fetchAndAdd(1); + return makeGuard([this] { _recursionDepth.fetchAndSubtract(1); }); + } + + auto getRecursionDepth() const { + return _recursionDepth.load(); + } + +private: + AtomicWord<int> _recursionDepth{0}; + bool _skipShutdown = false; + std::shared_ptr<ServiceExecutorFixed> _executor; +}; + +TEST_F(ServiceExecutorFixedFixture, ScheduleFailsBeforeStartup) { + auto executor = getServiceExecutor(); + ASSERT_NOT_OK(executor->schedule([] {}, ServiceExecutor::kEmptyFlags)); +} + +DEATH_TEST_F(ServiceExecutorFixedFixture, DestructorFailsBeforeShutdown, "invariant") { + startAndGetServiceExecutor(); + skipShutdown(true); +} + +TEST_F(ServiceExecutorFixedFixture, BasicTaskRuns) { + auto executor = startAndGetServiceExecutor(); + auto barrier = std::make_shared<unittest::Barrier>(2); + ASSERT_OK(executor->schedule([barrier]() mutable { barrier->countDownAndWait(); }, + ServiceExecutor::kEmptyFlags)); + barrier->countDownAndWait(); +} + +TEST_F(ServiceExecutorFixedFixture, RecursiveTask) { + auto executor = startAndGetServiceExecutor(); + auto barrier = std::make_shared<unittest::Barrier>(2); + + ServiceExecutor::Task recursiveTask; + recursiveTask = [this, executor, barrier, &recursiveTask] { + auto recursionGuard = makeRecursionGuard(); + if (getRecursionDepth() < fixedServiceExecutorRecursionLimit.load()) { + ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse)); + } else { + // This test never returns unless the service executor can satisfy the recursion depth. + barrier->countDownAndWait(); + } + }; + + // Schedule recursive task and wait for the recursion to stop + ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayRecurse)); + barrier->countDownAndWait(); +} + +TEST_F(ServiceExecutorFixedFixture, FlattenRecursiveScheduledTasks) { + auto executor = startAndGetServiceExecutor(); + auto barrier = std::make_shared<unittest::Barrier>(2); + AtomicWord<int> tasksToSchedule{fixedServiceExecutorRecursionLimit.load() * 3}; + + // A recursive task that expects to be executed non-recursively. The task recursively schedules + // "tasksToSchedule" tasks to the service executor, and each scheduled task verifies that the + // recursion depth remains zero during its execution. + ServiceExecutor::Task recursiveTask; + recursiveTask = [this, executor, barrier, &recursiveTask, &tasksToSchedule] { + auto recursionGuard = makeRecursionGuard(); + ASSERT_EQ(getRecursionDepth(), 1); + if (tasksToSchedule.fetchAndSubtract(1) > 0) { + ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kEmptyFlags)); + } else { + // Once there are no more tasks to schedule, notify the main thread to proceed. + barrier->countDownAndWait(); + } + }; + + // Schedule the recursive task and wait for the execution to finish. + ASSERT_OK(executor->schedule(recursiveTask, ServiceExecutor::kMayYieldBeforeSchedule)); + barrier->countDownAndWait(); +} + +TEST_F(ServiceExecutorFixedFixture, ShutdownTimeLimit) { + auto executor = startAndGetServiceExecutor(); + auto invoked = std::make_shared<SharedPromise<void>>(); + auto mayReturn = std::make_shared<SharedPromise<void>>(); + + ASSERT_OK(executor->schedule( + [executor, invoked, mayReturn]() mutable { + invoked->emplaceValue(); + mayReturn->getFuture().get(); + }, + ServiceExecutor::kEmptyFlags)); + + invoked->getFuture().get(); + ASSERT_NOT_OK(executor->shutdown(kShutdownTime)); + + // Ensure the service executor is stopped before leaving the test. + mayReturn->emplaceValue(); +} + +TEST_F(ServiceExecutorFixedFixture, Stats) { + auto executor = startAndGetServiceExecutor(); + auto barrier = std::make_shared<unittest::Barrier>(kNumExecutorThreads + 1); + stdx::condition_variable cond; + + auto task = [barrier, &cond]() mutable { + cond.notify_one(); + barrier->countDownAndWait(); + }; + + for (auto i = 0; i < kNumExecutorThreads; i++) { + ASSERT_OK(executor->schedule(task, ServiceExecutor::kEmptyFlags)); + } + + // The main thread waits for the executor threads to bump up "threadsRunning" while picking up a + // task to execute. Once all executor threads are running, the main thread will unblock them + // through the barrier. + auto mutex = MONGO_MAKE_LATCH(); + stdx::unique_lock<Latch> lk(mutex); + cond.wait(lk, [&executor]() { + BSONObjBuilder bob; + executor->appendStats(&bob); + auto obj = bob.obj(); + ASSERT(obj.hasField("threadsRunning")); + auto threadsRunning = obj.getIntField("threadsRunning"); + LOGV2_DEBUG( + 4910503, 1, "Checked number of executor threads", "threads"_attr = threadsRunning); + return threadsRunning == static_cast<int>(ServiceExecutorFixedFixture::kNumExecutorThreads); + }); + + barrier->countDownAndWait(); +} + +TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) { + auto executor = startAndGetServiceExecutor(); + std::unique_ptr<stdx::thread> schedulerThread; + + { + // Spawn a thread to schedule a task, and block it before it can schedule the task with the + // underlying thread-pool. Then shutdown the service executor and unblock the scheduler + // thread. This order of events must cause "schedule()" to return a non-okay status. + FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask"); + schedulerThread = std::make_unique<stdx::thread>([executor] { + ASSERT_NOT_OK( + executor->schedule([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags)); + }); + failpoint->waitForTimesEntered(1); + ASSERT_OK(executor->shutdown(kShutdownTime)); + } + + schedulerThread->join(); +} } // namespace } // namespace mongo |