From 243a506fca5b2cca017192ff455e1520d9783a9e Mon Sep 17 00:00:00 2001
From: Ben Caimano
Date: Mon, 10 Aug 2020 17:54:46 +0000
Subject: SERVER-49109 ServiceExecutorFixed tracks work and sessions

This commit also adds a server parameter to start on the "borrowed"
threading model and introduces an evergreen variant for it.
---
 etc/evergreen.yml                                |  30 ++
 src/mongo/base/error_codes.yml                   |   3 +
 src/mongo/transport/service_entry_point_impl.cpp |  24 +-
 src/mongo/transport/service_executor.cpp         |  31 +-
 src/mongo/transport/service_executor.h           |  21 +-
 src/mongo/transport/service_executor.idl         |  21 ++
 src/mongo/transport/service_executor_fixed.cpp   | 364 ++++++++++++++++++-----
 src/mongo/transport/service_executor_fixed.h     |  97 +++---
 src/mongo/transport/service_executor_test.cpp    |  48 ++-
 9 files changed, 488 insertions(+), 151 deletions(-)

diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index de9f17fc7f7..997d9ef897d 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -12600,6 +12600,36 @@ buildvariants:
     distros:
     - ubuntu1804-build

+- name: enterprise-ubuntu-fixed-service-executor-1604-64-bit
+  display_name: "~ Enterprise Ubuntu 16.04 (with FixedServiceExecutor)"
+  batchtime: 1440 # 1 day
+  run_on:
+  - ubuntu1604-test
+  modules:
+  - enterprise
+  expansions:
+    scons_cache_scope: shared
+    compile_flags: MONGO_DISTMOD=ubuntu1604 -j$(grep -c ^processor /proc/cpuinfo) --variables-files=etc/scons/mongodbtoolchain_v3_gcc.vars
+    multiversion_platform: ubuntu1604
+    multiversion_edition: enterprise
+    test_flags: >-
+      --mongosSetParameters="{initialServiceExecutorThreadingModel: borrowed}"
+      --mongodSetParameters="{initialServiceExecutorThreadingModel: borrowed}"
+
+  tasks:
+  - name: compile_all_run_unittests_TG
+    distros:
+    - ubuntu1604-build
+  - name: .aggregation !.no_async
+  - name: .sharding .auth
+  - name: .sharding .causally_consistent !.wo_snapshot
+  - name: .concurrency .common !.kill_terminate
+  - name: .integration !.audit
+  - name: .jscore .common
+  - name: .logical_session_cache .one_sec
+  - name: .sharding .jscore !.wo_snapshot !.multi_stmt
+  - name: .sharding .common !.csrs
+
 - name: enterprise-ubuntu-scanning-replica-set-monitor-1604-64-bit
   display_name: "~ Enterprise Ubuntu 16.04 (with ScanningReplicaSetMonitor)"
   batchtime: 1440 # 1 day

diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 05aaaaa15f4..a6317f1c671 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -405,6 +405,9 @@ error_codes:

     - {code: 329, name: FailedToRunWithReplyBuilder}

+    # Internal error
+    - {code: 330, name: ServiceExecutorInShutdown, categories: [ShutdownError,CancelationError]}
+
     # Error codes 4000-8999 are reserved.
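Because the new code is tagged with the ShutdownError and CancelationError categories, call sites can match on the category rather than the specific code. A minimal caller-side sketch (the wrapper name isExecutorDraining is hypothetical; isShutdownError() is one of the category predicates generated from this file):

    #include "mongo/base/error_codes.h"
    #include "mongo/base/status.h"

    namespace mongo {
    // True for ServiceExecutorInShutdown and any other code tagged ShutdownError.
    inline bool isExecutorDraining(const Status& status) {
        return ErrorCodes::isShutdownError(status.code());
    }
    }  // namespace mongo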
     # Non-sequential error codes for compatibility only)

diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 38093644e2c..400979f14d9 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -41,6 +41,7 @@
 #include "mongo/logv2/log.h"
 #include "mongo/transport/ismaster_metrics.h"
 #include "mongo/transport/service_executor.h"
+#include "mongo/transport/service_executor_gen.h"
 #include "mongo/transport/service_state_machine.h"
 #include "mongo/transport/session.h"
 #include "mongo/util/processinfo.h"
@@ -143,15 +144,13 @@ Status ServiceEntryPointImpl::start() {
         }
     }

-    // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
-    // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
-    //     return status;
-    // }
+    if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
+        return status;
+    }

     return Status::OK();
 }

-// TODO: explicitly start on the fixed executor
 void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     // Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
     const auto& remoteAddr = session->remoteAddr();
@@ -221,7 +220,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     });

     auto seCtx = transport::ServiceExecutorContext{};
-    seCtx.setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated);
+    seCtx.setThreadingModel(transport::ServiceExecutor::getInitialThreadingModel());
     seCtx.setCanUseReserved(canOverrideMaxConns);
     ssm->start(std::move(seCtx));
 }
@@ -284,13 +283,12 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {

     lk.unlock();

-    // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
-    // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
-    //     !status.isOK()) {
-    //     LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
-    // }
+    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+    if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
+        !status.isOK()) {
+        LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
+    }

     timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
     timeout = std::max(Milliseconds{0}, timeout - timeSpent);

diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
index c5fd302d64c..0a8815bdd22 100644
--- a/src/mongo/transport/service_executor.cpp
+++ b/src/mongo/transport/service_executor.cpp
@@ -47,23 +47,44 @@ namespace transport {
 namespace {
 static constexpr auto kDiagnosticLogLevel = 4;

+static constexpr auto kThreadingModelDedicatedStr = "dedicated"_sd;
+static constexpr auto kThreadingModelBorrowedStr = "borrowed"_sd;
+
+auto gInitialThreadingModel = ServiceExecutor::ThreadingModel::kDedicated;
+
 auto getServiceExecutorStats =
     ServiceContext::declareDecoration<synchronized_value<ServiceExecutorStats>>();
 auto getServiceExecutorContext =
     Client::declareDecoration<std::unique_ptr<ServiceExecutorContext>>();
 }  // namespace

-StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) {
+StringData toString(ServiceExecutor::ThreadingModel threadingModel) {
     switch (threadingModel) {
-        case ServiceExecutorContext::ThreadingModel::kDedicated:
-            return "Dedicated"_sd;
-        case ServiceExecutorContext::ThreadingModel::kBorrowed:
-            return "Borrowed"_sd;
+        case ServiceExecutor::ThreadingModel::kDedicated:
+            return kThreadingModelDedicatedStr;
+        case ServiceExecutor::ThreadingModel::kBorrowed:
+            return kThreadingModelBorrowedStr;
         default:
             MONGO_UNREACHABLE;
     }
 }

+Status ServiceExecutor::setInitialThreadingModel(StringData value) noexcept {
+    if (value == kThreadingModelDedicatedStr) {
+        gInitialThreadingModel = ServiceExecutor::ThreadingModel::kDedicated;
+    } else if (value == kThreadingModelBorrowedStr) {
+        gInitialThreadingModel = ServiceExecutor::ThreadingModel::kBorrowed;
+    } else {
+        MONGO_UNREACHABLE;
+    }
+
+    return Status::OK();
+}
+
+auto ServiceExecutor::getInitialThreadingModel() noexcept -> ThreadingModel {
+    return gInitialThreadingModel;
+}
+
 ServiceExecutorStats ServiceExecutorStats::get(ServiceContext* ctx) noexcept {
     return getServiceExecutorStats(ctx).get();
 }

diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 773f9372913..95f5bac68a3 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -51,6 +51,20 @@ namespace transport {
  */
 class ServiceExecutor : public OutOfLineExecutor {
 public:
+    /**
+     * An enum to indicate if a ServiceExecutor should use dedicated or borrowed threading
+     * resources.
+     */
+    enum class ThreadingModel {
+        kBorrowed,
+        kDedicated,
+    };
+
+    friend StringData toString(ThreadingModel threadingModel);
+
+    static Status setInitialThreadingModel(StringData value) noexcept;
+    static ThreadingModel getInitialThreadingModel() noexcept;
+
     virtual ~ServiceExecutor() = default;

     using Task = unique_function<void()>;
     enum ScheduleFlags {
@@ -127,10 +141,7 @@ public:
  */
 class ServiceExecutorContext {
 public:
-    enum ThreadingModel {
-        kBorrowed,
-        kDedicated,
-    };
+    using ThreadingModel = ServiceExecutor::ThreadingModel;

     /**
      * Get a pointer to the ServiceExecutorContext for a given client.
@@ -202,8 +213,6 @@ public:
     ServiceExecutor* getServiceExecutor() noexcept;

 private:
-    friend StringData toString(ThreadingModel threadingModel);
-
     Client* _client = nullptr;
     ServiceEntryPoint* _sep = nullptr;

diff --git a/src/mongo/transport/service_executor.idl b/src/mongo/transport/service_executor.idl
index d380492ec62..d7235440de8 100644
--- a/src/mongo/transport/service_executor.idl
+++ b/src/mongo/transport/service_executor.idl
@@ -28,8 +28,18 @@
 global:
     cpp_namespace: "mongo::transport"
+    cpp_includes:
+        - "mongo/transport/service_executor.h"

 server_parameters:
+    initialServiceExecutorThreadingModel:
+        description: >-
+            Start new client connections using an executor that follows this model.
+        set_at: [ startup ]
+        cpp_vartype: "std::string"
+        cpp_varname: "initialServiceExecutorThreadingModel"
+        on_update: "ServiceExecutor::setInitialThreadingModel"
+        default: "dedicated"
+
     synchronousServiceExecutorRecursionLimit:
         description: >-
            Tasks may recurse further if their recursion depth is less than this value.
@@ -53,3 +63,14 @@ server_parameters:
         cpp_vartype: 'AtomicWord<int>'
         cpp_varname: reservedServiceExecutorRecursionLimit
         default: 8
+
+    fixedServiceExecutorThreadLimit:
+        description: >-
+            The fixed service executor ("borrowed" threading model) maintains at most this
+            many threads.
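+        # (This limit becomes ThreadPool::Limits::maxThreads when the executor is
+        # constructed in service_executor_fixed.cpp; minThreads stays 0, so idle
+        # threads can be retired.)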
+        set_at: [ startup ]
+        cpp_vartype: "int"
+        cpp_varname: "fixedServiceExecutorThreadLimit"
+        default: 1000
+        validator:
+            gte: 10

diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index b9b5424872e..fb34101f213 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -35,8 +35,10 @@
 #include "mongo/logv2/log.h"
 #include "mongo/transport/service_executor_gen.h"
 #include "mongo/transport/session.h"
+#include "mongo/transport/transport_layer.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/fail_point.h"
+#include "mongo/util/testing_proctor.h"
 #include "mongo/util/thread_safety_context.h"

 namespace mongo {
@@ -52,47 +54,166 @@
 constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "fixed"_sd;

 const auto getServiceExecutorFixed =
-    ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
+    ServiceContext::declareDecoration<std::shared_ptr<ServiceExecutorFixed>>();

 const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{
     "ServiceExecutorFixed", [](ServiceContext* ctx) {
+        auto limits = ThreadPool::Limits{};
+        limits.minThreads = 0;
+        limits.maxThreads = fixedServiceExecutorThreadLimit;
         getServiceExecutorFixed(ctx) =
-            std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{});
+            std::make_shared<ServiceExecutorFixed>(ctx, std::move(limits));
     }};
 }  // namespace

-ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
-    : _options(std::move(options)) {
-    _options.onCreateThread =
-        [this, onCreate = std::move(_options.onCreateThread)](const std::string& name) mutable {
-            _executorContext = std::make_unique<ExecutorThreadContext>(this->weak_from_this());
-            if (onCreate) {
-                onCreate(name);
-            }
-        };
-    _threadPool = std::make_unique<ThreadPool>(_options);
+class ServiceExecutorFixed::ExecutorThreadContext {
+public:
+    ExecutorThreadContext(ServiceExecutorFixed* serviceExecutor);
+    ~ExecutorThreadContext();
+
+    ExecutorThreadContext(ExecutorThreadContext&&) = delete;
+    ExecutorThreadContext(const ExecutorThreadContext&) = delete;
+
+    template <typename Task>
+    void run(Task&& task) {
+        // Yield here to improve concurrency, especially when there are more executor threads
+        // than CPU cores.
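+        // (The yield is only a scheduler hint: it gives threads that are already
+        //  mid-task a chance to finish before this one starts new work; nothing
+        //  below depends on it for correctness.)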
+        stdx::this_thread::yield();
+        _executor->_stats.tasksStarted.fetchAndAdd(1);
+        _recursionDepth++;
+
+        ON_BLOCK_EXIT([&] {
+            _recursionDepth--;
+            _executor->_stats.tasksEnded.fetchAndAdd(1);
+
+            auto lk = stdx::lock_guard(_executor->_mutex);
+            _executor->_checkForShutdown(lk);
+        });
+
+        std::forward<Task>(task)();
+    }
+
+    int getRecursionDepth() const {
+        return _recursionDepth;
+    }
+
+private:
+    ServiceExecutorFixed* const _executor;
+    int _recursionDepth = 0;
+};
+
+ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext(
+    ServiceExecutorFixed* serviceExecutor)
+    : _executor(serviceExecutor) {
+    _executor->_stats.threadsStarted.fetchAndAdd(1);
+    hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet();
+}
+
+ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() {
+    auto ended = _executor->_stats.threadsEnded.addAndFetch(1);
+    auto started = _executor->_stats.threadsStarted.loadRelaxed();
+    if (ended == started) {
+        hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet();
+    }
+}
+
+thread_local std::unique_ptr<ServiceExecutorFixed::ExecutorThreadContext>
+    ServiceExecutorFixed::_executorContext;
+
+ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx, ThreadPool::Limits limits)
+    : _svcCtx{ctx}, _options(std::move(limits)) {
+    _options.poolName = "ServiceExecutorFixed";
+    _options.onCreateThread = [this](const auto&) {
+        _executorContext = std::make_unique<ExecutorThreadContext>(this);
+    };
+
+    _threadPool = std::make_shared<ThreadPool>(_options);
 }

 ServiceExecutorFixed::~ServiceExecutorFixed() {
-    invariant(!_canScheduleWork.load());
-    if (_state == State::kNotStarted)
-        return;
+    switch (_state) {
+        case State::kNotStarted:
+            return;
+        case State::kRunning: {
+            // We should not be running while in this destructor.
+            MONGO_UNREACHABLE;
+        }
+        case State::kStopping:
+        case State::kStopped: {
+            // We can go ahead and attempt to join our thread pool.
+        } break;
+        default: { MONGO_UNREACHABLE; }
+    }
+
+    LOGV2_DEBUG(4910502,
+                kDiagnosticLogLevel,
+                "Shutting down pool for fixed thread-pool service executor",
+                "name"_attr = _options.poolName);

-    // Ensures we always call "shutdown" after staring the service executor
-    invariant(_state == State::kStopped);
+    // We can only destruct when we have joined all of our tasks and canceled all of our sessions.
+    // This thread pool doesn't get to refuse work over its lifetime. It's possible that tasks are
+    // still blocking. If so, we block until they finish here.
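+    // (ThreadPool::join() below is what provides that blocking: it waits for any
+    //  task the pool has already accepted to run to completion before returning.)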
     _threadPool->shutdown();
     _threadPool->join();
-    invariant(_numRunningExecutorThreads.load() == 0);
+
+    invariant(_threadsRunning() == 0);
+    invariant(_tasksRunning() == 0);
+    invariant(_waiters.empty());
 }

 Status ServiceExecutorFixed::start() {
-    stdx::lock_guard lk(_mutex);
-    auto oldState = std::exchange(_state, State::kRunning);
-    invariant(oldState == State::kNotStarted);
+    {
+        stdx::lock_guard lk(_mutex);
+        switch (_state) {
+            case State::kNotStarted: {
+                // Time to start
+                _state = State::kRunning;
+            } break;
+            case State::kRunning: {
+                return Status::OK();
+            }
+            case State::kStopping:
+            case State::kStopped: {
+                return {ErrorCodes::ServiceExecutorInShutdown,
+                        "ServiceExecutorFixed is already stopping or stopped"};
+            }
+            default: { MONGO_UNREACHABLE; }
+        };
+    }
+
+    LOGV2_DEBUG(4910501,
+                kDiagnosticLogLevel,
+                "Starting fixed thread-pool service executor",
+                "name"_attr = _options.poolName);
+
     _threadPool->startup();
-    _canScheduleWork.store(true);
-    LOGV2_DEBUG(
-        4910501, 3, "Started fixed thread-pool service executor", "name"_attr = _options.poolName);
+
+    if (!_svcCtx) {
+        // For some tests, we do not have a ServiceContext.
+        invariant(TestingProctor::instance().isEnabled());
+        return Status::OK();
+    }
+
+    auto tl = _svcCtx->getTransportLayer();
+    invariant(tl);
+
+    auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress);
+    invariant(reactor);
+    _threadPool->schedule([this, reactor](Status) {
+        {
+            // Check to make sure we haven't been shutdown already. Note that there is still a brief
+            // race that immediately follows this check. ASIOReactor::stop() is not permanent, thus
+            // our run() could "restart" the reactor.
+            stdx::lock_guard lk(_mutex);
+            if (_state != kRunning) {
+                return;
+            }
+        }
+
+        // Start running on the reactor immediately.
+        reactor->run();
+    });
+
     return Status::OK();
 }

@@ -103,37 +224,115 @@ ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
 }

 Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
-    auto waitForShutdown = [&]() mutable -> Status {
-        stdx::unique_lock lk(_mutex);
-        bool success = _shutdownCondition.wait_for(lk, timeout.toSystemDuration(), [this] {
-            return _numRunningExecutorThreads.load() == 0;
-        });
-        return success ? Status::OK()
-                       : Status(ErrorCodes::ExceededTimeLimit,
-                                "Failed to shutdown all executor threads within the time limit");
-    };
-
-    LOGV2_DEBUG(4910502,
-                3,
+    LOGV2_DEBUG(4910503,
+                kDiagnosticLogLevel,
                 "Shutting down fixed thread-pool service executor",
                 "name"_attr = _options.poolName);

     {
-        stdx::lock_guard lk(_mutex);
-        _canScheduleWork.store(false);
+        auto lk = stdx::unique_lock(_mutex);
+
+        switch (_state) {
+            case State::kNotStarted:
+            case State::kRunning: {
+                _state = State::kStopping;
+
+                for (auto& waiter : _waiters) {
+                    // Cancel any session we own.
+                    waiter.session->cancelAsyncOperations();
+                }
+
+                // There may not be outstanding threads, check for shutdown now.
+                _checkForShutdown(lk);

-        auto oldState = std::exchange(_state, State::kStopped);
-        if (oldState != State::kStopped) {
-            _threadPool->shutdown();
+                if (_state == State::kStopped) {
+                    // We were able to become stopped immediately.
+                    return Status::OK();
+                }
+            } break;
+            case State::kStopping: {
+                // Just need to wait it out.
+            } break;
+            case State::kStopped: {
+                // Totally done.
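+                // (An earlier _checkForShutdown() already reached kStopped,
+                //  notified _shutdownCondition, and stopped the ingress reactor,
+                //  so there is nothing left to wait for.)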
+                return Status::OK();
+            } break;
+            default: { MONGO_UNREACHABLE; }
+        }
     }

-    return waitForShutdown();
+    LOGV2_DEBUG(4910504,
+                kDiagnosticLogLevel,
+                "Waiting for shutdown of fixed thread-pool service executor",
+                "name"_attr = _options.poolName);
+
+    // There is a world where we are able to simply do a timed wait upon a future chain. However,
+    // that world likely requires an OperationContext available through shutdown.
+    auto lk = stdx::unique_lock(_mutex);
+    if (!_shutdownCondition.wait_for(
+            lk, timeout.toSystemDuration(), [this] { return _state == State::kStopped; })) {
+        return Status(ErrorCodes::ExceededTimeLimit,
+                      "Failed to shutdown all executor threads within the time limit");
+    }
+
+    return Status::OK();
+}
+
+void ServiceExecutorFixed::_checkForShutdown(WithLock) {
+    if (_state == State::kRunning) {
+        // We're actively running.
+        return;
+    }
+    invariant(_state != State::kNotStarted);
+
+    if (!_waiters.empty()) {
+        // We still have some in wait.
+        return;
+    }
+
+    auto tasksLeft = _tasksLeft();
+    if (tasksLeft > 0) {
+        // We have tasks remaining.
+        return;
+    }
+    invariant(tasksLeft == 0);
+
+    // We have achieved a soft form of shutdown:
+    // - _state != kRunning means that there will be no new external tasks or waiters.
+    // - _waiters.empty() means that all network waits have finished and there will be no new
+    //   internal tasks.
+    // - _tasksLeft() == 0 means that all tasks, both internal and external have finished.
+    //
+    // From this point on, all of our threads will be idle. When the dtor runs, the thread pool will
+    // experience a trivial shutdown() and join().
+    _state = State::kStopped;
+
+    LOGV2_DEBUG(
+        4910505, kDiagnosticLogLevel, "Finishing shutdown", "name"_attr = _options.poolName);
+    _shutdownCondition.notify_one();
+
+    if (!_svcCtx) {
+        // For some tests, we do not have a ServiceContext.
+        invariant(TestingProctor::instance().isEnabled());
+        return;
+    }
+
+    auto tl = _svcCtx->getTransportLayer();
+    invariant(tl);
+
+    auto reactor = tl->getReactor(TransportLayer::WhichReactor::kIngress);
+    invariant(reactor);
+    reactor->stop();
+}

 Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {
-    if (!_canScheduleWork.load()) {
-        return Status(ErrorCodes::ShutdownInProgress, "Executor is not running");
+    {
+        auto lk = stdx::unique_lock(_mutex);
+        if (_state != State::kRunning) {
+            return kInShutdown;
+        }
+
+        _stats.tasksScheduled.fetchAndAdd(1);
     }

     auto mayExecuteTaskInline = [&] {
@@ -157,31 +356,70 @@ Status ServiceExecutorFixed::scheduleTask(Task task, ScheduleFlags flags) {

     hangBeforeSchedulingServiceExecutorFixedTask.pauseWhileSet();

-    // May throw if an attempt is made to schedule after the thread pool is shutdown.
-    try {
-        _threadPool->schedule([task = std::move(task)](Status status) mutable {
-            internalAssert(status);
-            invariant(_executorContext);
-            _executorContext->run(std::move(task));
-        });
-    } catch (DBException& e) {
-        return e.toStatus();
-    }
+    _threadPool->schedule([this, task = std::move(task)](Status status) mutable {
+        invariant(status);
+
+        _executorContext->run([&] { task(); });
+    });

     return Status::OK();
 }

+void ServiceExecutorFixed::_schedule(OutOfLineExecutor::Task task) noexcept {
+    {
+        auto lk = stdx::unique_lock(_mutex);
+        if (_state != State::kRunning) {
+            lk.unlock();
+
+            task(kInShutdown);
+            return;
+        }
+
+        _stats.tasksScheduled.fetchAndAdd(1);
+    }
+
+    _threadPool->schedule([this, task = std::move(task)](Status status) mutable {
+        _executorContext->run([&] { task(std::move(status)); });
+    });
+}
+
 void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
                                               OutOfLineExecutor::Task onCompletionCallback) {
     invariant(session);
+
+    auto waiter = Waiter{session, std::move(onCompletionCallback)};
+
+    WaiterList::iterator it;
+    {
+        // Make sure we're still allowed to schedule and track the session
+        auto lk = stdx::unique_lock(_mutex);
+        if (_state != State::kRunning) {
+            lk.unlock();
+            waiter.onCompletionCallback(kInShutdown);
+            return;
+        }
+
+        it = _waiters.emplace(_waiters.end(), std::move(waiter));
+    }
+
     session->asyncWaitForData()
         .thenRunOn(shared_from_this())
-        .getAsync(std::move(onCompletionCallback));
+        .getAsync([this, anchor = shared_from_this(), it](Status status) mutable {
+            Waiter waiter;
+            {
+                // Remove our waiter from the list.
+                auto lk = stdx::unique_lock(_mutex);
+                waiter = std::exchange(*it, {});
+                _waiters.erase(it);
+            }
+
+            waiter.onCompletionCallback(std::move(status));
+        });
 }

 void ServiceExecutorFixed::appendStats(BSONObjBuilder* bob) const {
     *bob << kExecutorLabel << kExecutorName << kThreadsRunning
-         << static_cast<int>(_numRunningExecutorThreads.load());
+         << static_cast<int>(_threadsRunning());
 }

 int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const {
@@ -189,19 +427,5 @@
     return _executorContext->getRecursionDepth();
 }

-ServiceExecutorFixed::ExecutorThreadContext::ExecutorThreadContext(
-    std::weak_ptr<ServiceExecutorFixed> serviceExecutor)
-    : _executor(std::move(serviceExecutor)) {
-    _adjustRunningExecutorThreads(1);
-    hangAfterServiceExecutorFixedExecutorThreadsStart.pauseWhileSet();
-}
-
-ServiceExecutorFixed::ExecutorThreadContext::~ExecutorThreadContext() {
-    if (auto threadsRunning = _adjustRunningExecutorThreads(-1);
-        threadsRunning.has_value() && threadsRunning.value() == 0) {
-        hangBeforeServiceExecutorFixedLastExecutorThreadReturns.pauseWhileSet();
-    }
-}
-
 } // namespace transport
 } // namespace mongo

diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index a68994d9fb3..3a96ec30f66 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -38,8 +38,11 @@
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/thread.h"
+#include "mongo/stdx/unordered_map.h"
 #include "mongo/transport/service_executor.h"
 #include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/future.h"
 #include "mongo/util/hierarchical_acquisition.h"

 namespace mongo {
@@ -50,10 +53,17 @@ namespace transport {
 * This executor always yields before executing scheduled tasks, and never yields before scheduling
 * new tasks (i.e., `ScheduleFlags::kMayYieldBeforeSchedule` is a no-op for this executor).
 */
-class ServiceExecutorFixed : public ServiceExecutor,
-                             public std::enable_shared_from_this<ServiceExecutorFixed> {
+class ServiceExecutorFixed final : public ServiceExecutor,
+                                   public std::enable_shared_from_this<ServiceExecutorFixed> {
+    static constexpr auto kDiagnosticLogLevel = 3;
+
+    static const inline auto kInShutdown =
+        Status(ErrorCodes::ServiceExecutorInShutdown, "ServiceExecutorFixed is not running");
+
 public:
-    explicit ServiceExecutorFixed(ThreadPool::Options options);
+    explicit ServiceExecutorFixed(ServiceContext* ctx, ThreadPool::Limits limits);
+    explicit ServiceExecutorFixed(ThreadPool::Limits limits)
+        : ServiceExecutorFixed(nullptr, std::move(limits)) {}

     virtual ~ServiceExecutorFixed();

     static ServiceExecutorFixed* get(ServiceContext* ctx);
@@ -61,6 +71,9 @@ public:
     Status start() override;
     Status shutdown(Milliseconds timeout) override;
     Status scheduleTask(Task task, ScheduleFlags flags) override;
+    void schedule(OutOfLineExecutor::Task task) override {
+        _schedule(std::move(task));
+    }

     void runOnDataAvailable(const SessionHandle& session,
                             OutOfLineExecutor::Task onCompletionCallback) override;
@@ -79,57 +92,57 @@ public:

 private:
     // Maintains the execution state (e.g., recursion depth) for executor threads
-    class ExecutorThreadContext {
-    public:
-        ExecutorThreadContext(std::weak_ptr<ServiceExecutorFixed> serviceExecutor);
-        ~ExecutorThreadContext();
-
-        ExecutorThreadContext(ExecutorThreadContext&&) = delete;
-        ExecutorThreadContext(const ExecutorThreadContext&) = delete;
-
-        void run(ServiceExecutor::Task task) {
-            // Yield here to improve concurrency, especially when there are more executor threads
-            // than CPU cores.
-            stdx::this_thread::yield();
-            _recursionDepth++;
-            task();
-            _recursionDepth--;
-        }
-
-        int getRecursionDepth() const {
-            return _recursionDepth;
-        }
-
-    private:
-        boost::optional<int> _adjustRunningExecutorThreads(int adjustment) {
-            if (auto executor = _executor.lock()) {
-                return executor->_numRunningExecutorThreads.addAndFetch(adjustment);
-            }
-            return boost::none;
-        }
-
-        int _recursionDepth = 0;
-        std::weak_ptr<ServiceExecutorFixed> _executor;
-    };
+    class ExecutorThreadContext;

 private:
-    AtomicWord<int> _numRunningExecutorThreads{0};
-    AtomicWord<bool> _canScheduleWork{false};
+    void _checkForShutdown(WithLock);
+    void _schedule(OutOfLineExecutor::Task task) noexcept;
+
+    auto _threadsRunning() const {
+        return _stats.threadsStarted.load() - _stats.threadsEnded.loadRelaxed();
+    }
+
+    auto _tasksRunning() const {
+        return _stats.tasksStarted.load() - _stats.tasksEnded.loadRelaxed();
+    }
+
+    auto _tasksLeft() const {
+        return _stats.tasksScheduled.load() - _stats.tasksEnded.loadRelaxed();
+    }
+
+    struct Stats {
+        AtomicWord<size_t> threadsStarted{0};
+        AtomicWord<size_t> threadsEnded{0};
+
+        AtomicWord<size_t> tasksScheduled{0};
+        AtomicWord<size_t> tasksStarted{0};
+        AtomicWord<size_t> tasksEnded{0};
+    };
+    Stats _stats;
+
+    ServiceContext* const _svcCtx;

     mutable Mutex _mutex =
         MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ServiceExecutorFixed::_mutex");
     stdx::condition_variable _shutdownCondition;
+    SharedPromise<void> _shutdownComplete;

     /**
-     * State transition diagram: kNotStarted ---> kRunning ---> kStopped
-     * The service executor cannot be in "kRunning" when its destructor is invoked.
+     * State transition diagram: kNotStarted ---> kRunning ---> kStopping ---> kStopped
      */
-    enum State { kNotStarted, kRunning, kStopped } _state = kNotStarted;
+    enum State { kNotStarted, kRunning, kStopping, kStopped } _state = kNotStarted;

     ThreadPool::Options _options;
-    std::unique_ptr<ThreadPool> _threadPool;
+    std::shared_ptr<ThreadPool> _threadPool;
+
+    struct Waiter {
+        SessionHandle session;
+        OutOfLineExecutor::Task onCompletionCallback;
+    };
+    using WaiterList = std::list<Waiter>;
+    WaiterList _waiters;

-    static inline thread_local std::unique_ptr<ExecutorThreadContext> _executorContext;
+    static thread_local std::unique_ptr<ExecutorThreadContext> _executorContext;
 };

 } // namespace transport

diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 4af83003930..be85fb1fb43 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -36,6 +36,7 @@
 #include "mongo/bson/bsonobjbuilder.h"
 #include "mongo/db/service_context.h"
+#include "mongo/db/service_context_test_fixture.h"
 #include "mongo/logv2/log.h"
 #include "mongo/transport/mock_session.h"
 #include "mongo/transport/service_executor_fixed.h"
@@ -168,7 +169,7 @@ TEST_F(ServiceExecutorSynchronousFixture, ScheduleFailsBeforeStartup) {
     scheduleBasicTask(executor.get(), false);
 }

-class ServiceExecutorFixedFixture : public unittest::Test {
+class ServiceExecutorFixedFixture : public ServiceContextTest {
 public:
     static constexpr auto kNumExecutorThreads = 2;
@@ -183,10 +184,9 @@ public:
         ServiceExecutorHandle(ServiceExecutorHandle&&) = delete;

         explicit ServiceExecutorHandle(int flags = kNone) : _skipShutdown(flags & kSkipShutdown) {
-            ThreadPool::Options options;
-            options.minThreads = options.maxThreads = kNumExecutorThreads;
-            options.poolName = "Test";
-            _executor = std::make_shared<ServiceExecutorFixed>(std::move(options));
+            ThreadPool::Limits limits;
+            limits.minThreads = limits.maxThreads = kNumExecutorThreads;
+            _executor = std::make_shared<ServiceExecutorFixed>(std::move(limits));

             if (flags & kStartExecutor) {
                 ASSERT_OK(_executor->start());
@@ -331,24 +331,42 @@ TEST_F(ServiceExecutorFixedFixture, Stats) {
     returnBarrier->countDownAndWait();
 }

-TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
+
+TEST_F(ServiceExecutorFixedFixture, ScheduleSucceedsBeforeShutdown) {
     ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);

-    std::unique_ptr<stdx::thread> schedulerThread;
+    auto thread = stdx::thread();
+    auto barrier = std::make_shared<unittest::Barrier>(2);

     {
-        // Spawn a thread to schedule a task, and block it before it can schedule the task with the
-        // underlying thread-pool. Then shutdown the service executor and unblock the scheduler
-        // thread. This order of events must cause "schedule()" to return a non-okay status.
         FailPointEnableBlock failpoint("hangBeforeSchedulingServiceExecutorFixedTask");
+
+
+        // The executor accepts the work, but hasn't used the underlying pool yet.
-        schedulerThread = std::make_unique<stdx::thread>([executor = *executorHandle] {
-            ASSERT_NOT_OK(
-                executor->scheduleTask([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags));
+        thread = stdx::thread([&] {
+            ASSERT_OK(executorHandle->scheduleTask([&, barrier] { barrier->countDownAndWait(); },
+                                                   ServiceExecutor::kEmptyFlags));
         });

         failpoint->waitForTimesEntered(1);
-        ASSERT_OK(executorHandle->shutdown(kShutdownTime));
+
+        // Trigger an immediate shutdown which will not affect the task we have accepted.
+        ASSERT_NOT_OK(executorHandle->shutdown(Milliseconds{0}));
     }

-    schedulerThread->join();
+    // Our failpoint has been disabled, so the task can run to completion.
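+    // (countDownAndWait() blocks until the scheduled task reaches its own call,
+    //  so returning from it proves the task survived the rejected zero-timeout
+    //  shutdown above.)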
+    barrier->countDownAndWait();
+
+    // Now we can wait for the task to finish and shut down.
+    ASSERT_OK(executorHandle->shutdown(kShutdownTime));
+
+    thread.join();
+}
+
+TEST_F(ServiceExecutorFixedFixture, ScheduleFailsAfterShutdown) {
+    ServiceExecutorHandle executorHandle(ServiceExecutorHandle::kStartExecutor);
+
+    ASSERT_OK(executorHandle->shutdown(kShutdownTime));
+    ASSERT_NOT_OK(
+        executorHandle->scheduleTask([] { MONGO_UNREACHABLE; }, ServiceExecutor::kEmptyFlags));
+}

 TEST_F(ServiceExecutorFixedFixture, RunTaskAfterWaitingForData) {
-- 
cgit v1.2.1
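Taken together, the diff gives ServiceExecutorFixed a full lifecycle: an explicit start(), Status-reporting scheduling, session-aware waiters, and a time-bounded shutdown() that tolerates repeat calls. A minimal usage sketch, using only names the patch itself introduces (the timeout value is arbitrary, and the assertions assume a unit-test context, where a null ServiceContext is permitted):

    ThreadPool::Limits limits;
    limits.minThreads = limits.maxThreads = 2;
    auto executor = std::make_shared<ServiceExecutorFixed>(std::move(limits));

    ASSERT_OK(executor->start());

    // ServiceExecutor::scheduleTask() reports shutdown at scheduling time...
    ASSERT_OK(executor->scheduleTask([] { /* do work */ }, ServiceExecutor::kEmptyFlags));

    // ...while OutOfLineExecutor::schedule() reports it through the Status
    // passed to the task itself.
    executor->schedule([](Status status) {
        if (!status.isOK()) {
            // Typically ServiceExecutorInShutdown if we scheduled too late.
            return;
        }
        /* do work */
    });

    // shutdown() returns ExceededTimeLimit while tasks are still draining and
    // OK once the state machine reaches kStopped.
    ASSERT_OK(executor->shutdown(Milliseconds{1000}));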