diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2022-11-29 22:54:57 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-29 23:51:17 +0000 |
commit | bdd6f97927ef265a26763f5986d185c75862650c (patch) | |
tree | 7d3adabe8d4bd5d853e40a02721f350212e0c990 | |
parent | 445e80440cff2475c51cde86efacc4d0dcd99676 (diff) | |
download | mongo-bdd6f97927ef265a26763f5986d185c75862650c.tar.gz |
SERVER-70151 no thread_locals in ServiceExecutorSynchronous
21 files changed, 1716 insertions, 808 deletions
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp index d5dd31ca347..f5acac3951b 100644 --- a/src/mongo/db/commands/server_status_servers.cpp +++ b/src/mongo/db/commands/server_status_servers.cpp @@ -32,7 +32,6 @@ #include "mongo/transport/message_compressor_registry.h" #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_executor_fixed.h" -#include "mongo/transport/service_executor_reserved.h" #include "mongo/transport/service_executor_synchronous.h" #include "mongo/util/net/hostname_canonicalization.h" #include "mongo/util/net/socket_utils.h" diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 2cbff7f7147..6cb51f3162e 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -112,9 +112,9 @@ tlEnv.Library( source=[ 'service_executor.cpp', 'service_executor_fixed.cpp', - 'service_executor_reserved.cpp', 'service_executor_synchronous.cpp', - 'service_executor_utils.cpp', + 'service_executor_synchronous_synchronization.cpp', + 'service_executor_synchronous_thread.cpp', 'service_executor.idl', ], LIBDEPS=[ diff --git a/src/mongo/transport/mock_service_executor.h b/src/mongo/transport/mock_service_executor.h index 4b4eaf752a1..0cf0affe84b 100644 --- a/src/mongo/transport/mock_service_executor.h +++ b/src/mongo/transport/mock_service_executor.h @@ -42,8 +42,8 @@ public: return startCb(); } - std::unique_ptr<TaskRunner> makeTaskRunner() override { - class Runner : public TaskRunner { + std::unique_ptr<Executor> makeTaskRunner() override { + class Runner : public Executor { public: explicit Runner(MockServiceExecutor* p) : _p{p} {} @@ -51,10 +51,13 @@ public: return _p->scheduleTaskCb(std::move(task)); } - void runOnDataAvailable(SessionHandle session, Task onCompletionCallback) override { + void runOnDataAvailable(const SessionHandle& session, + Task onCompletionCallback) override { _p->runOnDataAvailableCb(session, 
std::move(onCompletionCallback)); } + void yieldPointReached() const override {} + private: MockServiceExecutor* _p; }; diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index 0d98e57d7c9..aa04ce209b5 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -56,7 +56,6 @@ #include "mongo/transport/service_executor.h" #include "mongo/transport/service_executor_fixed.h" #include "mongo/transport/service_executor_gen.h" -#include "mongo/transport/service_executor_reserved.h" #include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/session.h" #include "mongo/transport/session_workflow.h" @@ -447,7 +446,7 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const { invariant(_svcCtx); appendInt("active", _svcCtx->getActiveClientOperations()); - const auto seStats = transport::ServiceExecutorStats::get(_svcCtx); + const auto seStats = transport::getServiceExecutorStats(_svcCtx); appendInt("threaded", seStats.usesDedicated); if (!serverGlobalParams.maxConnsOverride.empty()) appendInt("limitExempt", seStats.limitExempt); diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp index 49a6ef4f67c..b1a93ea27fc 100644 --- a/src/mongo/transport/service_executor.cpp +++ b/src/mongo/transport/service_executor.cpp @@ -40,9 +40,7 @@ #include "mongo/logv2/log.h" #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_executor_fixed.h" -#include "mongo/transport/service_executor_reserved.h" #include "mongo/transport/service_executor_synchronous.h" -#include "mongo/util/processinfo.h" #include "mongo/util/synchronized_value.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork @@ -55,9 +53,9 @@ bool gInitialUseDedicatedThread = true; namespace { static constexpr auto kDiagnosticLogLevel = 4; -auto getServiceExecutorStats = +auto 
serviceExecutorStatsDecoration = ServiceContext::declareDecoration<synchronized_value<ServiceExecutorStats>>(); -auto getServiceExecutorContext = +auto serviceExecutorContextDecoration = Client::declareDecoration<std::unique_ptr<ServiceExecutorContext>>(); void incrThreadingModelStats(ServiceExecutorStats& stats, bool useDedicatedThread, int step) { @@ -65,26 +63,26 @@ void incrThreadingModelStats(ServiceExecutorStats& stats, bool useDedicatedThrea } } // namespace -ServiceExecutorStats ServiceExecutorStats::get(ServiceContext* ctx) noexcept { - return getServiceExecutorStats(ctx).get(); +ServiceExecutorStats getServiceExecutorStats(ServiceContext* ctx) { + return **serviceExecutorStatsDecoration(ctx); } ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept { // Service worker Clients will never have a ServiceExecutorContext. - return getServiceExecutorContext(client).get(); + return serviceExecutorContextDecoration(client).get(); } void ServiceExecutorContext::set(Client* client, std::unique_ptr<ServiceExecutorContext> seCtxPtr) noexcept { auto& seCtx = *seCtxPtr; - auto& serviceExecutorContext = getServiceExecutorContext(client); + auto& serviceExecutorContext = serviceExecutorContextDecoration(client); invariant(!serviceExecutorContext); seCtx._client = client; seCtx._sep = client->getServiceContext()->getServiceEntryPoint(); { - auto&& syncStats = *getServiceExecutorStats(client->getServiceContext()); + auto syncStats = *serviceExecutorStatsDecoration(client->getServiceContext()); if (seCtx._canUseReserved) ++syncStats->limitExempt; incrThreadingModelStats(*syncStats, seCtx._useDedicatedThread, 1); @@ -102,14 +100,14 @@ void ServiceExecutorContext::set(Client* client, void ServiceExecutorContext::reset(Client* client) noexcept { if (!client) return; - auto& seCtx = getServiceExecutorContext(client); + auto& seCtx = serviceExecutorContextDecoration(client); LOGV2_DEBUG(4898001, kDiagnosticLogLevel, "Resetting ServiceExecutor context for 
client", "client"_attr = client->desc(), "threadingModel"_attr = seCtx->_useDedicatedThread, "canUseReserved"_attr = seCtx->_canUseReserved); - auto stats = *getServiceExecutorStats(client->getServiceContext()); + auto stats = *serviceExecutorStatsDecoration(client->getServiceContext()); if (seCtx->_canUseReserved) --stats->limitExempt; incrThreadingModelStats(*stats, seCtx->_useDedicatedThread, -1); @@ -121,7 +119,7 @@ void ServiceExecutorContext::setUseDedicatedThread(bool b) noexcept { auto prev = std::exchange(_useDedicatedThread, b); if (!_client) return; - auto stats = *getServiceExecutorStats(_client->getServiceContext()); + auto stats = *serviceExecutorStatsDecoration(_client->getServiceContext()); incrThreadingModelStats(*stats, prev, -1); incrThreadingModelStats(*stats, _useDedicatedThread, +1); } @@ -134,7 +132,7 @@ void ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept { _canUseReserved = canUseReserved; if (_client) { - auto stats = getServiceExecutorStats(_client->getServiceContext()).synchronize(); + auto stats = *serviceExecutorStatsDecoration(_client->getServiceContext()); if (canUseReserved) { ++stats->limitExempt; } else { @@ -174,18 +172,6 @@ ServiceExecutor* ServiceExecutorContext::getServiceExecutor() noexcept { return transport::ServiceExecutorSynchronous::get(_client->getServiceContext()); } -void ServiceExecutor::yieldIfAppropriate() const { - /* - * In perf testing we found that yielding after running a each request produced - * at 5% performance boost in microbenchmarks if the number of worker threads - * was greater than the number of available cores. 
- */ - static const auto cores = ProcessInfo::getNumAvailableCores(); - if (getRunningThreads() > cores) { - stdx::this_thread::yield(); - } -} - void ServiceExecutor::shutdownAll(ServiceContext* serviceContext, Date_t deadline) { auto getTimeout = [&] { auto now = serviceContext->getPreciseClockSource()->now(); diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h index d6b53453ba6..25884acca68 100644 --- a/src/mongo/transport/service_executor.h +++ b/src/mongo/transport/service_executor.h @@ -46,32 +46,32 @@ namespace mongo::transport { extern bool gInitialUseDedicatedThread; -/* - * This is the interface for all ServiceExecutors. - */ class ServiceExecutor { public: using Task = OutOfLineExecutor::Task; - class TaskRunner : public OutOfLineExecutor { + class Executor : public OutOfLineExecutor { public: /** * Awaits the availability of incoming data for the specified session. On success, it will * schedule the callback on current executor. Otherwise, it will invoke the callback with a * non-okay status on the caller thread. */ - virtual void runOnDataAvailable(std::shared_ptr<Session> session, Task task) = 0; - }; + virtual void runOnDataAvailable(const std::shared_ptr<Session>& session, Task task) = 0; + /** + * Yields if the associated ServiceExecutor's threads > number of cores. + * Yielding after each request gives +5% perf when threads outnumber + * cores. + */ + virtual void yieldPointReached() const = 0; + }; static void shutdownAll(ServiceContext* serviceContext, Date_t deadline); virtual ~ServiceExecutor() = default; - virtual std::unique_ptr<TaskRunner> makeTaskRunner() = 0; + virtual std::unique_ptr<Executor> makeTaskRunner() = 0; - /* - * Starts the ServiceExecutor. This may create threads even if no tasks are scheduled. - */ virtual Status start() = 0; /* @@ -86,14 +86,9 @@ public: /** Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output. 
*/ virtual void appendStats(BSONObjBuilder* bob) const = 0; - - /** Yield if this executor controls more threads than we have cores. */ - void yieldIfAppropriate() const; }; -/** - * ServiceExecutorContext determines which ServiceExecutor is used for each Client. - */ +/** Determines which ServiceExecutor is used for each Client. */ class ServiceExecutorContext { public: /** @@ -170,27 +165,13 @@ private: std::function<ServiceExecutor*()> _getServiceExecutorForTest; }; -/** - * A small statlet for tracking which executors may be in use. - */ -class ServiceExecutorStats { -public: - /** - * Get the current value of ServiceExecutorStats for the given ServiceContext. - * - * Note that this value is intended for statistics and logging. It is unsynchronized and - * unsuitable for informing decisions in runtime. - */ - static ServiceExecutorStats get(ServiceContext* ctx) noexcept; - - // The number of Clients who use the dedicated executors. - size_t usesDedicated = 0; - - // The number of Clients who use the borrowed executors. - size_t usesBorrowed = 0; - - // The number of Clients that are allowed to ignore maxConns and use reserved resources. - size_t limitExempt = 0; +struct ServiceExecutorStats { + size_t usesDedicated = 0; /**< Clients using the dedicated executors. */ + size_t usesBorrowed = 0; /**< Clients using the borrowed executors. */ + size_t limitExempt = 0; /**< Privileged Clients. */ }; +/** Snapshot of the stats attached to `ctx`. 
*/ +ServiceExecutorStats getServiceExecutorStats(ServiceContext* ctx); + } // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_bm.cpp b/src/mongo/transport/service_executor_bm.cpp index b9e5b7e30dd..c0321f9e860 100644 --- a/src/mongo/transport/service_executor_bm.cpp +++ b/src/mongo/transport/service_executor_bm.cpp @@ -91,10 +91,6 @@ public: lastTearDown(); } - void runOnExec(ServiceExecutor::TaskRunner* taskRunner, ServiceExecutor::Task task) { - taskRunner->schedule(std::move(task)); - } - stdx::mutex mu; // NOLINT int nThreads = 0; ServiceContext* sc; @@ -105,8 +101,7 @@ auto maxThreads = 2 * ProcessInfo::getNumCores(); BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleTask)(benchmark::State& state) { for (auto _ : state) { - auto runner = executor()->makeTaskRunner(); - runOnExec(&*runner, [](Status) {}); + executor()->makeTaskRunner()->schedule([](Status) {}); } } BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleTask)->ThreadRange(1, maxThreads); @@ -116,7 +111,7 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleAndWait)(benchmark::Sta for (auto _ : state) { auto runner = executor()->makeTaskRunner(); Notification done; - runOnExec(&*runner, [&](Status) { done.set(); }); + runner->schedule([&](Status) { done.set(); }); done.get(); } } @@ -125,7 +120,7 @@ BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleAndWait)->ThreadRange BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::State& state) { int chainDepth = state.range(0); struct LoopState { - std::shared_ptr<ServiceExecutor::TaskRunner> runner; + std::shared_ptr<ServiceExecutor::Executor> runner; Notification done; unittest::Barrier startingLine{2}; }; @@ -133,7 +128,7 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::Sta std::function<void(Status)> chainedTask = [&](Status) { loopStatePtr->done.set(); }; for (int step = 0; step != chainDepth; ++step) chainedTask = [this, chainedTask, 
&loopStatePtr](Status) { - runOnExec(&*loopStatePtr->runner, chainedTask); + loopStatePtr->runner->schedule(chainedTask); }; // The first scheduled task starts the worker thread. This test is @@ -149,9 +144,9 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::Sta {}, }; loopStatePtr = &loopState; - runOnExec(&*loopStatePtr->runner, [&](Status s) { + loopStatePtr->runner->schedule([&](Status s) { loopState.startingLine.countDownAndWait(); - runOnExec(&*loopStatePtr->runner, chainedTask); + loopStatePtr->runner->schedule(chainedTask); }); state.ResumeTiming(); loopState.startingLine.countDownAndWait(); @@ -159,7 +154,7 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::Sta } } BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ChainedSchedule) - ->Range(1, 2 << 10) + ->Range(1, 1 << 8) ->ThreadRange(1, maxThreads); } // namespace diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index c0b3a5cad18..86e1456832e 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -37,6 +37,7 @@ #include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" +#include "mongo/util/processinfo.h" #include "mongo/util/testing_proctor.h" #include "mongo/util/thread_safety_context.h" @@ -332,7 +333,7 @@ void ServiceExecutorFixed::_beginShutdown() { _checkForShutdown(); break; case State::kStopping: - break; // Just nead to wait it out. + break; // Just need to wait it out. 
case State::kStopped: break; } @@ -406,8 +407,6 @@ size_t ServiceExecutorFixed::getRunningThreads() const { void ServiceExecutorFixed::_runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback) { invariant(session); - yieldIfAppropriate(); - // Make sure we're still allowed to schedule and track the session auto lk = stdx::unique_lock(_mutex); if (_state != State::kRunning) { @@ -452,11 +451,17 @@ int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const { return _executorContext->getRecursionDepth(); } -auto ServiceExecutorFixed::makeTaskRunner() -> std::unique_ptr<TaskRunner> { +void ServiceExecutorFixed::_yield() const { + static const auto cores = ProcessInfo::getNumAvailableCores(); + if (getRunningThreads() > cores) + stdx::this_thread::yield(); +} + +auto ServiceExecutorFixed::makeTaskRunner() -> std::unique_ptr<Executor> { iassert(ErrorCodes::ShutdownInProgress, "Executor is not running", _state == State::kRunning); /** Schedules on this. */ - class ForwardingTaskRunner : public TaskRunner { + class ForwardingTaskRunner : public Executor { public: explicit ForwardingTaskRunner(ServiceExecutorFixed* e) : _e{e} {} @@ -464,8 +469,12 @@ auto ServiceExecutorFixed::makeTaskRunner() -> std::unique_ptr<TaskRunner> { _e->_schedule(std::move(task)); } - void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override { - _e->_runOnDataAvailable(std::move(session), std::move(task)); + void runOnDataAvailable(const std::shared_ptr<Session>& session, Task task) override { + _e->_runOnDataAvailable(session, std::move(task)); + } + + void yieldPointReached() const override { + _e->_yield(); } private: diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h index a74adc6f3b7..0eb670fc503 100644 --- a/src/mongo/transport/service_executor_fixed.h +++ b/src/mongo/transport/service_executor_fixed.h @@ -78,7 +78,7 @@ public: */ int getRecursionDepthForExecutorThread() const; - 
std::unique_ptr<TaskRunner> makeTaskRunner() override; + std::unique_ptr<Executor> makeTaskRunner() override; private: enum class State { kNotStarted, kRunning, kStopping, kStopped }; @@ -110,6 +110,8 @@ private: /** Requires `_mutex` locked by `lk`. */ bool _waitForStop(stdx::unique_lock<Mutex>& lk, boost::optional<Milliseconds> timeout); + void _yield() const; + /** `_state` transitions: kNotStarted -> kRunning -> kStopping -> kStopped */ State _state = State::kNotStarted; diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp deleted file mode 100644 index 1ec4e7af75e..00000000000 --- a/src/mongo/transport/service_executor_reserved.cpp +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. 
If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_executor_reserved.h" - -#include "mongo/db/server_options.h" -#include "mongo/logv2/log.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/service_executor_utils.h" -#include "mongo/util/processinfo.h" -#include "mongo/util/thread_safety_context.h" - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor - - -namespace mongo { -namespace transport { -namespace { - -constexpr auto kExecutorName = "reserved"_sd; - -constexpr auto kThreadsRunning = "threadsRunning"_sd; -constexpr auto kClientsInTotal = "clientsInTotal"_sd; -constexpr auto kClientsRunning = "clientsRunning"_sd; -constexpr auto kClientsWaiting = "clientsWaitingForData"_sd; - -const auto getServiceExecutorReserved = - ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>(); - -const auto serviceExecutorReservedRegisterer = ServiceContext::ConstructorActionRegisterer{ - "ServiceExecutorReserved", [](ServiceContext* ctx) { - if (!serverGlobalParams.reservedAdminThreads) { - return; - } - - getServiceExecutorReserved(ctx) = std::make_unique<transport::ServiceExecutorReserved>( - ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads); - }}; -} // namespace - -thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {}; -thread_local int64_t ServiceExecutorReserved::_localThreadIdleCounter = 0; - -ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx, - std::string name, - size_t reservedThreads) - : _name(std::move(name)), _reservedThreads(reservedThreads) {} - -Status ServiceExecutorReserved::start() { - { - stdx::unique_lock<Latch> lk(_mutex); - _stillRunning.store(true); - _numStartingThreads = 
_reservedThreads; - } - - for (size_t i = 0; i < _reservedThreads; i++) { - auto status = _startWorker(); - if (!status.isOK()) { - return status; - } - } - - return Status::OK(); -} - -Status ServiceExecutorReserved::_startWorker() { - LOGV2(22978, - "Starting new worker thread for {name} service executor", - "Starting new worker thread for service executor", - "name"_attr = _name); - return launchServiceWorkerThread([this] { - stdx::unique_lock<Latch> lk(_mutex); - _numRunningWorkerThreads.addAndFetch(1); - ScopeGuard numRunningGuard([&] { - _numRunningWorkerThreads.subtractAndFetch(1); - _shutdownCondition.notify_one(); - }); - - _numStartingThreads--; - _numReadyThreads++; - - while (_stillRunning.load()) { - _threadWakeup.wait(lk, [&] { return (!_stillRunning.load() || !_readyTasks.empty()); }); - - if (!_stillRunning.loadRelaxed()) { - break; - } - - if (_readyTasks.empty()) { - continue; - } - - auto task = std::move(_readyTasks.front()); - _readyTasks.pop_front(); - _numReadyThreads -= 1; - bool launchReplacement = false; - if (_numReadyThreads + _numStartingThreads < _reservedThreads) { - _numStartingThreads++; - launchReplacement = true; - } - - lk.unlock(); - - if (launchReplacement) { - auto threadStartStatus = _startWorker(); - if (!threadStartStatus.isOK()) { - LOGV2_WARNING(22981, - "Could not start new reserve worker thread: {error}", - "Could not start new reserve worker thread", - "error"_attr = threadStartStatus); - } - } - - _localWorkQueue.emplace_back(std::move(task)); - while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) { - _localWorkQueue.front()(Status::OK()); - _localWorkQueue.pop_front(); - } - - lk.lock(); - if (_numReadyThreads + 1 > _reservedThreads) { - break; - } else { - _numReadyThreads += 1; - } - } - - LOGV2_DEBUG(22979, - 3, - "Exiting worker thread in {name} service executor", - "Exiting worker thread in service executor", - "name"_attr = _name); - }); -} - -ServiceExecutorReserved* 
ServiceExecutorReserved::get(ServiceContext* ctx) { - auto& ref = getServiceExecutorReserved(ctx); - - // The ServiceExecutorReserved could be absent, so nullptr is okay. - return ref.get(); -} - -Status ServiceExecutorReserved::shutdown(Milliseconds timeout) { - LOGV2_DEBUG(22980, 3, "Shutting down reserved executor"); - - stdx::unique_lock<Latch> lock(_mutex); - _stillRunning.store(false); - _threadWakeup.notify_all(); - - bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() { - return _numRunningWorkerThreads.load() == 0; - }); - - return result - ? Status::OK() - : Status(ErrorCodes::Error::ExceededTimeLimit, - "reserved executor couldn't shutdown all worker threads within time limit."); -} - -void ServiceExecutorReserved::_schedule(Task task) { - if (!_stillRunning.load()) { - task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running")); - return; - } - - if (!_localWorkQueue.empty()) { - _localWorkQueue.emplace_back(std::move(task)); - return; - } - - stdx::lock_guard<Latch> lk(_mutex); - _readyTasks.push_back(std::move(task)); - _threadWakeup.notify_one(); -} - -void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const { - // The ServiceExecutorReserved loans a thread to one client for its lifetime and waits - // synchronously on thread. 
- struct Statlet { - int threads; - int total; - int running; - int waiting; - }; - - auto statlet = [&] { - stdx::lock_guard lk(_mutex); - auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed()); - auto total = static_cast<int>(threads - _numReadyThreads - _numStartingThreads); - auto running = total; - auto waiting = 0; - return Statlet{threads, total, running, waiting}; - }(); - - BSONObjBuilder subbob = bob->subobjStart(kExecutorName); - subbob.append(kThreadsRunning, statlet.threads); - subbob.append(kClientsInTotal, statlet.total); - subbob.append(kClientsRunning, statlet.running); - subbob.append(kClientsWaiting, statlet.waiting); -} - -/** - * Schedules task immediately, on the assumption that The task will block to - * receive the next message and we don't mind blocking on this dedicated - * worker thread. - */ -void ServiceExecutorReserved::_runOnDataAvailable(const SessionHandle& session, Task task) { - invariant(session); - _schedule([this, session, callback = std::move(task)](Status status) { - yieldIfAppropriate(); - if (!status.isOK()) { - callback(std::move(status)); - return; - } - callback(session->waitForData()); - }); -} - -auto ServiceExecutorReserved::makeTaskRunner() -> std::unique_ptr<TaskRunner> { - iassert(ErrorCodes::ShutdownInProgress, "Executor is not running", _stillRunning.load()); - - /** Schedules on this. 
*/ - class ForwardingTaskRunner : public TaskRunner { - public: - explicit ForwardingTaskRunner(ServiceExecutorReserved* e) : _e{e} {} - - void schedule(Task task) override { - _e->_schedule(std::move(task)); - } - - void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override { - _e->_runOnDataAvailable(std::move(session), std::move(task)); - } - - private: - ServiceExecutorReserved* _e; - }; - return std::make_unique<ForwardingTaskRunner>(this); -} - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h deleted file mode 100644 index b9211b2f58b..00000000000 --- a/src/mongo/transport/service_executor_reserved.h +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. 
If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <deque> - -#include "mongo/base/status.h" -#include "mongo/db/service_context.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/platform/mutex.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/transport/service_executor.h" - -namespace mongo { -namespace transport { - -/** - * The reserved service executor emulates a thread per connection. - * Each connection has its own worker thread where jobs get scheduled. - * - * The executor will start reservedThreads on start, and create a new thread every time it - * starts a new thread, ensuring there are always reservedThreads available for work - this - * means that even when you hit the NPROC ulimit, there will still be threads ready to - * accept work. When threads exit, they will go back to waiting for work if there are fewer - * than reservedThreads available. 
- */ -class ServiceExecutorReserved final : public ServiceExecutor { -public: - explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads); - - static ServiceExecutorReserved* get(ServiceContext* ctx); - - Status start() override; - Status shutdown(Milliseconds timeout) override; - - size_t getRunningThreads() const override { - return _numRunningWorkerThreads.loadRelaxed(); - } - - void appendStats(BSONObjBuilder* bob) const override; - - std::unique_ptr<TaskRunner> makeTaskRunner() override; - -private: - Status _startWorker(); - - void _schedule(Task task); - - void _runOnDataAvailable(const SessionHandle& session, Task task); - - static thread_local std::deque<Task> _localWorkQueue; - static thread_local int64_t _localThreadIdleCounter; - - AtomicWord<bool> _stillRunning{false}; - - mutable Mutex _mutex = MONGO_MAKE_LATCH("ServiceExecutorReserved::_mutex"); - stdx::condition_variable _threadWakeup; - stdx::condition_variable _shutdownCondition; - - std::deque<Task> _readyTasks; - - AtomicWord<unsigned> _numRunningWorkerThreads{0}; - size_t _numReadyThreads{0}; - size_t _numStartingThreads{0}; - - const std::string _name; - const size_t _reservedThreads; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp index 31b88b76eb6..6748c3aa229 100644 --- a/src/mongo/transport/service_executor_synchronous.cpp +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -29,195 +29,744 @@ #include "mongo/transport/service_executor_synchronous.h" +#include <array> +#include <deque> +#include <fmt/format.h> + +#include "mongo/base/status.h" +#include "mongo/db/server_options.h" +#include "mongo/db/service_context.h" #include "mongo/logv2/log.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/thread.h" -#include 
"mongo/transport/service_executor_utils.h" +#include "mongo/transport/service_executor.h" +#include "mongo/transport/service_executor_synchronous_synchronization.h" +#include "mongo/transport/service_executor_synchronous_thread.h" +#include "mongo/transport/session.h" +#include "mongo/util/duration.h" +#include "mongo/util/functional.h" +#include "mongo/util/processinfo.h" +#include "mongo/util/scoped_unlock.h" +#include "mongo/util/stacktrace.h" +#include "mongo/util/thread_safety_context.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor namespace mongo::transport { namespace { -const auto getServiceExecutorSynchronous = - ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>(); -const ServiceContext::ConstructorActionRegisterer serviceExecutorSynchronousRegisterer{ - "ServiceExecutorSynchronous", [](ServiceContext* ctx) { - getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx); - }}; -} // namespace +using namespace fmt::literals; -class ServiceExecutorSynchronous::SharedState : public std::enable_shared_from_this<SharedState> { -private: - class LockRef { +// Central switch for debug instrumentation. 
+constexpr int kLocalDbg = 5; + +Status notRunningStatus() { + return {ErrorCodes::ShutdownInProgress, "Executor is not running"}; +} + +size_t cores() { + static const auto numCores = ProcessInfo::getNumAvailableCores(); + return numCores; +} + +enum class BucketId { + none, + starting, + ready, + leased, + stopping, +}; + +StringData bucketName(BucketId b) { + static constexpr std::array names{ + "none"_sd, + "starting"_sd, + "ready"_sd, + "leased"_sd, + "stopping"_sd, + }; + return names[static_cast<size_t>(b)]; +} + +std::ostream& toString(std::ostream& os, BucketId b) { + return os << bucketName(b); +} + +class ManagedBuckets { +public: + class Member; + using Bucket = std::list<std::shared_ptr<Member>>; + + /** Intrusive base class for objects that want to be in a `ManagedBucket`. */ + class Member { public: - explicit LockRef(SharedState* p) : _p{p} {} + virtual ~Member() { + invariant(!bucket, "Destroyed while held by a bucket"); + } - size_t threads() const { - return _p->_threads; + private: + friend class ManagedBuckets; + Bucket* bucket = nullptr; + typename Bucket::iterator iterator; + }; + + struct BucketSizes { + size_t starting; + size_t ready; + size_t leased; + size_t stopping; + }; + + class SyncAccess { + public: + explicit SyncAccess(const ManagedBuckets* sourcePtr) : _sourcePtr{sourcePtr} {} + + BucketSizes bucketSizes() const { + return {_src()._starting.size(), + _src()._ready.size(), + _src()._leased.size(), + _src()._stopping.size()}; } - bool waitForDrain(Milliseconds dur) { - return _p->_cv.wait_for(_lk, dur.toSystemDuration(), [&] { return !_p->_threads; }); + size_t totalSize() const { + auto bs = bucketSizes(); + return bs.starting + bs.ready + bs.leased + bs.stopping; } - void onStartThread() { - ++_p->_threads; + /** Move `w` from whatever bucket it's in now (if any) to the `toId` bucket. 
*/ + void move(const std::shared_ptr<Member>& w, BucketId toId) { + _moveLocked(w, toId); } - void onEndThread() { - if (!--_p->_threads) - _p->_cv.notify_all(); + /** Move the back of `fromId` and into `toId`. */ + std::shared_ptr<Member> popMove(BucketId fromId, BucketId toId) { + auto from = _src()._bucket(fromId); + if (from->empty()) + return nullptr; + auto w = from->back(); + _moveLocked(w, toId); + return w; } private: - SharedState* _p; - stdx::unique_lock<stdx::mutex> _lk{_p->_mutex}; // NOLINT - }; + void _moveLocked(const std::shared_ptr<Member>& w, BucketId destId) { + Bucket* destPtr = _src()._bucket(destId); + Member& bm = *w; + if (destPtr) { + if (bm.bucket) { + destPtr->splice(destPtr->end(), *bm.bucket, bm.iterator); + } else { + destPtr->push_back(w); + bm.iterator = std::prev(destPtr->end()); + } + } else { + if (bm.bucket) + bm.bucket->erase(bm.iterator); + } + bm.bucket = destPtr; + } -public: - void schedule(Task task); + const ManagedBuckets& _src() const { + return *_sourcePtr; + } - bool isRunning() const { - return _isRunning.loadRelaxed(); - } + ManagedBuckets& _src() { + return *const_cast<ManagedBuckets*>(_sourcePtr); + } + + const ManagedBuckets* _sourcePtr; + Synchronization::Lock _lk{_sourcePtr->_sync.acquireLock()}; + }; - void setIsRunning(bool b) { - _isRunning.store(b); + explicit ManagedBuckets(std::shared_ptr<SyncDomain> domain) + : _sync{"buckets", std::move(domain)} {} + + std::string toString() const { + auto bs = sync().bucketSizes(); + return "[starting={}/ready={}/leased={}/stopping={}]"_format( + bs.starting, bs.ready, bs.leased, bs.stopping); } - LockRef lock() { - return LockRef{this}; + SyncAccess sync() const { + return SyncAccess{this}; } private: - class WorkerThreadInfo; + Bucket* _bucket(BucketId id) { + switch (id) { + case BucketId::none: + return nullptr; + case BucketId::starting: + return &_starting; + case BucketId::ready: + return &_ready; + case BucketId::leased: + return &_leased; + case 
BucketId::stopping: + return &_stopping; + } + MONGO_UNREACHABLE; + } - mutable stdx::mutex _mutex; // NOLINT - stdx::condition_variable _cv; - AtomicWord<bool> _isRunning; - size_t _threads = 0; + mutable Synchronization _sync; + Bucket _starting; + Bucket _ready; + Bucket _leased; + Bucket _stopping; }; -class ServiceExecutorSynchronous::SharedState::WorkerThreadInfo { +class Worker : public std::enable_shared_from_this<Worker>, public ManagedBuckets::Member { public: - explicit WorkerThreadInfo(std::shared_ptr<SharedState> sharedState) - : sharedState{std::move(sharedState)} {} + class LeaseToken; + + class Env { + public: + virtual ~Env() = default; + + virtual StringData name() const = 0; + + virtual bool isRunning() const = 0; + + virtual size_t getRunningThreads() const = 0; + + /** May throw ShutdownInProgress. */ + virtual void onReadyThread(const std::shared_ptr<Worker>& w) = 0; + + /** May throw ShutdownInProgress. */ + virtual void onReleasedThread(const std::shared_ptr<Worker>& w) = 0; + + virtual void onEndThread(const std::shared_ptr<Worker>& w) = 0; + + virtual void yield() const = 0; + + /** + * The worker's Synchronization object will participate in an + * env-specified SyncDomain if there is one, or it can be nullptr. + */ + virtual std::shared_ptr<SyncDomain> syncDomain() = 0; + }; + + Worker(std::unique_ptr<Env> env, uint64_t id) + : _env{std::move(env)}, _id{id}, _sync{"worker{}"_format(_id), _env->syncDomain()} { + LOGV2_DEBUG(7015101, kLocalDbg, "Worker ctor", "w"_attr = _id); + } + + ~Worker() { + LOGV2_DEBUG(7015102, kLocalDbg, "Worker dtor", "w"_attr = _id); + } + + /** + * The worker's executor. It pins a reference to a Worker thread on which it + * can schedule tasks. There is one per Worker, and when it dies, it moves its + * worker thread into the executor's ready list. + * From there, the executor can destroy it or lease it out again as needed. 
+ */ + std::unique_ptr<LeaseToken> makeLease(); + + void shutdown() { + auto lk = _sync.acquireLock(); + shutdownLocked(); + } + + void shutdownLocked() { + _state = State::stopped; + _sync.notifyAll(); + } void run() { - while (!queue.empty() && sharedState->isRunning()) { - queue.front()(Status::OK()); - queue.pop_front(); + ScopeGuard epilogue = [&] { _env->onEndThread(shared_from_this()); }; + try { + _env->onReadyThread(shared_from_this()); + } catch (const ExceptionFor<ErrorCodes::ShutdownInProgress>&) { + shutdown(); } + auto lk = _sync.acquireLock(); + while (_state != State::stopped) { + _sync.wait(lk, [&] { return _state != State::idle; }); + if (_state == State::leased) { + _serveLease(lk); + _endLeaseLocked(); + } + } + _serveLease(lk, notRunningStatus()); + } + + uint64_t getId() const { + return _id; } - std::shared_ptr<SharedState> sharedState; - std::deque<Task> queue; +private: + enum class State { + idle, + leased, + stopped, + }; + + friend StringData toString(State s) { + return std::array{ + "idle"_sd, + "leased"_sd, + "stopped"_sd, + }[static_cast<int>(s)]; + } + + void _endLeaseLocked() { + if (_state != State::leased) + return; + _state = State::idle; + try { + _env->onReleasedThread(shared_from_this()); + } catch (const ExceptionFor<ErrorCodes::ShutdownInProgress>&) { + _state = State::stopped; + } + _sync.notifyAll(); + } + + void _onDestroyLeaseToken() { + auto lk = _sync.acquireLock(); + _endLeaseLocked(); + } + + void _schedule(ServiceExecutor::Task task) { + LOGV2_DEBUG(7015106, kLocalDbg, "Schedule", "w"_attr = getId()); + if (MONGO_unlikely(!_env->isRunning())) { + task(notRunningStatus()); + return; + } + auto lk = _sync.acquireLock(); + _tasks.push_back(std::move(task)); + _sync.notifyOne(); + } + + /** + * Schedules task immediately, on the assumption that The task will block to + * receive the next message and we don't mind blocking on this dedicated + * worker thread. 
+ */ + void _runOnDataAvailable(const std::shared_ptr<Session>& session, ServiceExecutor::Task task) { + invariant(session); + _schedule(std::move(task)); + } + + /** + * Runs for the duration of a worker lease, calling queued tasks with the + * specified `execStatus`, watching for state changes. If this worker + * becomes un-leased, any queued tasks are still invoked, and the function + * returns. + */ + void _serveLease(Synchronization::Lock& lk, Status execStatus = Status::OK()) { + while (true) { + _sync.wait(lk, [&] { return _state != State::leased || !_tasks.empty(); }); + if (_tasks.empty()) + return; + auto t = std::move(_tasks.front()); + _tasks.pop_front(); + ScopedUnlock unlock{lk}; + LOGV2_DEBUG( + 7015100, kLocalDbg, "Run task", "w"_attr = getId(), "executor"_attr = _env->name()); + t(execStatus); + t = nullptr; + } + } + + void _yield() const { + _env->yield(); + } + + const std::unique_ptr<Env> _env; + const uint64_t _id; + Synchronization _sync; + std::deque<ServiceExecutor::Task> _tasks; + State _state = State::idle; }; -void ServiceExecutorSynchronous::SharedState::schedule(Task task) { - if (!isRunning()) { - task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running")); - return; +class Worker::LeaseToken : public ServiceExecutor::Executor { +public: + explicit LeaseToken(std::shared_ptr<Worker> worker) : _w{std::move(worker)} { + LOGV2_DEBUG(7015107, kLocalDbg, "LeaseToken ctor", "w"_attr = _w->getId()); } - thread_local WorkerThreadInfo* workerThreadInfoTls = nullptr; + ~LeaseToken() { + LOGV2_DEBUG(7015108, kLocalDbg, "LeaseToken dtor", "w"_attr = _w->getId()); + _w->_onDestroyLeaseToken(); + } - if (workerThreadInfoTls) { - workerThreadInfoTls->queue.push_back(std::move(task)); - return; + void schedule(Task task) override { + _w->_schedule(std::move(task)); } - LOGV2_DEBUG(22983, 3, "Starting ServiceExecutorSynchronous worker thread"); - auto workerInfo = std::make_unique<WorkerThreadInfo>(shared_from_this()); - 
workerInfo->queue.push_back(std::move(task)); + void runOnDataAvailable(const std::shared_ptr<Session>& session, Task task) override { + _w->_runOnDataAvailable(session, std::move(task)); + } - Status status = launchServiceWorkerThread([w = std::move(workerInfo)] { - w->sharedState->lock().onStartThread(); - ScopeGuard onEndThreadGuard = [&] { w->sharedState->lock().onEndThread(); }; + void yieldPointReached() const override { + _w->_yield(); + } - workerThreadInfoTls = &*w; - ScopeGuard resetTlsGuard = [&] { workerThreadInfoTls = nullptr; }; +private: + std::shared_ptr<Worker> _w; +}; - w->run(); - }); - // The usual way to fail to schedule is to invoke the task, but in this case - // we don't have the task anymore. We gave it away to the callback that the - // failed thread was supposed to run. - iassert(status); +std::unique_ptr<Worker::LeaseToken> Worker::makeLease() { + auto lk = _sync.acquireLock(); + if (_state != State::idle) + return nullptr; + _state = State::leased; + return std::make_unique<LeaseToken>(shared_from_this()); } -ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext*) - : _sharedState{std::make_shared<SharedState>()} {} +class DedicatedThreadExecutorPool + : public std::enable_shared_from_this<DedicatedThreadExecutorPool> { +private: + size_t _availableThreads(const ManagedBuckets::BucketSizes& bs) const { + return bs.starting + bs.ready; + } -ServiceExecutorSynchronous::~ServiceExecutorSynchronous() = default; + /** If we have less than reserve, we need more. */ + bool _needsMoreWorkers(Synchronization::Lock& lk, const ManagedBuckets::BucketSizes& bs) const { + return _availableThreads(bs) < _waitingForLease + _reservedThreads; + } -Status ServiceExecutorSynchronous::start() { - _sharedState->setIsRunning(true); - return Status::OK(); -} + /** We keep a few threads alive to reduce spawning. 
*/ + bool _needsFewerWorkers(Synchronization::Lock& lk, + const ManagedBuckets::BucketSizes& bs) const { + return _availableThreads(bs) >= _waitingForLease + _reservedThreads + _maxIdleThreads; + } -Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) { - LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor"); - auto stopLock = _sharedState->lock(); - _sharedState->setIsRunning(false); - if (!stopLock.waitForDrain(timeout)) - return Status(ErrorCodes::Error::ExceededTimeLimit, - "passthrough executor couldn't shutdown " - "all worker threads within time limit."); - return Status::OK(); -} + /** Called when a new worker has started, becoming ready. */ + void _onReadyThread(const std::shared_ptr<Worker>& w) { + LOGV2_DEBUG(7015109, kLocalDbg, "A new Worker has become ready", "w"_attr = w->getId()); + auto lk = _sync.acquireLock(); + if (!_isRunning.loadRelaxed()) + iasserted(notRunningStatus()); + _workers.sync().move(_asMember(w), BucketId::ready); + _sync.notifyAll(); + } -ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) { - auto& ref = getServiceExecutorSynchronous(ctx); - invariant(ref); - return ref.get(); + /** + * Worker `w` has been released from a lease. + * A decision is made here about whether to keep the worker or discard it. + */ + void _onReleasedThread(const std::shared_ptr<Worker>& w) { + LOGV2_DEBUG(7015110, kLocalDbg, "Worker released from lease", "w"_attr = w->getId()); + auto lk = _sync.acquireLock(); + if (!_isRunning.loadRelaxed()) + iasserted(notRunningStatus()); + auto workersSync = _workers.sync(); + auto bs = workersSync.bucketSizes(); + if (_needsFewerWorkers(lk, bs)) { + w->shutdownLocked(); + workersSync.move(w, BucketId::none); + _updateLazyShouldYield(workersSync.bucketSizes()); + } else { + workersSync.move(w, BucketId::ready); + } + _sync.notifyAll(); + } + + /** Remove a dying worker from the buckets as its run() concludes. 
*/ + void _onEndThread(const std::shared_ptr<Worker>& w) { + LOGV2_DEBUG(7015111, kLocalDbg, "Worker end", "w"_attr = w->getId()); + auto lk = _sync.acquireLock(); + _workers.sync().move(w, BucketId::none); + _sync.notifyAll(); + } + +public: + explicit DedicatedThreadExecutorPool(std::string name, + size_t reservedThreads, + size_t maxIdleThreads, + std::string statLabel) + : _name{std::move(name)}, + _reservedThreads{reservedThreads}, + _maxIdleThreads{maxIdleThreads}, + _statLabel{std::move(statLabel)}, + _workers{_sync.domain()} {} + + Status start() try { + auto lk = _sync.acquireLock(); + _isRunning.store(true); + _sync.notifyAll(); + _spawnEnoughWorkers(lk); + return Status::OK(); + } catch (const DBException& ex) { + return ex.toStatus(); + } + + size_t getRunningThreads() const { + return _workers.sync().totalSize(); + } + + void appendStats(BSONObjBuilder* bob) const { + auto bs = _workers.sync().bucketSizes(); + int threads = bs.starting + bs.ready + bs.leased + bs.stopping; + int clients = bs.leased; + BSONObjBuilder{bob->subobjStart(_statLabel)} + .append("threadsRunning", threads) + .append("clientsInTotal", clients) + .append("clientsRunning", clients) + .append("clientsWaitingForData", 0); + } + + /** Lease a worker, spawning new workers as necessary. */ + std::unique_ptr<ServiceExecutor::Executor> makeTaskRunner() { + LOGV2_DEBUG(7015112, kLocalDbg, "makeTaskRunner"); + auto lk = _sync.acquireLock(); + ++_waitingForLease; + ScopeGuard restoreWaitingForLease = [&] { --_waitingForLease; }; + while (MONGO_likely(_isRunning.loadRelaxed())) { + try { + _spawnEnoughWorkers(lk); + } catch (const DBException& ex) { + LOGV2_DEBUG(7015103, 3, "Nonfatal spawn failure", "error"_attr = ex); + } + + auto [w, bs] = _nextWorker(lk); + if (!w) + continue; + _updateLazyShouldYield(bs); + + // `w->makeLease` locks `w`, so release subordinate mutex. + ScopedUnlock unlockThis{lk}; + if (auto lease = w->makeLease()) + return lease; + // Retry. 
Lease failed as w is no longer idle. + // Another requester must have gotten it first. + } + iasserted(notRunningStatus()); + } + + bool isRunning() const { + return _isRunning.loadRelaxed(); + } + + Status shutdown(Milliseconds timeout) { + LOGV2_DEBUG(22982, 3, "Shutting down executor", "name"_attr = _name); + auto lk = _sync.acquireLock(); + _isRunning.store(false); + { + ScopedUnlock unlock{lk}; + for (auto b : {BucketId::starting, BucketId::ready, BucketId::leased}) + while (auto w = _workers.sync().popMove(b, BucketId::stopping)) + _asWorker(w)->shutdown(); + } + _sync.notifyAll(); + if (!_sync.waitFor(lk, timeout, [&] { + LOGV2_DEBUG(7015113, kLocalDbg, "waitForDrain"); + return _workers.sync().totalSize() == 0; + })) { + return Status( + ErrorCodes::Error::ExceededTimeLimit, + "Executor {} couldn't shutdown within {}."_format(_name, timeout.toString())); + } + return Status::OK(); + } + +private: + static std::shared_ptr<ManagedBuckets::Member> _asMember(std::shared_ptr<Worker> w) { + return std::dynamic_pointer_cast<ManagedBuckets::Member>(std::move(w)); + } + static std::shared_ptr<Worker> _asWorker(std::shared_ptr<ManagedBuckets::Member> w) { + return std::dynamic_pointer_cast<Worker>(std::move(w)); + } + + /** The environment that `_makeWorker` imbues into its products. 
*/ + class WorkerEnv : public Worker::Env { + public: + explicit WorkerEnv(std::shared_ptr<DedicatedThreadExecutorPool> source) + : _source{std::move(source)} {} + StringData name() const override { + return _source->_name; + } + bool isRunning() const override { + return _source->isRunning(); + } + size_t getRunningThreads() const override { + return _source->getRunningThreads(); + } + void onReadyThread(const std::shared_ptr<Worker>& w) override { + _source->_onReadyThread(w); + } + void onReleasedThread(const std::shared_ptr<Worker>& w) override { + _source->_onReleasedThread(w); + } + void onEndThread(const std::shared_ptr<Worker>& w) override { + _source->_onEndThread(w); + } + std::shared_ptr<SyncDomain> syncDomain() override { + return _source->_sync.domain(); + } + void yield() const override { + return _source->_yield(); + } + + private: + std::shared_ptr<DedicatedThreadExecutorPool> _source; + }; + + /** + * Only the leased workers are competing for cores. + * This metric has never accounted for other threads competing with this + * executor's threads. Even among the ServiceExecutors, Synchronous and + * Reserved can be simultanenously active. + */ + void _updateLazyShouldYield(ManagedBuckets::BucketSizes bs) { + bool newValue = bs.leased > cores(); + if (_lazyShouldYield.loadRelaxed() != newValue) + _lazyShouldYield.store(newValue); + } + + void _yield() const { + if (_lazyShouldYield.loadRelaxed()) + stdx::this_thread::yield(); + } + + void _spawnEnoughWorkers(Synchronization::Lock& lk) { + while (_needsMoreWorkers(lk, _workers.sync().bucketSizes())) { + LOGV2_DEBUG(7015114, + 3, + "Starting a new thread", + "executor"_attr = _name, + "waitingForLease"_attr = _waitingForLease, + "buckets"_attr = _workers.toString()); + ScopedUnlock unlock{lk}; + auto w = _makeWorker(); + _workers.sync().move(w, BucketId::starting); + try { + iassert(launchServiceWorkerThread([w]() mutable { std::move(w)->run(); })); + } catch (...) 
{ + _workers.sync().move(w, BucketId::none); + throw; + } + } + } + + std::tuple<std::shared_ptr<Worker>, ManagedBuckets::BucketSizes> _nextWorker( + Synchronization::Lock& lk) { + for (;;) { + _sync.wait(lk, [&] { + auto bs = _workers.sync().bucketSizes(); + return !_isRunning.loadRelaxed() || bs.ready || !bs.starting; + }); + if (!_isRunning.loadRelaxed()) + iasserted(notRunningStatus()); + auto workersSync = _workers.sync(); + auto bs = workersSync.bucketSizes(); + if (bs.ready) + return {_asWorker(workersSync.popMove(BucketId::ready, BucketId::leased)), bs}; + if (!bs.starting) + iasserted({ErrorCodes::InternalError, "No ready workers"}); + } + } + + std::shared_ptr<Worker> _makeWorker() { + return std::make_shared<Worker>(std::make_unique<WorkerEnv>(shared_from_this()), + _workerIdCounter.fetchAndAdd(1)); + } + + const std::string _name; + const size_t _reservedThreads; + const size_t _maxIdleThreads; + const std::string _statLabel; + AtomicWord<uint64_t> _workerIdCounter{1}; + mutable Synchronization _sync{"executor"}; ///< Root of its domain + AtomicWord<bool> _isRunning; + size_t _waitingForLease = 0; + ManagedBuckets _workers; + + /** Updated in a relaxed manner when a Worker lease is created or destroyed. 
*/ + AtomicWord<bool> _lazyShouldYield; +}; + +} // namespace + +// ======================= +// ServiceExecutorSyncBase +// ======================= + +class ServiceExecutorSyncBase::Impl : public DedicatedThreadExecutorPool { +public: + using DedicatedThreadExecutorPool::DedicatedThreadExecutorPool; +}; + +ServiceExecutorSyncBase::ServiceExecutorSyncBase(std::string name, + size_t reservedThreads, + size_t maxIdleThreads, + std::string statLabel) + : _impl{std::make_shared<Impl>( + std::move(name), reservedThreads, maxIdleThreads, std::move(statLabel))} {} + +ServiceExecutorSyncBase::~ServiceExecutorSyncBase() = default; + +Status ServiceExecutorSyncBase::start() { + return _impl->start(); } -void ServiceExecutorSynchronous::_schedule(Task task) { - _sharedState->schedule(std::move(task)); +Status ServiceExecutorSyncBase::shutdown(Milliseconds timeout) { + return _impl->shutdown(timeout); } -size_t ServiceExecutorSynchronous::getRunningThreads() const { - return _sharedState->lock().threads(); +size_t ServiceExecutorSyncBase::getRunningThreads() const { + return _impl->getRunningThreads(); } -void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const { - // Has one client per thread and waits synchronously on that thread. 
- int threads = getRunningThreads(); - BSONObjBuilder{bob->subobjStart("passthrough")} - .append("threadsRunning", threads) - .append("clientsInTotal", threads) - .append("clientsRunning", threads) - .append("clientsWaitingForData", 0); +void ServiceExecutorSyncBase::appendStats(BSONObjBuilder* bob) const { + _impl->appendStats(bob); } -void ServiceExecutorSynchronous::_runOnDataAvailable(const SessionHandle& session, Task task) { - invariant(session); - yieldIfAppropriate(); - _schedule(std::move(task)); +std::unique_ptr<ServiceExecutor::Executor> ServiceExecutorSyncBase::makeTaskRunner() { + return _impl->makeTaskRunner(); } -auto ServiceExecutorSynchronous::makeTaskRunner() -> std::unique_ptr<TaskRunner> { - if (!_sharedState->isRunning()) - iassert(Status(ErrorCodes::ShutdownInProgress, "Executor is not running")); +// ========================== +// ServiceExecutorSynchronous +// ========================== - /** Schedules on this. */ - class ForwardingTaskRunner : public TaskRunner { - public: - explicit ForwardingTaskRunner(ServiceExecutorSynchronous* e) : _e{e} {} +namespace { +const auto serviceExecutorSynchronousDecoration = + ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>(); - void schedule(Task task) override { - _e->_schedule(std::move(task)); - } +const ServiceContext::ConstructorActionRegisterer serviceExecutorSynchronousRegisterer{ + "ServiceExecutorSynchronous", [](ServiceContext* ctx) { + serviceExecutorSynchronousDecoration(ctx) = std::make_unique<ServiceExecutorSynchronous>(); + }}; +} // namespace - void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override { - _e->_runOnDataAvailable(std::move(session), std::move(task)); - } +ServiceExecutorSynchronous::ServiceExecutorSynchronous() + : ServiceExecutorSyncBase{"passthrough", defaultReserved, cores(), "passthrough"} {} - private: - ServiceExecutorSynchronous* _e; - }; - return std::make_unique<ForwardingTaskRunner>(this); 
+ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) { + auto& ref = serviceExecutorSynchronousDecoration(ctx); + invariant(ref); + return ref.get(); +} + +// ======================= +// ServiceExecutorReserved +// ======================= + +namespace { +const auto serviceExecutorReservedDecoration = + ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>(); + +const ServiceContext::ConstructorActionRegisterer serviceExecutorReservedRegisterer{ + "ServiceExecutorReserved", [](ServiceContext* ctx) { + auto threads = serverGlobalParams.reservedAdminThreads; + if (threads) + serviceExecutorReservedDecoration(ctx) = + std::make_unique<ServiceExecutorReserved>("admin connections", threads, cores()); + }}; +} // namespace + +ServiceExecutorReserved::ServiceExecutorReserved(std::string name, + size_t reservedThreads, + size_t maxIdleThreads) + : ServiceExecutorSyncBase{std::move(name), reservedThreads, maxIdleThreads, "reserved"} {} + +ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) { + return serviceExecutorReservedDecoration(ctx).get(); } } // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h index 89a31492be2..7ada4a0b0a7 100644 --- a/src/mongo/transport/service_executor_synchronous.h +++ b/src/mongo/transport/service_executor_synchronous.h @@ -29,69 +29,109 @@ #pragma once -#include <deque> +#include <memory> +#include <string> #include "mongo/base/status.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/service_context.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/platform/mutex.h" -#include "mongo/stdx/condition_variable.h" #include "mongo/transport/service_executor.h" -#include "mongo/util/hierarchical_acquisition.h" +#include "mongo/util/duration.h" -namespace mongo { -namespace transport { +namespace mongo::transport { -/** Transitional for differential benchmarking 
of ServiceExecutorSynchronous refactor */ -#define TRANSITIONAL_SERVICE_EXECUTOR_SYNCHRONOUS_HAS_RESERVE 0 +/** + * Transitional for differential benchmarking of ServiceExecutorSynchronous refactor. + * Can be removed when it's no longer necessary to benchmark the refactor. + * Picked up by service_executor_bm.cpp to detect this newer API. + */ +#define TRANSITIONAL_SERVICE_EXECUTOR_SYNCHRONOUS_HAS_RESERVE 1 /** - * Creates a fresh worker thread for each top-level scheduled task. Any tasks - * scheduled during the execution of that top-level task as it runs on such a - * worker thread are pushed to the queue of that worker thread. + * ServiceExecutorSynchronous is just a special case of ServiceExecutorReserved, + * Which just happens to have `reservedTheads == 0`. Both are implemented by + * this base class. * - * Thus, the top-level task is expected to represent a chain of operations, each - * of which schedules its successor before returning. The entire chain of - * operations, and nothing else, executes on the same worker thread. + * It will start `reservedThreads` on start, and create enough new threads every + * time it gives out a lease, that there are at least `reservedThreads` + * available for work (as long as spawning is possible). This means that even + * when we hit the NPROC ulimit, and spawning is temporarily failing, there will + * still be threads ready to accept work. + * + * When a lease is released, the worker will go back to waiting for work if + * there are no more than `reservedThreads + maxIdleThreads` available. + * Otherwise it will be destroyed. This means that `reservedThreads` are for + * emergency use only, while the `maxIdleThreads` specifies the strength of an + * optimization that minimizes the churn of worker threads. With + * `maxIdleThreads==0`, every worker lease would spawn a new thread, which is + * very wasteful. 
*/ -class ServiceExecutorSynchronous final : public ServiceExecutor { +class ServiceExecutorSyncBase : public ServiceExecutor { public: - /** Returns the ServiceExecutorSynchronous decoration on `ctx`. */ - static ServiceExecutorSynchronous* get(ServiceContext* ctx); - - explicit ServiceExecutorSynchronous(ServiceContext*); - - ~ServiceExecutorSynchronous(); + ~ServiceExecutorSyncBase() override; Status start() override; Status shutdown(Milliseconds timeout) override; - std::unique_ptr<TaskRunner> makeTaskRunner() override; + std::unique_ptr<Executor> makeTaskRunner() override; size_t getRunningThreads() const override; void appendStats(BSONObjBuilder* bob) const override; -private: - class SharedState; - +protected: /** - * The behavior of `schedule` depends on whether the calling thread is a - * worker thread spawned by a previous `schedule` call. - * - * If a nonworker thread schedules a task, a worker thread is spawned, and - * the task is transferred to the new worker thread's queue. + * `reservedThreads` - At `start` time and at `makeTaskRunner` time, workers + * will be spawned until this number of available threads is reached. This + * is intended as a minimum bulwark against periods during which workers + * cannot spawn. * - * If a worker thread schedules a task, the task is pushed to the back of its - * queue. The worker thread exits when the queue becomes empty. + * `maxIdleThreads` - When a worker lease is released, the corresponding + * worker can be destroyed, or it can be kept for use by the next lease. + * This parameter specifies the upper limit on the number of such kept + * threads, not counting reserved threads. 
*/ - void _schedule(Task task); + ServiceExecutorSyncBase(std::string name, + size_t reservedThreads, + size_t maxIdleThreads, + std::string statLabel); + +private: + class Impl; + + std::shared_ptr<Impl> _impl; +}; - void _runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback); +/** + * Provides access to a pool of dedicated worker threads via `makeTaskRunner`, + * which retuns a worker lease. A worker lease is a handle that acts as an + * executor, accepting tasks. The leased worker thread is dedicated, meaning it + * will only work through the lease handle: all tasks scheduled on that + * lease will be run on that worker, and no other tasks. + * + * A worker thread may be reused and leased many times, but only serially so. + */ +class ServiceExecutorSynchronous : public ServiceExecutorSyncBase { +public: + static inline size_t defaultReserved = 0; /** For testing */ + /** Returns the ServiceExecutorSynchronous decoration on `ctx`. */ + static ServiceExecutorSynchronous* get(ServiceContext* ctx); + + ServiceExecutorSynchronous(); +}; + +/** + * Same as ServiceExecutorSynchronous (implemented by the same base class), but + * ServiceExecutorReserved has the additional property of having configurable + * `reservedThreads` and `maxIdleThreads` counts. See base class docs. + */ +class ServiceExecutorReserved : public ServiceExecutorSyncBase { +public: + /** Null result is possible, as ServiceExecutorReserved could be absent. 
*/ + static ServiceExecutorReserved* get(ServiceContext* ctx); - std::shared_ptr<SharedState> _sharedState; + ServiceExecutorReserved(std::string name, size_t reservedThreads, size_t maxIdleThreads); }; -} // namespace transport -} // namespace mongo +} // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_synchronous_synchronization.cpp b/src/mongo/transport/service_executor_synchronous_synchronization.cpp new file mode 100644 index 00000000000..6fd0207f7ac --- /dev/null +++ b/src/mongo/transport/service_executor_synchronous_synchronization.cpp @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/transport/service_executor_synchronous_synchronization.h" + +#include "mongo/logv2/log.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor + +namespace mongo::transport { + +void SyncDomain::_findCycle(std::vector<const ThreadRecord*>& seekers) const { + auto&& rec = *seekers.back(); + if (!rec.wants) + return; // Tail is not seeking anything. + // See if any seeker in the chain, so far, holds what I want. + for (auto&& [_, hRec] : _threads) { + auto&& h = hRec.holds; + if (std::find_if(h.begin(), h.end(), [&](auto&& el) { return el.name == *rec.wants; }) == + h.end()) + continue; + // A node is holding what the tail of the chain wants. + if (std::find(seekers.begin(), seekers.end(), &hRec) != seekers.end()) { + // ... and that node is on the seeker chain. + LOGV2_FATAL(7015116, "Deadlock", "cycle"_attr = [&] { + BSONArrayBuilder cycle; + for (auto&& seeker : seekers) { + BSONObjBuilder b{cycle.subobjStart()}; + b.append("thread", seeker->threadName); + b.append("wants", *seeker->wants); + BSONArrayBuilder holdsArr{b.subarrayStart("holds")}; + for (auto&& h : seeker->holds) + BSONObjBuilder{holdsArr.subobjStart()} + .append("name", h.name) + .append("bt", h.backtrace); + } + return cycle.obj(); + }()); + } + seekers.push_back(&hRec); + _findCycle(seekers); + seekers.pop_back(); + } +} + +} // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_synchronous_synchronization.h b/src/mongo/transport/service_executor_synchronous_synchronization.h new file mode 100644 index 00000000000..af6885a1454 --- /dev/null +++ b/src/mongo/transport/service_executor_synchronous_synchronization.h @@ -0,0 +1,283 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <algorithm> +#include <boost/optional.hpp> +#include <fmt/format.h> +#include <map> +#include <memory> +#include <string> +#include <utility> + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/logv2/log.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/stacktrace.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor + +// Augment synchronization operations with verbose logging, deadlock detection, backtraces? 
+#ifndef MONGO_SERVICE_EXECUTOR_SYNCHRONOUS_LOCK_DEBUGGING_ENABLE +#define MONGO_SERVICE_EXECUTOR_SYNCHRONOUS_LOCK_DEBUGGING_ENABLE 0 +#endif + +namespace mongo::transport { + +/** + * All Synchronization objects that interact with each other will share a SyncDomain. + * This is a small book keeping structure that has an overall awareness of + * the state of all objects, to instrument their interactions. + * Its features are only used in instrumented builds. + */ +class SyncDomain { +public: + SyncDomain() = default; + + void afterLock(std::string name) { + stdx::unique_lock lk{_mutex}; + auto& rec = _threads[_tid()]; + rec.holds.push_back(ThreadRecord::Hold{*std::exchange(rec.wants, {}), _trace()}); + } + + void onUnlock(const std::string& name) { + stdx::unique_lock lk{_mutex}; + auto iter = _threads.find(_tid()); + invariant(iter != _threads.end()); + auto&& holds = iter->second.holds; + auto holdIter = + std::find_if(holds.begin(), holds.end(), [&](auto&& el) { return el.name == name; }); + invariant(holdIter != holds.end()); + holds.erase(holdIter); + } + + void beforeLock(std::string name) { + stdx::unique_lock lk{_mutex}; + auto iter = _threads.find(_tid()); + if (iter == _threads.end()) + iter = + _threads.insert({_tid(), ThreadRecord{_tid(), std::string{getThreadName()}}}).first; + auto& myRec = iter->second; + myRec.wants = std::move(name); + std::vector<const ThreadRecord*> seekerChain{&myRec}; + _findCycle(seekerChain); + } + + BSONObj alsoHolding() const { + stdx::unique_lock lk{_mutex}; + auto iter = _threads.find(_tid()); + if (iter == _threads.end()) + return {}; + return iter->second.toBSON(); + } + +private: + struct ThreadRecord { + struct Hold { + explicit Hold(std::string name, std::string backtrace) + : name{std::move(name)}, backtrace{std::move(backtrace)} {} + std::string name; + std::string backtrace; + }; + stdx::thread::id tid; + std::string threadName; + boost::optional<std::string> wants; + std::vector<Hold> holds; + + BSONObj toBSON() 
const { + BSONObjBuilder b; + b.append("threadName", threadName); + b.append("wants", wants.value_or("")); + { + BSONArrayBuilder holdsArr{b.subarrayStart("holds")}; + for (auto&& h : holds) + holdsArr.append(h.name); + } + return b.obj(); + } + }; + + std::string _trace() const { + std::string backtrace; + StringStackTraceSink traceSink{backtrace}; + printStackTrace(traceSink); + return backtrace; + } + + void _findCycle(std::vector<const ThreadRecord*>& seekers) const; + + stdx::thread::id _tid() const { + return stdx::this_thread::get_id(); + } + + mutable stdx::mutex _mutex; // NOLINT + std::map<stdx::thread::id, ThreadRecord> _threads; +}; + +/** + * Abstraction to instrument synchronization behavior. This is basically a + * mutex and an associated condition variable, but can be built with lots of + * debug instrumentation. + */ +class Synchronization { +private: + static constexpr bool _instrumented = MONGO_SERVICE_EXECUTOR_SYNCHRONOUS_LOCK_DEBUGGING_ENABLE; + + class InstrumentedMutex { + private: + struct LockRecord { + stdx::thread::id id; + std::string name; + }; + + class HeldBy { + public: + void set() { + stdx::lock_guard lk{_mutex}; + _lockRecord = LockRecord{stdx::this_thread::get_id(), std::string{getThreadName()}}; + } + + void reset() { + stdx::lock_guard lk{_mutex}; + _lockRecord.reset(); + } + + boost::optional<LockRecord> get() const { + stdx::lock_guard lk{_mutex}; + return _lockRecord; + } + + std::string toString() const { + stdx::lock_guard lk{_mutex}; + if (_lockRecord) + return _lockRecord->name; + return {}; + } + + private: + mutable stdx::mutex _mutex; // NOLINT + boost::optional<LockRecord> _lockRecord; + }; + + public: + explicit InstrumentedMutex(Synchronization* source) : _source{source} {} + + void lock() { + HeldBy& h = _heldBy; + _source->_domain->beforeLock(_source->_name); + _mutex.lock(); + _source->_domain->afterLock(_source->_name); + h.set(); + } + + void unlock() { + _mutex.unlock(); + 
_source->_domain->onUnlock(_source->_name); + _heldBy.reset(); + } + + private: + Synchronization* _source; + Mutex _mutex; + HeldBy _heldBy; + }; + + class BareMutex { + public: + explicit BareMutex(Synchronization*) {} + void lock() { + _mutex.lock(); + } + void unlock() { + _mutex.unlock(); + } + + private: + Mutex _mutex; + }; + + using MyMutex = std::conditional_t<_instrumented, InstrumentedMutex, BareMutex>; + + MyMutex _initMutex() { + return MyMutex{this}; + } + + std::shared_ptr<SyncDomain> _initRootDomain() { + if constexpr (_instrumented) + return std::make_shared<SyncDomain>(); + else + return nullptr; + } + +public: + using Lock = stdx::unique_lock<MyMutex>; + + explicit Synchronization(std::string name) + : Synchronization{std::move(name), _initRootDomain()} {} + Synchronization(std::string name, std::shared_ptr<SyncDomain> domain) + : _name{std::move(name)}, _domain{std::move(domain)} {} + + Lock acquireLock() { + return Lock(_mutex); + } + + void notifyOne() { + _cv.notify_one(); + } + + void notifyAll() { + _cv.notify_all(); + } + + template <typename P> + void wait(Lock& lk, const P& p) { + _cv.wait(lk, p); + } + + template <typename Dur, typename P> + bool waitFor(Lock& lk, const Dur& dur, const P& p) { + return _cv.wait_for(lk, dur.toSystemDuration(), p); + } + + const std::shared_ptr<SyncDomain>& domain() const { + return _domain; + } + +private: + std::string _name; + MyMutex _mutex{_initMutex()}; + stdx::condition_variable_any _cv; + std::shared_ptr<SyncDomain> _domain; +}; + +} // namespace mongo::transport + +#undef MONGO_LOGV2_DEFAULT_COMPONENT diff --git a/src/mongo/transport/service_executor_synchronous_thread.cpp b/src/mongo/transport/service_executor_synchronous_thread.cpp new file mode 100644 index 00000000000..2d095abfd22 --- /dev/null +++ b/src/mongo/transport/service_executor_synchronous_thread.cpp @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include <fmt/format.h> + +#include "mongo/base/status.h" +#include "mongo/logv2/log.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/functional.h" +#include "mongo/util/thread_safety_context.h" + +#if !defined(_WIN32) +#include <sys/resource.h> +#include <unistd.h> +#endif + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor + +namespace mongo::transport { + +namespace { + +using namespace fmt::literals; + +MONGO_FAIL_POINT_DEFINE(serviceExecutorSynchronousThreadFailToSpawn); + +#if !defined(_WIN32) + +template <typename T> +auto roundToNext(T n, T mod) { + return (n + mod - 1) / mod * mod; +} + +size_t getStackSizeResourceLimit() { + struct rlimit rlim; + if (getrlimit(RLIMIT_STACK, &rlim)) { + auto ec = lastPosixError(); + iasserted(ErrorCodes::InternalError, "getrlimit:{}"_format(errorMessage(ec))); + } + return rlim.rlim_cur; +} + +size_t getSystemPageSize() { + auto r = sysconf(_SC_PAGE_SIZE); + if (r == -1) { + auto ec = lastPosixError(); + iasserted(ErrorCodes::InternalError, "sysconf:{}"_format(errorMessage(ec))); + } + return r; +} + +void configureStackSize(pthread_attr_t* attrs) { + static constexpr size_t kSuggested = 1 << 20; // 1MiB + auto rlim = getStackSizeResourceLimit(); + iassert(ErrorCodes::InternalError, + "Small stack size resource limit. rlim={}, suggested={}"_format(rlim, kSuggested), + rlim >= kSuggested); + size_t sz = kSuggested; + +#if __SANITIZE_ADDRESS__ || __has_feature(address_sanitizer) + // If we are using address sanitizer, we set the stack at + // ~75% (rounded up to a multiple of the page size) of our + // usual desired. Since ASAN is known to use stack more + // aggressively and should positively detect stack overflow, + // this gives us increased confidence during testing that we + // aren't flirting with our real 1MB limit for any tested + // workloads. 
Note: This calculation only works on POSIX + // platforms. If we ever decide to use the MSVC + // implementation of ASAN, we will need to revisit it. + sz = roundToNext(sz * 3 / 4, getSystemPageSize()); +#endif + + if (int failed = pthread_attr_setstacksize(attrs, sz)) { + auto ec = posixError(failed); + iasserted(ErrorCodes::InternalError, + "pthread_attr_setstacksize: {}"_format(errorMessage(ec))); + } +} + +#endif // !_WIN32 + +} // namespace + +Status launchServiceWorkerThread(unique_function<void()> task) try { + if (serviceExecutorSynchronousThreadFailToSpawn.shouldFail()) + iasserted(7015118, "Injected spawn failure"); +#if defined(_WIN32) + stdx::thread([task = std::move(task)]() mutable { task(); }).detach(); +#else + pthread_attr_t attrs; + pthread_attr_init(&attrs); + ScopeGuard attrsGuard([&attrs] { pthread_attr_destroy(&attrs); }); + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); + + try { + configureStackSize(&attrs); + } catch (const DBException& ex) { + LOGV2_WARNING(5017170, "Failed to configure stack size. Using default", "error"_attr = ex); + } + + // Wrap the user-specified `task` so it runs with an installed `sigaltstack`. 
+ task = [sigAltStackController = std::make_shared<stdx::support::SigAltStackController>(), + f = std::move(task)]() mutable { + auto sigAltStackGuard = sigAltStackController->makeInstallGuard(); + f(); + }; + + struct ThreadData { + ThreadData(unique_function<void()> f) : f{std::move(f)} {} + unique_function<void()> f; + }; + auto td = std::make_unique<ThreadData>(std::move(task)); + auto threadBody = +[](void* arg) -> void* { + std::unique_ptr<ThreadData>(static_cast<ThreadData*>(arg))->f(); + return nullptr; + }; + + pthread_t thread; + ThreadSafetyContext::getThreadSafetyContext()->onThreadCreate(); + if (int failed = pthread_create(&thread, &attrs, threadBody, td.get()); failed > 0) { + LOGV2_ERROR_OPTIONS(4850900, + {logv2::UserAssertAfterLog()}, + "pthread_create failed", + "error"_attr = errorMessage(posixError(failed))); + } else if (failed < 0) { + auto ec = lastPosixError(); + LOGV2_ERROR_OPTIONS(4850901, + {logv2::UserAssertAfterLog()}, + "pthread_create failed with a negative return code", + "code"_attr = failed, + "errno"_attr = ec.value(), + "error"_attr = errorMessage(ec)); + } + td.release(); +#endif + + return Status::OK(); +} catch (const std::exception& e) { + LOGV2_ERROR(22948, "Thread creation failed", "error"_attr = e.what()); + return {ErrorCodes::InternalError, + "Failed to create service entry worker thread: {}"_format(e.what())}; +} + +} // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_utils.h b/src/mongo/transport/service_executor_synchronous_thread.h index 3874a6ce653..2a5666e5276 100644 --- a/src/mongo/transport/service_executor_utils.h +++ b/src/mongo/transport/service_executor_synchronous_thread.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2022-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -27,14 +27,16 @@ * it in the license file. 
*/ -#pragma once - -#include "mongo/transport/service_executor.h" -#include "mongo/transport/session.h" +#include "mongo/base/status.h" #include "mongo/util/functional.h" namespace mongo::transport { +/** + * The ServiceExecutorSynchronous worker threads are specially made, and (except on Windows) + * different from the `stdx::thread` used elsewhere in the system. + * They're POSIX threads, they're detached, and they have a custom stack size. + */ Status launchServiceWorkerThread(unique_function<void()> task); } // namespace mongo::transport diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp index 7bf605cc28a..70605762783 100644 --- a/src/mongo/transport/service_executor_test.cpp +++ b/src/mongo/transport/service_executor_test.cpp @@ -50,10 +50,12 @@ #include "mongo/unittest/matcher.h" #include "mongo/unittest/thread_assertion_monitor.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/notification.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" #include "mongo/util/future.h" #include "mongo/util/scopeguard.h" +#include "mongo/util/synchronized_value.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest @@ -63,10 +65,125 @@ namespace { namespace m = unittest::match; +using unittest::stringify::stringifyForAssert; + constexpr auto kWorkerThreadRunTime = Milliseconds{1000}; // Run time + generous scheduling time slice constexpr auto kShutdownTime = Milliseconds{kWorkerThreadRunTime.count() + 50}; +template <typename M> +class AtomicWordLoadIs : public m::Matcher { +public: + explicit AtomicWordLoadIs(M m) : _m{std::move(m)} {} + + std::string describe() const { + return "AtomicWordLoadIs({})"_format(_m.describe()); + } + + template <typename T> + m::MatchResult match(const T& x) const { + auto r = x.load(); + if (auto mr = _m.match(r); !mr) + return {false, + "{} failed {}{}"_format(stringifyForAssert(r), _m.describe(), 
mr.message())}; + return {}; + } + +private: + M _m; +}; + +/** Matches a ServiceExecutor or the BSONObj it produces with its `appendStats`. */ +template <typename ThreadMatch, typename ClientMatch> +class ExecStatsIs : public m::Matcher { +public: + ExecStatsIs(std::string execStatsLabel, ThreadMatch tm, ClientMatch cm) + : _execStatsLabel{std::move(execStatsLabel)}, _tm{std::move(tm)}, _cm{std::move(cm)} {} + + std::string describe() const { + return "ExecStatsIs({},{})"_format(_tm.describe(), _cm.describe()); + } + + m::MatchResult match(const BSONObj& x) const { + unittest::stringify::Joiner joiner; + bool ok = true; + auto obj = x[_execStatsLabel].Obj(); + + auto tIn = obj["threadsRunning"].Int(); + if (auto tmr = _tm.match(tIn); !tmr) { + joiner("threadsRunning={} failed {}{}"_format( + stringifyForAssert(tIn), _tm.describe(), tmr.message())); + ok = false; + } + + auto cIn = obj["clientsInTotal"].Int(); + if (auto cmr = _cm.match(cIn); !cmr) { + joiner("clientsInTotal={} failed {}{}"_format( + stringifyForAssert(cIn), _cm.describe(), cmr.message())); + ok = false; + } + return {ok, std::string{joiner}}; + } + + m::MatchResult match(const ServiceExecutor& exec) const { + BSONObjBuilder bob; + exec.appendStats(&bob); + BSONObj obj = bob.done(); + if (auto mr = match(obj); !mr) + return {false, "obj={}, message={}"_format(obj.toString(), mr.message())}; + return {}; + } + +private: + std::string _execStatsLabel; + ThreadMatch _tm; + ClientMatch _cm; +}; + +/** + * Match is re-evaluated repeatedly with an exponential backoff, up to some + * limit, at which time this enclosing matcher fails. 
+ */ +template <typename M> +class SoonMatches : public m::Matcher { +public: + explicit SoonMatches(M&& m, int retries = 16) : _m{std::forward<M>(m)}, _retries{retries} {} + + std::string describe() const { + return "SoonMatches({},{})"_format(_m.describe(), _retries); + } + + template <typename X> + m::MatchResult match(const X& x) const { + // Fibonacci generator for slow integral exponential backoff. + auto fib = [seq = std::array<int64_t, 2>{0, 1}]() mutable { + auto r = seq[0]; + seq[0] = seq[1]; + seq[1] = r + seq[0]; + return r; + }; + m::MatchResult mr; + for (int retries = _retries; retries--;) { + if (mr = _m.match(x); mr) + return mr; + Milliseconds backoff{fib()}; + LOGV2_DEBUG(1715120, + 1, + "Retry", + "matcher"_attr = describe(), + "retries"_attr = retries, + "backoff"_attr = backoff, + "message"_attr = mr.message()); + sleepFor(backoff); + } + return {false, "No result matched after {} tries: {}"_format(_retries, mr.message())}; + } + +private: + M _m; + int _retries; +}; + class JoinThread : public stdx::thread { public: using stdx::thread::thread; @@ -142,30 +259,252 @@ private: asio::io_context _ioContext; }; -class ServiceExecutorSynchronousTest : public unittest::Test { +/** + * ServiceExecutorSynchronous and ServiceExecutorReserved are closely related. + * This is a common basis for the fixtures that test them. 
+ */ +template <typename Derived> +class ServiceExecutorSynchronousTestBase : public unittest::Test { public: - void setUp() override { - setGlobalServiceContext(ServiceContext::make()); + auto execStatsElementMatcher(int threads, int clients) { + return ExecStatsIs(getStatsLabel(), m::Eq(threads), m::Eq(clients)); + } + + void testCreateDestroy() { + makeExecutor(); + } + + void testStartStop() { + auto executor = makeExecutor(); + ASSERT_OK(executor.start()); + ASSERT_OK(executor.shutdown(kShutdownTime)); + } + + void testMakeTaskRunnerFailsBeforeStartup() { + auto executor = makeExecutor(); + ASSERT_THROWS(executor.makeTaskRunner(), DBException); } - void tearDown() override { - setGlobalServiceContext(nullptr); + void testMakeTaskRunner() { + auto executor = makeExecutor(); + ASSERT_OK(executor.start()); + executor.makeTaskRunner(); + ASSERT_OK(executor.shutdown(kShutdownTime)); + } + + void testMakeTaskRunnerMultiple() { + auto reserved = getReserved(); + auto executor = makeExecutor(); +#define LOCAL_CHECK_STATS(threads, clients) \ + ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(threads, clients))) + ASSERT_OK(executor.start()); + LOCAL_CHECK_STATS(reserved, 0); + std::vector<std::unique_ptr<ServiceExecutor::Executor>> runners; + // Add a few more beyond the reserve. 
+ for (size_t i = 0; i < reserved + 3; ++i) { + runners.push_back(executor.makeTaskRunner()); + LOCAL_CHECK_STATS(runners.size() + reserved, runners.size()) << ", i:" << i; + } + ASSERT_OK(executor.shutdown(kShutdownTime)); + LOCAL_CHECK_STATS(0, 0); +#undef LOCAL_CHECK_STATS + } + + void testBasicTaskRuns() { + auto executor = makeExecutor(); + ASSERT_OK(executor.start()); + PromiseAndFuture<void> pf; + auto runner = executor.makeTaskRunner(); + runner->schedule([&](Status st) { pf.promise.setFrom(st); }); + ASSERT_DOES_NOT_THROW(pf.future.get()); + ASSERT_OK(executor.shutdown(kShutdownTime)); + } + + void testShutdownTimeout() { + auto executor = makeExecutor(); + ASSERT_OK(executor.start()); + auto runner = executor.makeTaskRunner(); + PromiseAndFuture<void> taskStarted; + runner->schedule([&](Status st) { + taskStarted.promise.setFrom(st); + sleepFor(Milliseconds{2000}); + }); + taskStarted.future.get(); + ASSERT_THAT(executor.shutdown(Milliseconds{1000}), + m::StatusIs(m::Eq(ErrorCodes::ExceededTimeLimit), m::Any())); + } + + // Should tolerate the failure to spawn all these reserved threads. 
+ void testManyLeases() { + auto executor = makeExecutor(); + ASSERT_OK(executor.start()); + for (size_t i = 0; i < 10; ++i) { + std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases; + for (size_t j = 0; j < 20; ++j) + leases.push_back(executor.makeTaskRunner()); + } + } + + decltype(auto) makeExecutor() { + return _d().makeExecutor(); + } + + virtual std::string getStatsLabel() const = 0; + + virtual size_t getReserved() const = 0; + + std::unique_ptr<FailPointEnableBlock> makeFailSpawnBlock() { + return std::make_unique<FailPointEnableBlock>( + "serviceExecutorSynchronousThreadFailToSpawn"); + } + +private: + decltype(auto) _d() const { + return static_cast<const Derived&>(*this); + } + decltype(auto) _d() { + return static_cast<Derived&>(*this); + } +}; + +class ServiceExecutorSynchronousTest + : public ServiceExecutorSynchronousTestBase<ServiceExecutorSynchronousTest> { +public: + ServiceExecutorSynchronous makeExecutor() const { + return {}; + } + std::string getStatsLabel() const override { + return "passthrough"; + } + size_t getReserved() const override { + return 0; } }; -TEST_F(ServiceExecutorSynchronousTest, BasicTaskRuns) { - ServiceExecutorSynchronous executor(getGlobalServiceContext()); +class ServiceExecutorReservedTest + : public ServiceExecutorSynchronousTestBase<ServiceExecutorReservedTest> { +public: + ServiceExecutorReserved makeExecutor() const { + return {"testReserved", reserved, maxIdleThreads}; + } + std::string getStatsLabel() const override { + return "reserved"; + } + size_t getReserved() const override { + return reserved; + } + +protected: + size_t reserved = 5; + size_t maxIdleThreads = 0; +}; + +#define SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, case) \ + TEST_F(fixture, case) { \ + test##case (); \ + } + +/** + * Expand this macro to instantiate the test cases for each of the corresponding + * member functions of the fixture base class. 
These are tests that + * ServiceExecutorSynchronous and ServiceExecutorReserved should pass. + */ +#define SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(fixture) \ + SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, CreateDestroy) \ + SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, StartStop) \ + SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, BasicTaskRuns) \ + SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunnerFailsBeforeStartup) \ + SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunner) \ + SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunnerMultiple) \ + SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, ShutdownTimeout) \ + SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, ManyLeases) \ + /**/ + +SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(ServiceExecutorSynchronousTest) +SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(ServiceExecutorReservedTest) + +#define SERVICE_EXECUTOR_RESERVED_TEST_CHECK_EXEC_STATS(exec, threads, clients) \ + ASSERT_THAT(exec, SoonMatches(execStatsElementMatcher(threads, clients))) + +// Create leases until leases exceed the reserve count of threads, and check +// that the executor keeps its thread count above the number of leases by a +// margin of `reserved` as it goes. 
+TEST_F(ServiceExecutorReservedTest, CreateLeaseBeyondReserve) { +#define LOCAL_CHECK_STATS(threads, clients) \ + SERVICE_EXECUTOR_RESERVED_TEST_CHECK_EXEC_STATS(executor, threads, clients) + reserved = 5; + auto executor = makeExecutor(); ASSERT_OK(executor.start()); - PromiseAndFuture<void> pf; - auto runner = executor.makeTaskRunner(); - runner->schedule([&](Status st) { pf.promise.setFrom(st); }); - ASSERT_DOES_NOT_THROW(pf.future.get()); + std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases; + while (leases.size() < reserved + 1) { + leases.push_back(executor.makeTaskRunner()); + LOCAL_CHECK_STATS(leases.size() + reserved, leases.size()); + } + while (!leases.empty()) { + leases.pop_back(); + LOCAL_CHECK_STATS(leases.size() + reserved, leases.size()); + } ASSERT_OK(executor.shutdown(kShutdownTime)); + LOCAL_CHECK_STATS(0, 0); +#undef LOCAL_CHECK_STATS +} + +TEST_F(ServiceExecutorReservedTest, ImmediateThrowFromNoReserveSpawnFailure) { + reserved = 0; + auto executor = makeExecutor(); + ASSERT_OK(executor.start()); + auto failSpawns = makeFailSpawnBlock(); + ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(reserved, 0))); + ASSERT_THROWS(executor.makeTaskRunner(), ExceptionFor<ErrorCodes::InternalError>); + failSpawns = {}; + ASSERT_DOES_NOT_THROW(executor.makeTaskRunner()); } -TEST_F(ServiceExecutorSynchronousTest, MakeTaskRunnerFailsBeforeStartup) { - ServiceExecutorSynchronous executor{getGlobalServiceContext()}; - ASSERT_THROWS(executor.makeTaskRunner(), DBException); +// The basic point of the "reserved" ServiceExecutor is to allow new connections +// during time periods in which spawns are failing. Verify this fundamental +// requirement of the reserved ServiceExecutor. 
+TEST_F(ServiceExecutorReservedTest, ReserveMitigatesSpawnFailures) { + reserved = 5; + auto executor = makeExecutor(); + ASSERT_OK(executor.start()); + ASSERT_THAT(executor, execStatsElementMatcher(reserved, 0)); + auto failSpawns = makeFailSpawnBlock(); + std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases; + while (leases.size() < reserved) + leases.push_back(executor.makeTaskRunner()); + // One worker is in the starting state while it unsuccessfully attempts to spawn. + // After the failure, we expect it to be removed from the starting bucket. + ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(reserved, leases.size()))) + << "Should be sufficient reserve threads for demand during setup"; + ASSERT_THROWS(executor.makeTaskRunner(), ExceptionFor<ErrorCodes::InternalError>) + << "Should throw when out of reserve threads"; + failSpawns = {}; + ASSERT_DOES_NOT_THROW(executor.makeTaskRunner()); +} + +// Check that workers are kept alive after their leases expire according to maxIdleThreads. 
+TEST_F(ServiceExecutorReservedTest, MaxIdleThreads) { + for (reserved = 0; reserved != 5; ++reserved) { + for (maxIdleThreads = 0; maxIdleThreads != 5; ++maxIdleThreads) { + for (size_t leaseCount = 0; leaseCount != reserved + maxIdleThreads; ++leaseCount) { + auto executor = makeExecutor(); + ASSERT_OK(executor.start()); + ASSERT_THAT(executor, execStatsElementMatcher(reserved, 0)); + + std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases; + while (leases.size() < leaseCount) + leases.push_back(executor.makeTaskRunner()); + leases.clear(); + + ASSERT_THAT(executor, + SoonMatches(execStatsElementMatcher( + reserved + std::min(maxIdleThreads, leaseCount), 0))) + << ", reserved=" << reserved // + << ", maxIdleThreads=" << maxIdleThreads // + << ", leaseCount=" << leaseCount; + } + } + } } class ServiceExecutorFixedTest : public unittest::Test { diff --git a/src/mongo/transport/service_executor_utils.cpp b/src/mongo/transport/service_executor_utils.cpp deleted file mode 100644 index 317e4261598..00000000000 --- a/src/mongo/transport/service_executor_utils.cpp +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. 
- * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/transport/service_executor_utils.h" - -#include <fmt/format.h> -#include <functional> -#include <memory> - -#include "mongo/logv2/log.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/service_executor.h" -#include "mongo/util/assert_util.h" -#include "mongo/util/debug_util.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/thread_safety_context.h" - -#if !defined(_WIN32) -#include <sys/resource.h> -#include <unistd.h> -#endif - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault - - -#if !defined(__has_feature) -#define __has_feature(x) 0 -#endif - -namespace mongo::transport { - -namespace { -using namespace fmt::literals; - -void* runFunc(void* ctx) { - auto taskPtr = - std::unique_ptr<unique_function<void()>>(static_cast<unique_function<void()>*>(ctx)); - (*taskPtr)(); - - return nullptr; -} -} // namespace - -Status launchServiceWorkerThread(unique_function<void()> task) { - - try { -#if defined(_WIN32) - stdx::thread([task = std::move(task)]() mutable { task(); }).detach(); -#else - pthread_attr_t attrs; - pthread_attr_init(&attrs); - ScopeGuard attrsGuard([&attrs] { pthread_attr_destroy(&attrs); }); - 
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); - - static const rlim_t kStackSize = - 1024 * 1024; // if we change this we need to update the warning - - struct rlimit limits; - invariant(getrlimit(RLIMIT_STACK, &limits) == 0); - if (limits.rlim_cur >= kStackSize) { - - size_t stackSizeToSet = kStackSize; - -#if !defined(_WIN32) && (__SANITIZE_ADDRESS__ || __has_feature(address_sanitizer)) - // If we are using address sanitizer, we set the stack at - // ~75% (rounded up to a multiple of the page size) of our - // usual desired. Since ASAN is known to use stack more - // aggressively and should positively detect stack overflow, - // this gives us increased confidence during testing that we - // aren't flirting with our real 1MB limit for any tested - // workloads. Note: This calculation only works on POSIX - // platforms. If we ever decide to use the MSVC - // implementation of ASAN, we will need to revisit it. - long page_size = sysconf(_SC_PAGE_SIZE); - stackSizeToSet = - ((((stackSizeToSet * 3) >> 2) + page_size - 1) / page_size) * page_size; -#endif - int failed = pthread_attr_setstacksize(&attrs, stackSizeToSet); - if (failed) { - LOGV2_WARNING(22949, - "pthread_attr_setstacksize failed: {error}", - "pthread_attr_setstacksize failed", - "error"_attr = errorMessage(posixError(failed))); - } - } else { - LOGV2_WARNING(22950, - "Stack size set to {stackSizeKiB}KiB. We suggest 1024KiB", - "Stack size not set to suggested 1024KiB", - "stackSizeKiB"_attr = (limits.rlim_cur / 1024)); - } - - // Wrap the user-specified `task` so it runs with an installed `sigaltstack`. 
- task = [sigAltStackController = std::make_shared<stdx::support::SigAltStackController>(), - f = std::move(task)]() mutable { - auto sigAltStackGuard = sigAltStackController->makeInstallGuard(); - f(); - }; - - pthread_t thread; - auto ctx = std::make_unique<unique_function<void()>>(std::move(task)); - ThreadSafetyContext::getThreadSafetyContext()->onThreadCreate(); - - int failed = pthread_create(&thread, &attrs, runFunc, ctx.get()); - if (failed > 0) { - LOGV2_ERROR_OPTIONS(4850900, - {logv2::UserAssertAfterLog()}, - "pthread_create failed: error: {error}", - "pthread_create failed", - "error"_attr = errorMessage(posixError(failed))); - } else if (failed < 0) { - auto ec = lastPosixError(); - LOGV2_ERROR_OPTIONS(4850901, - {logv2::UserAssertAfterLog()}, - "pthread_create failed with a negative return code: {code}, errno: " - "{errno}, error: {error}", - "pthread_create failed with a negative return code", - "code"_attr = failed, - "errno"_attr = ec.value(), - "error"_attr = errorMessage(ec)); - } - - ctx.release(); -#endif - - } catch (const std::exception& e) { - LOGV2_ERROR(22948, "Thread creation failed", "error"_attr = e.what()); - return {ErrorCodes::InternalError, - format(FMT_STRING("Failed to create service entry worker thread: {}"), e.what())}; - } - - return Status::OK(); -} - -} // namespace mongo::transport diff --git a/src/mongo/transport/session_workflow.cpp b/src/mongo/transport/session_workflow.cpp index 080d1565912..32b49603221 100644 --- a/src/mongo/transport/session_workflow.cpp +++ b/src/mongo/transport/session_workflow.cpp @@ -416,7 +416,7 @@ public: return ServiceExecutorContext::get(client())->getServiceExecutor(); } - std::shared_ptr<ServiceExecutor::TaskRunner> taskRunner() { + std::shared_ptr<ServiceExecutor::Executor> taskRunner() { auto exec = executor(); // Allows switching the executor between iterations of the workflow. 
if (MONGO_unlikely(!_taskRunner.source || _taskRunner.source != exec)) @@ -436,10 +436,22 @@ private: class WorkItem; struct RunnerAndSource { - std::shared_ptr<ServiceExecutor::TaskRunner> runner; + std::shared_ptr<ServiceExecutor::Executor> runner; ServiceExecutor* source = nullptr; }; + /** + * Notify the task runner that this would be a good time to yield. It might + * not actually yield, depending on implementation and on overall system + * state. + * + * Yielding at certain points in a command's processing pipeline has been + * considered to be beneficial to performance. + */ + void _yieldPointReached() { + taskRunner()->yieldPointReached(); + } + /** Alias: refers to this Impl, but holds a ref to the enclosing workflow. */ std::shared_ptr<Impl> shared_from_this() { return {_workflow->shared_from_this(), this}; @@ -686,6 +698,7 @@ void SessionWorkflow::Impl::scheduleNewLoop(Status status) try { // If we're in exhaust, we're not expecting more data. taskRunner()->schedule(std::move(cb)); } else { + _yieldPointReached(); taskRunner()->runOnDataAvailable(session(), std::move(cb)); } } catch (const DBException& ex) { @@ -723,13 +736,7 @@ void SessionWorkflow::Impl::startNewLoop(const Status& executorStatus) { if (_work->hasOut()) { sendMessage(); _metrics.sent(*session()); - - // Performance testing showed a significant benefit from yielding here. - // TODO SERVER-57531: Once we enable the use of a fixed-size thread pool - // for handling client connection handshaking, we should only yield here if - // we're on a dedicated thread. 
- executor()->yieldIfAppropriate(); - + _yieldPointReached(); _metrics.yielded(); } }) diff --git a/src/mongo/transport/session_workflow_test.cpp b/src/mongo/transport/session_workflow_test.cpp index df264e372a2..b98fa310b04 100644 --- a/src/mongo/transport/session_workflow_test.cpp +++ b/src/mongo/transport/session_workflow_test.cpp @@ -52,7 +52,6 @@ #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_entry_point_impl.h" #include "mongo/transport/service_executor.h" -#include "mongo/transport/service_executor_utils.h" #include "mongo/transport/session_workflow.h" #include "mongo/transport/session_workflow_test_util.h" #include "mongo/transport/transport_layer_mock.h" |