summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBilly Donahue <billy.donahue@mongodb.com>2022-11-29 22:54:57 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-29 23:51:17 +0000
commitbdd6f97927ef265a26763f5986d185c75862650c (patch)
tree7d3adabe8d4bd5d853e40a02721f350212e0c990
parent445e80440cff2475c51cde86efacc4d0dcd99676 (diff)
downloadmongo-bdd6f97927ef265a26763f5986d185c75862650c.tar.gz
SERVER-70151 no thread_locals in ServiceExecutorSynchronous
-rw-r--r--src/mongo/db/commands/server_status_servers.cpp1
-rw-r--r--src/mongo/transport/SConscript4
-rw-r--r--src/mongo/transport/mock_service_executor.h9
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp3
-rw-r--r--src/mongo/transport/service_executor.cpp36
-rw-r--r--src/mongo/transport/service_executor.h55
-rw-r--r--src/mongo/transport/service_executor_bm.cpp19
-rw-r--r--src/mongo/transport/service_executor_fixed.cpp23
-rw-r--r--src/mongo/transport/service_executor_fixed.h4
-rw-r--r--src/mongo/transport/service_executor_reserved.cpp271
-rw-r--r--src/mongo/transport/service_executor_reserved.h98
-rw-r--r--src/mongo/transport/service_executor_synchronous.cpp793
-rw-r--r--src/mongo/transport/service_executor_synchronous.h116
-rw-r--r--src/mongo/transport/service_executor_synchronous_synchronization.cpp72
-rw-r--r--src/mongo/transport/service_executor_synchronous_synchronization.h283
-rw-r--r--src/mongo/transport/service_executor_synchronous_thread.cpp172
-rw-r--r--src/mongo/transport/service_executor_synchronous_thread.h (renamed from src/mongo/transport/service_executor_utils.h)12
-rw-r--r--src/mongo/transport/service_executor_test.cpp367
-rw-r--r--src/mongo/transport/service_executor_utils.cpp160
-rw-r--r--src/mongo/transport/session_workflow.cpp25
-rw-r--r--src/mongo/transport/session_workflow_test.cpp1
21 files changed, 1716 insertions, 808 deletions
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index d5dd31ca347..f5acac3951b 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -32,7 +32,6 @@
#include "mongo/transport/message_compressor_registry.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_fixed.h"
-#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/util/net/hostname_canonicalization.h"
#include "mongo/util/net/socket_utils.h"
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 2cbff7f7147..6cb51f3162e 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -112,9 +112,9 @@ tlEnv.Library(
source=[
'service_executor.cpp',
'service_executor_fixed.cpp',
- 'service_executor_reserved.cpp',
'service_executor_synchronous.cpp',
- 'service_executor_utils.cpp',
+ 'service_executor_synchronous_synchronization.cpp',
+ 'service_executor_synchronous_thread.cpp',
'service_executor.idl',
],
LIBDEPS=[
diff --git a/src/mongo/transport/mock_service_executor.h b/src/mongo/transport/mock_service_executor.h
index 4b4eaf752a1..0cf0affe84b 100644
--- a/src/mongo/transport/mock_service_executor.h
+++ b/src/mongo/transport/mock_service_executor.h
@@ -42,8 +42,8 @@ public:
return startCb();
}
- std::unique_ptr<TaskRunner> makeTaskRunner() override {
- class Runner : public TaskRunner {
+ std::unique_ptr<Executor> makeTaskRunner() override {
+ class Runner : public Executor {
public:
explicit Runner(MockServiceExecutor* p) : _p{p} {}
@@ -51,10 +51,13 @@ public:
return _p->scheduleTaskCb(std::move(task));
}
- void runOnDataAvailable(SessionHandle session, Task onCompletionCallback) override {
+ void runOnDataAvailable(const SessionHandle& session,
+ Task onCompletionCallback) override {
_p->runOnDataAvailableCb(session, std::move(onCompletionCallback));
}
+ void yieldPointReached() const override {}
+
private:
MockServiceExecutor* _p;
};
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 0d98e57d7c9..aa04ce209b5 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -56,7 +56,6 @@
#include "mongo/transport/service_executor.h"
#include "mongo/transport/service_executor_fixed.h"
#include "mongo/transport/service_executor_gen.h"
-#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/session.h"
#include "mongo/transport/session_workflow.h"
@@ -447,7 +446,7 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
invariant(_svcCtx);
appendInt("active", _svcCtx->getActiveClientOperations());
- const auto seStats = transport::ServiceExecutorStats::get(_svcCtx);
+ const auto seStats = transport::getServiceExecutorStats(_svcCtx);
appendInt("threaded", seStats.usesDedicated);
if (!serverGlobalParams.maxConnsOverride.empty())
appendInt("limitExempt", seStats.limitExempt);
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
index 49a6ef4f67c..b1a93ea27fc 100644
--- a/src/mongo/transport/service_executor.cpp
+++ b/src/mongo/transport/service_executor.cpp
@@ -40,9 +40,7 @@
#include "mongo/logv2/log.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_executor_fixed.h"
-#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_executor_synchronous.h"
-#include "mongo/util/processinfo.h"
#include "mongo/util/synchronized_value.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
@@ -55,9 +53,9 @@ bool gInitialUseDedicatedThread = true;
namespace {
static constexpr auto kDiagnosticLogLevel = 4;
-auto getServiceExecutorStats =
+auto serviceExecutorStatsDecoration =
ServiceContext::declareDecoration<synchronized_value<ServiceExecutorStats>>();
-auto getServiceExecutorContext =
+auto serviceExecutorContextDecoration =
Client::declareDecoration<std::unique_ptr<ServiceExecutorContext>>();
void incrThreadingModelStats(ServiceExecutorStats& stats, bool useDedicatedThread, int step) {
@@ -65,26 +63,26 @@ void incrThreadingModelStats(ServiceExecutorStats& stats, bool useDedicatedThrea
}
} // namespace
-ServiceExecutorStats ServiceExecutorStats::get(ServiceContext* ctx) noexcept {
- return getServiceExecutorStats(ctx).get();
+ServiceExecutorStats getServiceExecutorStats(ServiceContext* ctx) {
+ return **serviceExecutorStatsDecoration(ctx);
}
ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept {
// Service worker Clients will never have a ServiceExecutorContext.
- return getServiceExecutorContext(client).get();
+ return serviceExecutorContextDecoration(client).get();
}
void ServiceExecutorContext::set(Client* client,
std::unique_ptr<ServiceExecutorContext> seCtxPtr) noexcept {
auto& seCtx = *seCtxPtr;
- auto& serviceExecutorContext = getServiceExecutorContext(client);
+ auto& serviceExecutorContext = serviceExecutorContextDecoration(client);
invariant(!serviceExecutorContext);
seCtx._client = client;
seCtx._sep = client->getServiceContext()->getServiceEntryPoint();
{
- auto&& syncStats = *getServiceExecutorStats(client->getServiceContext());
+ auto syncStats = *serviceExecutorStatsDecoration(client->getServiceContext());
if (seCtx._canUseReserved)
++syncStats->limitExempt;
incrThreadingModelStats(*syncStats, seCtx._useDedicatedThread, 1);
@@ -102,14 +100,14 @@ void ServiceExecutorContext::set(Client* client,
void ServiceExecutorContext::reset(Client* client) noexcept {
if (!client)
return;
- auto& seCtx = getServiceExecutorContext(client);
+ auto& seCtx = serviceExecutorContextDecoration(client);
LOGV2_DEBUG(4898001,
kDiagnosticLogLevel,
"Resetting ServiceExecutor context for client",
"client"_attr = client->desc(),
"threadingModel"_attr = seCtx->_useDedicatedThread,
"canUseReserved"_attr = seCtx->_canUseReserved);
- auto stats = *getServiceExecutorStats(client->getServiceContext());
+ auto stats = *serviceExecutorStatsDecoration(client->getServiceContext());
if (seCtx->_canUseReserved)
--stats->limitExempt;
incrThreadingModelStats(*stats, seCtx->_useDedicatedThread, -1);
@@ -121,7 +119,7 @@ void ServiceExecutorContext::setUseDedicatedThread(bool b) noexcept {
auto prev = std::exchange(_useDedicatedThread, b);
if (!_client)
return;
- auto stats = *getServiceExecutorStats(_client->getServiceContext());
+ auto stats = *serviceExecutorStatsDecoration(_client->getServiceContext());
incrThreadingModelStats(*stats, prev, -1);
incrThreadingModelStats(*stats, _useDedicatedThread, +1);
}
@@ -134,7 +132,7 @@ void ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
_canUseReserved = canUseReserved;
if (_client) {
- auto stats = getServiceExecutorStats(_client->getServiceContext()).synchronize();
+ auto stats = *serviceExecutorStatsDecoration(_client->getServiceContext());
if (canUseReserved) {
++stats->limitExempt;
} else {
@@ -174,18 +172,6 @@ ServiceExecutor* ServiceExecutorContext::getServiceExecutor() noexcept {
return transport::ServiceExecutorSynchronous::get(_client->getServiceContext());
}
-void ServiceExecutor::yieldIfAppropriate() const {
- /*
- * In perf testing we found that yielding after running a each request produced
- * at 5% performance boost in microbenchmarks if the number of worker threads
- * was greater than the number of available cores.
- */
- static const auto cores = ProcessInfo::getNumAvailableCores();
- if (getRunningThreads() > cores) {
- stdx::this_thread::yield();
- }
-}
-
void ServiceExecutor::shutdownAll(ServiceContext* serviceContext, Date_t deadline) {
auto getTimeout = [&] {
auto now = serviceContext->getPreciseClockSource()->now();
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index d6b53453ba6..25884acca68 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -46,32 +46,32 @@ namespace mongo::transport {
extern bool gInitialUseDedicatedThread;
-/*
- * This is the interface for all ServiceExecutors.
- */
class ServiceExecutor {
public:
using Task = OutOfLineExecutor::Task;
- class TaskRunner : public OutOfLineExecutor {
+ class Executor : public OutOfLineExecutor {
public:
/**
* Awaits the availability of incoming data for the specified session. On success, it will
* schedule the callback on current executor. Otherwise, it will invoke the callback with a
* non-okay status on the caller thread.
*/
- virtual void runOnDataAvailable(std::shared_ptr<Session> session, Task task) = 0;
- };
+ virtual void runOnDataAvailable(const std::shared_ptr<Session>& session, Task task) = 0;
+ /**
+ * Yields if the associated ServiceExecutor's threads > number of cores.
+ * Yielding after each request gives +5% perf when threads outnumber
+ * cores.
+ */
+ virtual void yieldPointReached() const = 0;
+ };
static void shutdownAll(ServiceContext* serviceContext, Date_t deadline);
virtual ~ServiceExecutor() = default;
- virtual std::unique_ptr<TaskRunner> makeTaskRunner() = 0;
+ virtual std::unique_ptr<Executor> makeTaskRunner() = 0;
- /*
- * Starts the ServiceExecutor. This may create threads even if no tasks are scheduled.
- */
virtual Status start() = 0;
/*
@@ -86,14 +86,9 @@ public:
/** Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output. */
virtual void appendStats(BSONObjBuilder* bob) const = 0;
-
- /** Yield if this executor controls more threads than we have cores. */
- void yieldIfAppropriate() const;
};
-/**
- * ServiceExecutorContext determines which ServiceExecutor is used for each Client.
- */
+/** Determines which ServiceExecutor is used for each Client. */
class ServiceExecutorContext {
public:
/**
@@ -170,27 +165,13 @@ private:
std::function<ServiceExecutor*()> _getServiceExecutorForTest;
};
-/**
- * A small statlet for tracking which executors may be in use.
- */
-class ServiceExecutorStats {
-public:
- /**
- * Get the current value of ServiceExecutorStats for the given ServiceContext.
- *
- * Note that this value is intended for statistics and logging. It is unsynchronized and
- * unsuitable for informing decisions in runtime.
- */
- static ServiceExecutorStats get(ServiceContext* ctx) noexcept;
-
- // The number of Clients who use the dedicated executors.
- size_t usesDedicated = 0;
-
- // The number of Clients who use the borrowed executors.
- size_t usesBorrowed = 0;
-
- // The number of Clients that are allowed to ignore maxConns and use reserved resources.
- size_t limitExempt = 0;
+struct ServiceExecutorStats {
+ size_t usesDedicated = 0; /**< Clients using the dedicated executors. */
+ size_t usesBorrowed = 0; /**< Clients using the borrowed executors. */
+ size_t limitExempt = 0; /**< Privileged Clients. */
};
+/** Snapshot of the stats attached to `ctx`. */
+ServiceExecutorStats getServiceExecutorStats(ServiceContext* ctx);
+
} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_bm.cpp b/src/mongo/transport/service_executor_bm.cpp
index b9e5b7e30dd..c0321f9e860 100644
--- a/src/mongo/transport/service_executor_bm.cpp
+++ b/src/mongo/transport/service_executor_bm.cpp
@@ -91,10 +91,6 @@ public:
lastTearDown();
}
- void runOnExec(ServiceExecutor::TaskRunner* taskRunner, ServiceExecutor::Task task) {
- taskRunner->schedule(std::move(task));
- }
-
stdx::mutex mu; // NOLINT
int nThreads = 0;
ServiceContext* sc;
@@ -105,8 +101,7 @@ auto maxThreads = 2 * ProcessInfo::getNumCores();
BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleTask)(benchmark::State& state) {
for (auto _ : state) {
- auto runner = executor()->makeTaskRunner();
- runOnExec(&*runner, [](Status) {});
+ executor()->makeTaskRunner()->schedule([](Status) {});
}
}
BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleTask)->ThreadRange(1, maxThreads);
@@ -116,7 +111,7 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ScheduleAndWait)(benchmark::Sta
for (auto _ : state) {
auto runner = executor()->makeTaskRunner();
Notification done;
- runOnExec(&*runner, [&](Status) { done.set(); });
+ runner->schedule([&](Status) { done.set(); });
done.get();
}
}
@@ -125,7 +120,7 @@ BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ScheduleAndWait)->ThreadRange
BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::State& state) {
int chainDepth = state.range(0);
struct LoopState {
- std::shared_ptr<ServiceExecutor::TaskRunner> runner;
+ std::shared_ptr<ServiceExecutor::Executor> runner;
Notification done;
unittest::Barrier startingLine{2};
};
@@ -133,7 +128,7 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::Sta
std::function<void(Status)> chainedTask = [&](Status) { loopStatePtr->done.set(); };
for (int step = 0; step != chainDepth; ++step)
chainedTask = [this, chainedTask, &loopStatePtr](Status) {
- runOnExec(&*loopStatePtr->runner, chainedTask);
+ loopStatePtr->runner->schedule(chainedTask);
};
// The first scheduled task starts the worker thread. This test is
@@ -149,9 +144,9 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::Sta
{},
};
loopStatePtr = &loopState;
- runOnExec(&*loopStatePtr->runner, [&](Status s) {
+ loopStatePtr->runner->schedule([&](Status s) {
loopState.startingLine.countDownAndWait();
- runOnExec(&*loopStatePtr->runner, chainedTask);
+ loopStatePtr->runner->schedule(chainedTask);
});
state.ResumeTiming();
loopState.startingLine.countDownAndWait();
@@ -159,7 +154,7 @@ BENCHMARK_DEFINE_F(ServiceExecutorSynchronousBm, ChainedSchedule)(benchmark::Sta
}
}
BENCHMARK_REGISTER_F(ServiceExecutorSynchronousBm, ChainedSchedule)
- ->Range(1, 2 << 10)
+ ->Range(1, 1 << 8)
->ThreadRange(1, maxThreads);
} // namespace
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index c0b3a5cad18..86e1456832e 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -37,6 +37,7 @@
#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
+#include "mongo/util/processinfo.h"
#include "mongo/util/testing_proctor.h"
#include "mongo/util/thread_safety_context.h"
@@ -332,7 +333,7 @@ void ServiceExecutorFixed::_beginShutdown() {
_checkForShutdown();
break;
case State::kStopping:
- break; // Just nead to wait it out.
+ break; // Just need to wait it out.
case State::kStopped:
break;
}
@@ -406,8 +407,6 @@ size_t ServiceExecutorFixed::getRunningThreads() const {
void ServiceExecutorFixed::_runOnDataAvailable(const SessionHandle& session,
Task onCompletionCallback) {
invariant(session);
- yieldIfAppropriate();
-
// Make sure we're still allowed to schedule and track the session
auto lk = stdx::unique_lock(_mutex);
if (_state != State::kRunning) {
@@ -452,11 +451,17 @@ int ServiceExecutorFixed::getRecursionDepthForExecutorThread() const {
return _executorContext->getRecursionDepth();
}
-auto ServiceExecutorFixed::makeTaskRunner() -> std::unique_ptr<TaskRunner> {
+void ServiceExecutorFixed::_yield() const {
+ static const auto cores = ProcessInfo::getNumAvailableCores();
+ if (getRunningThreads() > cores)
+ stdx::this_thread::yield();
+}
+
+auto ServiceExecutorFixed::makeTaskRunner() -> std::unique_ptr<Executor> {
iassert(ErrorCodes::ShutdownInProgress, "Executor is not running", _state == State::kRunning);
/** Schedules on this. */
- class ForwardingTaskRunner : public TaskRunner {
+ class ForwardingTaskRunner : public Executor {
public:
explicit ForwardingTaskRunner(ServiceExecutorFixed* e) : _e{e} {}
@@ -464,8 +469,12 @@ auto ServiceExecutorFixed::makeTaskRunner() -> std::unique_ptr<TaskRunner> {
_e->_schedule(std::move(task));
}
- void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override {
- _e->_runOnDataAvailable(std::move(session), std::move(task));
+ void runOnDataAvailable(const std::shared_ptr<Session>& session, Task task) override {
+ _e->_runOnDataAvailable(session, std::move(task));
+ }
+
+ void yieldPointReached() const override {
+ _e->_yield();
}
private:
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index a74adc6f3b7..0eb670fc503 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -78,7 +78,7 @@ public:
*/
int getRecursionDepthForExecutorThread() const;
- std::unique_ptr<TaskRunner> makeTaskRunner() override;
+ std::unique_ptr<Executor> makeTaskRunner() override;
private:
enum class State { kNotStarted, kRunning, kStopping, kStopped };
@@ -110,6 +110,8 @@ private:
/** Requires `_mutex` locked by `lk`. */
bool _waitForStop(stdx::unique_lock<Mutex>& lk, boost::optional<Milliseconds> timeout);
+ void _yield() const;
+
/** `_state` transitions: kNotStarted -> kRunning -> kStopping -> kStopped */
State _state = State::kNotStarted;
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
deleted file mode 100644
index 1ec4e7af75e..00000000000
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/transport/service_executor_reserved.h"
-
-#include "mongo/db/server_options.h"
-#include "mongo/logv2/log.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_executor_utils.h"
-#include "mongo/util/processinfo.h"
-#include "mongo/util/thread_safety_context.h"
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
-
-
-namespace mongo {
-namespace transport {
-namespace {
-
-constexpr auto kExecutorName = "reserved"_sd;
-
-constexpr auto kThreadsRunning = "threadsRunning"_sd;
-constexpr auto kClientsInTotal = "clientsInTotal"_sd;
-constexpr auto kClientsRunning = "clientsRunning"_sd;
-constexpr auto kClientsWaiting = "clientsWaitingForData"_sd;
-
-const auto getServiceExecutorReserved =
- ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
-
-const auto serviceExecutorReservedRegisterer = ServiceContext::ConstructorActionRegisterer{
- "ServiceExecutorReserved", [](ServiceContext* ctx) {
- if (!serverGlobalParams.reservedAdminThreads) {
- return;
- }
-
- getServiceExecutorReserved(ctx) = std::make_unique<transport::ServiceExecutorReserved>(
- ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
- }};
-} // namespace
-
-thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
-thread_local int64_t ServiceExecutorReserved::_localThreadIdleCounter = 0;
-
-ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx,
- std::string name,
- size_t reservedThreads)
- : _name(std::move(name)), _reservedThreads(reservedThreads) {}
-
-Status ServiceExecutorReserved::start() {
- {
- stdx::unique_lock<Latch> lk(_mutex);
- _stillRunning.store(true);
- _numStartingThreads = _reservedThreads;
- }
-
- for (size_t i = 0; i < _reservedThreads; i++) {
- auto status = _startWorker();
- if (!status.isOK()) {
- return status;
- }
- }
-
- return Status::OK();
-}
-
-Status ServiceExecutorReserved::_startWorker() {
- LOGV2(22978,
- "Starting new worker thread for {name} service executor",
- "Starting new worker thread for service executor",
- "name"_attr = _name);
- return launchServiceWorkerThread([this] {
- stdx::unique_lock<Latch> lk(_mutex);
- _numRunningWorkerThreads.addAndFetch(1);
- ScopeGuard numRunningGuard([&] {
- _numRunningWorkerThreads.subtractAndFetch(1);
- _shutdownCondition.notify_one();
- });
-
- _numStartingThreads--;
- _numReadyThreads++;
-
- while (_stillRunning.load()) {
- _threadWakeup.wait(lk, [&] { return (!_stillRunning.load() || !_readyTasks.empty()); });
-
- if (!_stillRunning.loadRelaxed()) {
- break;
- }
-
- if (_readyTasks.empty()) {
- continue;
- }
-
- auto task = std::move(_readyTasks.front());
- _readyTasks.pop_front();
- _numReadyThreads -= 1;
- bool launchReplacement = false;
- if (_numReadyThreads + _numStartingThreads < _reservedThreads) {
- _numStartingThreads++;
- launchReplacement = true;
- }
-
- lk.unlock();
-
- if (launchReplacement) {
- auto threadStartStatus = _startWorker();
- if (!threadStartStatus.isOK()) {
- LOGV2_WARNING(22981,
- "Could not start new reserve worker thread: {error}",
- "Could not start new reserve worker thread",
- "error"_attr = threadStartStatus);
- }
- }
-
- _localWorkQueue.emplace_back(std::move(task));
- while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) {
- _localWorkQueue.front()(Status::OK());
- _localWorkQueue.pop_front();
- }
-
- lk.lock();
- if (_numReadyThreads + 1 > _reservedThreads) {
- break;
- } else {
- _numReadyThreads += 1;
- }
- }
-
- LOGV2_DEBUG(22979,
- 3,
- "Exiting worker thread in {name} service executor",
- "Exiting worker thread in service executor",
- "name"_attr = _name);
- });
-}
-
-ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) {
- auto& ref = getServiceExecutorReserved(ctx);
-
- // The ServiceExecutorReserved could be absent, so nullptr is okay.
- return ref.get();
-}
-
-Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
- LOGV2_DEBUG(22980, 3, "Shutting down reserved executor");
-
- stdx::unique_lock<Latch> lock(_mutex);
- _stillRunning.store(false);
- _threadWakeup.notify_all();
-
- bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() {
- return _numRunningWorkerThreads.load() == 0;
- });
-
- return result
- ? Status::OK()
- : Status(ErrorCodes::Error::ExceededTimeLimit,
- "reserved executor couldn't shutdown all worker threads within time limit.");
-}
-
-void ServiceExecutorReserved::_schedule(Task task) {
- if (!_stillRunning.load()) {
- task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running"));
- return;
- }
-
- if (!_localWorkQueue.empty()) {
- _localWorkQueue.emplace_back(std::move(task));
- return;
- }
-
- stdx::lock_guard<Latch> lk(_mutex);
- _readyTasks.push_back(std::move(task));
- _threadWakeup.notify_one();
-}
-
-void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
- // The ServiceExecutorReserved loans a thread to one client for its lifetime and waits
- // synchronously on thread.
- struct Statlet {
- int threads;
- int total;
- int running;
- int waiting;
- };
-
- auto statlet = [&] {
- stdx::lock_guard lk(_mutex);
- auto threads = static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
- auto total = static_cast<int>(threads - _numReadyThreads - _numStartingThreads);
- auto running = total;
- auto waiting = 0;
- return Statlet{threads, total, running, waiting};
- }();
-
- BSONObjBuilder subbob = bob->subobjStart(kExecutorName);
- subbob.append(kThreadsRunning, statlet.threads);
- subbob.append(kClientsInTotal, statlet.total);
- subbob.append(kClientsRunning, statlet.running);
- subbob.append(kClientsWaiting, statlet.waiting);
-}
-
-/**
- * Schedules task immediately, on the assumption that The task will block to
- * receive the next message and we don't mind blocking on this dedicated
- * worker thread.
- */
-void ServiceExecutorReserved::_runOnDataAvailable(const SessionHandle& session, Task task) {
- invariant(session);
- _schedule([this, session, callback = std::move(task)](Status status) {
- yieldIfAppropriate();
- if (!status.isOK()) {
- callback(std::move(status));
- return;
- }
- callback(session->waitForData());
- });
-}
-
-auto ServiceExecutorReserved::makeTaskRunner() -> std::unique_ptr<TaskRunner> {
- iassert(ErrorCodes::ShutdownInProgress, "Executor is not running", _stillRunning.load());
-
- /** Schedules on this. */
- class ForwardingTaskRunner : public TaskRunner {
- public:
- explicit ForwardingTaskRunner(ServiceExecutorReserved* e) : _e{e} {}
-
- void schedule(Task task) override {
- _e->_schedule(std::move(task));
- }
-
- void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override {
- _e->_runOnDataAvailable(std::move(session), std::move(task));
- }
-
- private:
- ServiceExecutorReserved* _e;
- };
- return std::make_unique<ForwardingTaskRunner>(this);
-}
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
deleted file mode 100644
index b9211b2f58b..00000000000
--- a/src/mongo/transport/service_executor_reserved.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <deque>
-
-#include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/transport/service_executor.h"
-
-namespace mongo {
-namespace transport {
-
-/**
- * The reserved service executor emulates a thread per connection.
- * Each connection has its own worker thread where jobs get scheduled.
- *
- * The executor will start reservedThreads on start, and create a new thread every time it
- * starts a new thread, ensuring there are always reservedThreads available for work - this
- * means that even when you hit the NPROC ulimit, there will still be threads ready to
- * accept work. When threads exit, they will go back to waiting for work if there are fewer
- * than reservedThreads available.
- */
-class ServiceExecutorReserved final : public ServiceExecutor {
-public:
- explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
-
- static ServiceExecutorReserved* get(ServiceContext* ctx);
-
- Status start() override;
- Status shutdown(Milliseconds timeout) override;
-
- size_t getRunningThreads() const override {
- return _numRunningWorkerThreads.loadRelaxed();
- }
-
- void appendStats(BSONObjBuilder* bob) const override;
-
- std::unique_ptr<TaskRunner> makeTaskRunner() override;
-
-private:
- Status _startWorker();
-
- void _schedule(Task task);
-
- void _runOnDataAvailable(const SessionHandle& session, Task task);
-
- static thread_local std::deque<Task> _localWorkQueue;
- static thread_local int64_t _localThreadIdleCounter;
-
- AtomicWord<bool> _stillRunning{false};
-
- mutable Mutex _mutex = MONGO_MAKE_LATCH("ServiceExecutorReserved::_mutex");
- stdx::condition_variable _threadWakeup;
- stdx::condition_variable _shutdownCondition;
-
- std::deque<Task> _readyTasks;
-
- AtomicWord<unsigned> _numRunningWorkerThreads{0};
- size_t _numReadyThreads{0};
- size_t _numStartingThreads{0};
-
- const std::string _name;
- const size_t _reservedThreads;
-};
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index 31b88b76eb6..6748c3aa229 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -29,195 +29,744 @@
#include "mongo/transport/service_executor_synchronous.h"
+#include <array>
+#include <deque>
+#include <fmt/format.h>
+
+#include "mongo/base/status.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_executor_utils.h"
+#include "mongo/transport/service_executor.h"
+#include "mongo/transport/service_executor_synchronous_synchronization.h"
+#include "mongo/transport/service_executor_synchronous_thread.h"
+#include "mongo/transport/session.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/functional.h"
+#include "mongo/util/processinfo.h"
+#include "mongo/util/scoped_unlock.h"
+#include "mongo/util/stacktrace.h"
+#include "mongo/util/thread_safety_context.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
namespace mongo::transport {
namespace {
-const auto getServiceExecutorSynchronous =
- ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
-const ServiceContext::ConstructorActionRegisterer serviceExecutorSynchronousRegisterer{
- "ServiceExecutorSynchronous", [](ServiceContext* ctx) {
- getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx);
- }};
-} // namespace
+using namespace fmt::literals;
-class ServiceExecutorSynchronous::SharedState : public std::enable_shared_from_this<SharedState> {
-private:
- class LockRef {
+// Central switch for debug instrumentation.
+constexpr int kLocalDbg = 5;
+
+Status notRunningStatus() {
+ return {ErrorCodes::ShutdownInProgress, "Executor is not running"};
+}
+
+size_t cores() {
+ static const auto numCores = ProcessInfo::getNumAvailableCores();
+ return numCores;
+}
+
+enum class BucketId {
+ none,
+ starting,
+ ready,
+ leased,
+ stopping,
+};
+
+StringData bucketName(BucketId b) {
+ static constexpr std::array names{
+ "none"_sd,
+ "starting"_sd,
+ "ready"_sd,
+ "leased"_sd,
+ "stopping"_sd,
+ };
+ return names[static_cast<size_t>(b)];
+}
+
+std::ostream& toString(std::ostream& os, BucketId b) {
+ return os << bucketName(b);
+}
+
+class ManagedBuckets {
+public:
+ class Member;
+ using Bucket = std::list<std::shared_ptr<Member>>;
+
+ /** Intrusive base class for objects that want to be in a `ManagedBucket`. */
+ class Member {
public:
- explicit LockRef(SharedState* p) : _p{p} {}
+ virtual ~Member() {
+ invariant(!bucket, "Destroyed while held by a bucket");
+ }
- size_t threads() const {
- return _p->_threads;
+ private:
+ friend class ManagedBuckets;
+ Bucket* bucket = nullptr;
+ typename Bucket::iterator iterator;
+ };
+
+ struct BucketSizes {
+ size_t starting;
+ size_t ready;
+ size_t leased;
+ size_t stopping;
+ };
+
+ class SyncAccess {
+ public:
+ explicit SyncAccess(const ManagedBuckets* sourcePtr) : _sourcePtr{sourcePtr} {}
+
+ BucketSizes bucketSizes() const {
+ return {_src()._starting.size(),
+ _src()._ready.size(),
+ _src()._leased.size(),
+ _src()._stopping.size()};
}
- bool waitForDrain(Milliseconds dur) {
- return _p->_cv.wait_for(_lk, dur.toSystemDuration(), [&] { return !_p->_threads; });
+ size_t totalSize() const {
+ auto bs = bucketSizes();
+ return bs.starting + bs.ready + bs.leased + bs.stopping;
}
- void onStartThread() {
- ++_p->_threads;
+ /** Move `w` from whatever bucket it's in now (if any) to the `toId` bucket. */
+ void move(const std::shared_ptr<Member>& w, BucketId toId) {
+ _moveLocked(w, toId);
}
- void onEndThread() {
- if (!--_p->_threads)
- _p->_cv.notify_all();
+ /** Move the back of `fromId` and into `toId`. */
+ std::shared_ptr<Member> popMove(BucketId fromId, BucketId toId) {
+ auto from = _src()._bucket(fromId);
+ if (from->empty())
+ return nullptr;
+ auto w = from->back();
+ _moveLocked(w, toId);
+ return w;
}
private:
- SharedState* _p;
- stdx::unique_lock<stdx::mutex> _lk{_p->_mutex}; // NOLINT
- };
+ void _moveLocked(const std::shared_ptr<Member>& w, BucketId destId) {
+ Bucket* destPtr = _src()._bucket(destId);
+ Member& bm = *w;
+ if (destPtr) {
+ if (bm.bucket) {
+ destPtr->splice(destPtr->end(), *bm.bucket, bm.iterator);
+ } else {
+ destPtr->push_back(w);
+ bm.iterator = std::prev(destPtr->end());
+ }
+ } else {
+ if (bm.bucket)
+ bm.bucket->erase(bm.iterator);
+ }
+ bm.bucket = destPtr;
+ }
-public:
- void schedule(Task task);
+ const ManagedBuckets& _src() const {
+ return *_sourcePtr;
+ }
- bool isRunning() const {
- return _isRunning.loadRelaxed();
- }
+ ManagedBuckets& _src() {
+ return *const_cast<ManagedBuckets*>(_sourcePtr);
+ }
+
+ const ManagedBuckets* _sourcePtr;
+ Synchronization::Lock _lk{_sourcePtr->_sync.acquireLock()};
+ };
- void setIsRunning(bool b) {
- _isRunning.store(b);
+ explicit ManagedBuckets(std::shared_ptr<SyncDomain> domain)
+ : _sync{"buckets", std::move(domain)} {}
+
+ std::string toString() const {
+ auto bs = sync().bucketSizes();
+ return "[starting={}/ready={}/leased={}/stopping={}]"_format(
+ bs.starting, bs.ready, bs.leased, bs.stopping);
}
- LockRef lock() {
- return LockRef{this};
+ SyncAccess sync() const {
+ return SyncAccess{this};
}
private:
- class WorkerThreadInfo;
+ Bucket* _bucket(BucketId id) {
+ switch (id) {
+ case BucketId::none:
+ return nullptr;
+ case BucketId::starting:
+ return &_starting;
+ case BucketId::ready:
+ return &_ready;
+ case BucketId::leased:
+ return &_leased;
+ case BucketId::stopping:
+ return &_stopping;
+ }
+ MONGO_UNREACHABLE;
+ }
- mutable stdx::mutex _mutex; // NOLINT
- stdx::condition_variable _cv;
- AtomicWord<bool> _isRunning;
- size_t _threads = 0;
+ mutable Synchronization _sync;
+ Bucket _starting;
+ Bucket _ready;
+ Bucket _leased;
+ Bucket _stopping;
};
-class ServiceExecutorSynchronous::SharedState::WorkerThreadInfo {
+class Worker : public std::enable_shared_from_this<Worker>, public ManagedBuckets::Member {
public:
- explicit WorkerThreadInfo(std::shared_ptr<SharedState> sharedState)
- : sharedState{std::move(sharedState)} {}
+ class LeaseToken;
+
+ class Env {
+ public:
+ virtual ~Env() = default;
+
+ virtual StringData name() const = 0;
+
+ virtual bool isRunning() const = 0;
+
+ virtual size_t getRunningThreads() const = 0;
+
+ /** May throw ShutdownInProgress. */
+ virtual void onReadyThread(const std::shared_ptr<Worker>& w) = 0;
+
+ /** May throw ShutdownInProgress. */
+ virtual void onReleasedThread(const std::shared_ptr<Worker>& w) = 0;
+
+ virtual void onEndThread(const std::shared_ptr<Worker>& w) = 0;
+
+ virtual void yield() const = 0;
+
+ /**
+ * The worker's Synchronization object will participate in an
+ * env-specified SyncDomain if there is one, or it can be nullptr.
+ */
+ virtual std::shared_ptr<SyncDomain> syncDomain() = 0;
+ };
+
+ Worker(std::unique_ptr<Env> env, uint64_t id)
+ : _env{std::move(env)}, _id{id}, _sync{"worker{}"_format(_id), _env->syncDomain()} {
+ LOGV2_DEBUG(7015101, kLocalDbg, "Worker ctor", "w"_attr = _id);
+ }
+
+ ~Worker() {
+ LOGV2_DEBUG(7015102, kLocalDbg, "Worker dtor", "w"_attr = _id);
+ }
+
+ /**
+ * The worker's executor. It pins a reference to a Worker thread on which it
+ * can schedule tasks. There is one per Worker, and when it dies, it moves its
+ * worker thread into the executor's ready list.
+ * From there, the executor can destroy it or lease it out again as needed.
+ */
+ std::unique_ptr<LeaseToken> makeLease();
+
+ void shutdown() {
+ auto lk = _sync.acquireLock();
+ shutdownLocked();
+ }
+
+ void shutdownLocked() {
+ _state = State::stopped;
+ _sync.notifyAll();
+ }
void run() {
- while (!queue.empty() && sharedState->isRunning()) {
- queue.front()(Status::OK());
- queue.pop_front();
+ ScopeGuard epilogue = [&] { _env->onEndThread(shared_from_this()); };
+ try {
+ _env->onReadyThread(shared_from_this());
+ } catch (const ExceptionFor<ErrorCodes::ShutdownInProgress>&) {
+ shutdown();
}
+ auto lk = _sync.acquireLock();
+ while (_state != State::stopped) {
+ _sync.wait(lk, [&] { return _state != State::idle; });
+ if (_state == State::leased) {
+ _serveLease(lk);
+ _endLeaseLocked();
+ }
+ }
+ _serveLease(lk, notRunningStatus());
+ }
+
+ uint64_t getId() const {
+ return _id;
}
- std::shared_ptr<SharedState> sharedState;
- std::deque<Task> queue;
+private:
+ enum class State {
+ idle,
+ leased,
+ stopped,
+ };
+
+ friend StringData toString(State s) {
+ return std::array{
+ "idle"_sd,
+ "leased"_sd,
+ "stopped"_sd,
+ }[static_cast<int>(s)];
+ }
+
+ void _endLeaseLocked() {
+ if (_state != State::leased)
+ return;
+ _state = State::idle;
+ try {
+ _env->onReleasedThread(shared_from_this());
+ } catch (const ExceptionFor<ErrorCodes::ShutdownInProgress>&) {
+ _state = State::stopped;
+ }
+ _sync.notifyAll();
+ }
+
+ void _onDestroyLeaseToken() {
+ auto lk = _sync.acquireLock();
+ _endLeaseLocked();
+ }
+
+ void _schedule(ServiceExecutor::Task task) {
+ LOGV2_DEBUG(7015106, kLocalDbg, "Schedule", "w"_attr = getId());
+ if (MONGO_unlikely(!_env->isRunning())) {
+ task(notRunningStatus());
+ return;
+ }
+ auto lk = _sync.acquireLock();
+ _tasks.push_back(std::move(task));
+ _sync.notifyOne();
+ }
+
+ /**
+ * Schedules task immediately, on the assumption that The task will block to
+ * receive the next message and we don't mind blocking on this dedicated
+ * worker thread.
+ */
+ void _runOnDataAvailable(const std::shared_ptr<Session>& session, ServiceExecutor::Task task) {
+ invariant(session);
+ _schedule(std::move(task));
+ }
+
+ /**
+ * Runs for the duration of a worker lease, calling queued tasks with the
+ * specified `execStatus`, watching for state changes. If this worker
+ * becomes un-leased, any queued tasks are still invoked, and the function
+ * returns.
+ */
+ void _serveLease(Synchronization::Lock& lk, Status execStatus = Status::OK()) {
+ while (true) {
+ _sync.wait(lk, [&] { return _state != State::leased || !_tasks.empty(); });
+ if (_tasks.empty())
+ return;
+ auto t = std::move(_tasks.front());
+ _tasks.pop_front();
+ ScopedUnlock unlock{lk};
+ LOGV2_DEBUG(
+ 7015100, kLocalDbg, "Run task", "w"_attr = getId(), "executor"_attr = _env->name());
+ t(execStatus);
+ t = nullptr;
+ }
+ }
+
+ void _yield() const {
+ _env->yield();
+ }
+
+ const std::unique_ptr<Env> _env;
+ const uint64_t _id;
+ Synchronization _sync;
+ std::deque<ServiceExecutor::Task> _tasks;
+ State _state = State::idle;
};
-void ServiceExecutorSynchronous::SharedState::schedule(Task task) {
- if (!isRunning()) {
- task(Status(ErrorCodes::ShutdownInProgress, "Executor is not running"));
- return;
+class Worker::LeaseToken : public ServiceExecutor::Executor {
+public:
+ explicit LeaseToken(std::shared_ptr<Worker> worker) : _w{std::move(worker)} {
+ LOGV2_DEBUG(7015107, kLocalDbg, "LeaseToken ctor", "w"_attr = _w->getId());
}
- thread_local WorkerThreadInfo* workerThreadInfoTls = nullptr;
+ ~LeaseToken() {
+ LOGV2_DEBUG(7015108, kLocalDbg, "LeaseToken dtor", "w"_attr = _w->getId());
+ _w->_onDestroyLeaseToken();
+ }
- if (workerThreadInfoTls) {
- workerThreadInfoTls->queue.push_back(std::move(task));
- return;
+ void schedule(Task task) override {
+ _w->_schedule(std::move(task));
}
- LOGV2_DEBUG(22983, 3, "Starting ServiceExecutorSynchronous worker thread");
- auto workerInfo = std::make_unique<WorkerThreadInfo>(shared_from_this());
- workerInfo->queue.push_back(std::move(task));
+ void runOnDataAvailable(const std::shared_ptr<Session>& session, Task task) override {
+ _w->_runOnDataAvailable(session, std::move(task));
+ }
- Status status = launchServiceWorkerThread([w = std::move(workerInfo)] {
- w->sharedState->lock().onStartThread();
- ScopeGuard onEndThreadGuard = [&] { w->sharedState->lock().onEndThread(); };
+ void yieldPointReached() const override {
+ _w->_yield();
+ }
- workerThreadInfoTls = &*w;
- ScopeGuard resetTlsGuard = [&] { workerThreadInfoTls = nullptr; };
+private:
+ std::shared_ptr<Worker> _w;
+};
- w->run();
- });
- // The usual way to fail to schedule is to invoke the task, but in this case
- // we don't have the task anymore. We gave it away to the callback that the
- // failed thread was supposed to run.
- iassert(status);
+std::unique_ptr<Worker::LeaseToken> Worker::makeLease() {
+ auto lk = _sync.acquireLock();
+ if (_state != State::idle)
+ return nullptr;
+ _state = State::leased;
+ return std::make_unique<LeaseToken>(shared_from_this());
}
-ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext*)
- : _sharedState{std::make_shared<SharedState>()} {}
+class DedicatedThreadExecutorPool
+ : public std::enable_shared_from_this<DedicatedThreadExecutorPool> {
+private:
+ size_t _availableThreads(const ManagedBuckets::BucketSizes& bs) const {
+ return bs.starting + bs.ready;
+ }
-ServiceExecutorSynchronous::~ServiceExecutorSynchronous() = default;
+ /** If we have less than reserve, we need more. */
+ bool _needsMoreWorkers(Synchronization::Lock& lk, const ManagedBuckets::BucketSizes& bs) const {
+ return _availableThreads(bs) < _waitingForLease + _reservedThreads;
+ }
-Status ServiceExecutorSynchronous::start() {
- _sharedState->setIsRunning(true);
- return Status::OK();
-}
+ /** We keep a few threads alive to reduce spawning. */
+ bool _needsFewerWorkers(Synchronization::Lock& lk,
+ const ManagedBuckets::BucketSizes& bs) const {
+ return _availableThreads(bs) >= _waitingForLease + _reservedThreads + _maxIdleThreads;
+ }
-Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
- LOGV2_DEBUG(22982, 3, "Shutting down passthrough executor");
- auto stopLock = _sharedState->lock();
- _sharedState->setIsRunning(false);
- if (!stopLock.waitForDrain(timeout))
- return Status(ErrorCodes::Error::ExceededTimeLimit,
- "passthrough executor couldn't shutdown "
- "all worker threads within time limit.");
- return Status::OK();
-}
+ /** Called when a new worker has started, becoming ready. */
+ void _onReadyThread(const std::shared_ptr<Worker>& w) {
+ LOGV2_DEBUG(7015109, kLocalDbg, "A new Worker has become ready", "w"_attr = w->getId());
+ auto lk = _sync.acquireLock();
+ if (!_isRunning.loadRelaxed())
+ iasserted(notRunningStatus());
+ _workers.sync().move(_asMember(w), BucketId::ready);
+ _sync.notifyAll();
+ }
-ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
- auto& ref = getServiceExecutorSynchronous(ctx);
- invariant(ref);
- return ref.get();
+ /**
+ * Worker `w` has been released from a lease.
+ * A decision is made here about whether to keep the worker or discard it.
+ */
+ void _onReleasedThread(const std::shared_ptr<Worker>& w) {
+ LOGV2_DEBUG(7015110, kLocalDbg, "Worker released from lease", "w"_attr = w->getId());
+ auto lk = _sync.acquireLock();
+ if (!_isRunning.loadRelaxed())
+ iasserted(notRunningStatus());
+ auto workersSync = _workers.sync();
+ auto bs = workersSync.bucketSizes();
+ if (_needsFewerWorkers(lk, bs)) {
+ w->shutdownLocked();
+ workersSync.move(w, BucketId::none);
+ _updateLazyShouldYield(workersSync.bucketSizes());
+ } else {
+ workersSync.move(w, BucketId::ready);
+ }
+ _sync.notifyAll();
+ }
+
+ /** Remove a dying worker from the buckets as its run() concludes. */
+ void _onEndThread(const std::shared_ptr<Worker>& w) {
+ LOGV2_DEBUG(7015111, kLocalDbg, "Worker end", "w"_attr = w->getId());
+ auto lk = _sync.acquireLock();
+ _workers.sync().move(w, BucketId::none);
+ _sync.notifyAll();
+ }
+
+public:
+ explicit DedicatedThreadExecutorPool(std::string name,
+ size_t reservedThreads,
+ size_t maxIdleThreads,
+ std::string statLabel)
+ : _name{std::move(name)},
+ _reservedThreads{reservedThreads},
+ _maxIdleThreads{maxIdleThreads},
+ _statLabel{std::move(statLabel)},
+ _workers{_sync.domain()} {}
+
+ Status start() try {
+ auto lk = _sync.acquireLock();
+ _isRunning.store(true);
+ _sync.notifyAll();
+ _spawnEnoughWorkers(lk);
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+
+ size_t getRunningThreads() const {
+ return _workers.sync().totalSize();
+ }
+
+ void appendStats(BSONObjBuilder* bob) const {
+ auto bs = _workers.sync().bucketSizes();
+ int threads = bs.starting + bs.ready + bs.leased + bs.stopping;
+ int clients = bs.leased;
+ BSONObjBuilder{bob->subobjStart(_statLabel)}
+ .append("threadsRunning", threads)
+ .append("clientsInTotal", clients)
+ .append("clientsRunning", clients)
+ .append("clientsWaitingForData", 0);
+ }
+
+ /** Lease a worker, spawning new workers as necessary. */
+ std::unique_ptr<ServiceExecutor::Executor> makeTaskRunner() {
+ LOGV2_DEBUG(7015112, kLocalDbg, "makeTaskRunner");
+ auto lk = _sync.acquireLock();
+ ++_waitingForLease;
+ ScopeGuard restoreWaitingForLease = [&] { --_waitingForLease; };
+ while (MONGO_likely(_isRunning.loadRelaxed())) {
+ try {
+ _spawnEnoughWorkers(lk);
+ } catch (const DBException& ex) {
+ LOGV2_DEBUG(7015103, 3, "Nonfatal spawn failure", "error"_attr = ex);
+ }
+
+ auto [w, bs] = _nextWorker(lk);
+ if (!w)
+ continue;
+ _updateLazyShouldYield(bs);
+
+ // `w->makeLease` locks `w`, so release subordinate mutex.
+ ScopedUnlock unlockThis{lk};
+ if (auto lease = w->makeLease())
+ return lease;
+ // Retry. Lease failed as w is no longer idle.
+ // Another requester must have gotten it first.
+ }
+ iasserted(notRunningStatus());
+ }
+
+ bool isRunning() const {
+ return _isRunning.loadRelaxed();
+ }
+
+ Status shutdown(Milliseconds timeout) {
+ LOGV2_DEBUG(22982, 3, "Shutting down executor", "name"_attr = _name);
+ auto lk = _sync.acquireLock();
+ _isRunning.store(false);
+ {
+ ScopedUnlock unlock{lk};
+ for (auto b : {BucketId::starting, BucketId::ready, BucketId::leased})
+ while (auto w = _workers.sync().popMove(b, BucketId::stopping))
+ _asWorker(w)->shutdown();
+ }
+ _sync.notifyAll();
+ if (!_sync.waitFor(lk, timeout, [&] {
+ LOGV2_DEBUG(7015113, kLocalDbg, "waitForDrain");
+ return _workers.sync().totalSize() == 0;
+ })) {
+ return Status(
+ ErrorCodes::Error::ExceededTimeLimit,
+ "Executor {} couldn't shutdown within {}."_format(_name, timeout.toString()));
+ }
+ return Status::OK();
+ }
+
+private:
+ static std::shared_ptr<ManagedBuckets::Member> _asMember(std::shared_ptr<Worker> w) {
+ return std::dynamic_pointer_cast<ManagedBuckets::Member>(std::move(w));
+ }
+ static std::shared_ptr<Worker> _asWorker(std::shared_ptr<ManagedBuckets::Member> w) {
+ return std::dynamic_pointer_cast<Worker>(std::move(w));
+ }
+
+ /** The environment that `_makeWorker` imbues into its products. */
+ class WorkerEnv : public Worker::Env {
+ public:
+ explicit WorkerEnv(std::shared_ptr<DedicatedThreadExecutorPool> source)
+ : _source{std::move(source)} {}
+ StringData name() const override {
+ return _source->_name;
+ }
+ bool isRunning() const override {
+ return _source->isRunning();
+ }
+ size_t getRunningThreads() const override {
+ return _source->getRunningThreads();
+ }
+ void onReadyThread(const std::shared_ptr<Worker>& w) override {
+ _source->_onReadyThread(w);
+ }
+ void onReleasedThread(const std::shared_ptr<Worker>& w) override {
+ _source->_onReleasedThread(w);
+ }
+ void onEndThread(const std::shared_ptr<Worker>& w) override {
+ _source->_onEndThread(w);
+ }
+ std::shared_ptr<SyncDomain> syncDomain() override {
+ return _source->_sync.domain();
+ }
+ void yield() const override {
+ return _source->_yield();
+ }
+
+ private:
+ std::shared_ptr<DedicatedThreadExecutorPool> _source;
+ };
+
+ /**
+ * Only the leased workers are competing for cores.
+ * This metric has never accounted for other threads competing with this
+ * executor's threads. Even among the ServiceExecutors, Synchronous and
+ * Reserved can be simultanenously active.
+ */
+ void _updateLazyShouldYield(ManagedBuckets::BucketSizes bs) {
+ bool newValue = bs.leased > cores();
+ if (_lazyShouldYield.loadRelaxed() != newValue)
+ _lazyShouldYield.store(newValue);
+ }
+
+ void _yield() const {
+ if (_lazyShouldYield.loadRelaxed())
+ stdx::this_thread::yield();
+ }
+
+ void _spawnEnoughWorkers(Synchronization::Lock& lk) {
+ while (_needsMoreWorkers(lk, _workers.sync().bucketSizes())) {
+ LOGV2_DEBUG(7015114,
+ 3,
+ "Starting a new thread",
+ "executor"_attr = _name,
+ "waitingForLease"_attr = _waitingForLease,
+ "buckets"_attr = _workers.toString());
+ ScopedUnlock unlock{lk};
+ auto w = _makeWorker();
+ _workers.sync().move(w, BucketId::starting);
+ try {
+ iassert(launchServiceWorkerThread([w]() mutable { std::move(w)->run(); }));
+ } catch (...) {
+ _workers.sync().move(w, BucketId::none);
+ throw;
+ }
+ }
+ }
+
+ std::tuple<std::shared_ptr<Worker>, ManagedBuckets::BucketSizes> _nextWorker(
+ Synchronization::Lock& lk) {
+ for (;;) {
+ _sync.wait(lk, [&] {
+ auto bs = _workers.sync().bucketSizes();
+ return !_isRunning.loadRelaxed() || bs.ready || !bs.starting;
+ });
+ if (!_isRunning.loadRelaxed())
+ iasserted(notRunningStatus());
+ auto workersSync = _workers.sync();
+ auto bs = workersSync.bucketSizes();
+ if (bs.ready)
+ return {_asWorker(workersSync.popMove(BucketId::ready, BucketId::leased)), bs};
+ if (!bs.starting)
+ iasserted({ErrorCodes::InternalError, "No ready workers"});
+ }
+ }
+
+ std::shared_ptr<Worker> _makeWorker() {
+ return std::make_shared<Worker>(std::make_unique<WorkerEnv>(shared_from_this()),
+ _workerIdCounter.fetchAndAdd(1));
+ }
+
+ const std::string _name;
+ const size_t _reservedThreads;
+ const size_t _maxIdleThreads;
+ const std::string _statLabel;
+ AtomicWord<uint64_t> _workerIdCounter{1};
+ mutable Synchronization _sync{"executor"}; ///< Root of its domain
+ AtomicWord<bool> _isRunning;
+ size_t _waitingForLease = 0;
+ ManagedBuckets _workers;
+
+ /** Updated in a relaxed manner when a Worker lease is created or destroyed. */
+ AtomicWord<bool> _lazyShouldYield;
+};
+
+} // namespace
+
+// =======================
+// ServiceExecutorSyncBase
+// =======================
+
+class ServiceExecutorSyncBase::Impl : public DedicatedThreadExecutorPool {
+public:
+ using DedicatedThreadExecutorPool::DedicatedThreadExecutorPool;
+};
+
+ServiceExecutorSyncBase::ServiceExecutorSyncBase(std::string name,
+ size_t reservedThreads,
+ size_t maxIdleThreads,
+ std::string statLabel)
+ : _impl{std::make_shared<Impl>(
+ std::move(name), reservedThreads, maxIdleThreads, std::move(statLabel))} {}
+
+ServiceExecutorSyncBase::~ServiceExecutorSyncBase() = default;
+
+Status ServiceExecutorSyncBase::start() {
+ return _impl->start();
}
-void ServiceExecutorSynchronous::_schedule(Task task) {
- _sharedState->schedule(std::move(task));
+Status ServiceExecutorSyncBase::shutdown(Milliseconds timeout) {
+ return _impl->shutdown(timeout);
}
-size_t ServiceExecutorSynchronous::getRunningThreads() const {
- return _sharedState->lock().threads();
+size_t ServiceExecutorSyncBase::getRunningThreads() const {
+ return _impl->getRunningThreads();
}
-void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
- // Has one client per thread and waits synchronously on that thread.
- int threads = getRunningThreads();
- BSONObjBuilder{bob->subobjStart("passthrough")}
- .append("threadsRunning", threads)
- .append("clientsInTotal", threads)
- .append("clientsRunning", threads)
- .append("clientsWaitingForData", 0);
+void ServiceExecutorSyncBase::appendStats(BSONObjBuilder* bob) const {
+ _impl->appendStats(bob);
}
-void ServiceExecutorSynchronous::_runOnDataAvailable(const SessionHandle& session, Task task) {
- invariant(session);
- yieldIfAppropriate();
- _schedule(std::move(task));
+std::unique_ptr<ServiceExecutor::Executor> ServiceExecutorSyncBase::makeTaskRunner() {
+ return _impl->makeTaskRunner();
}
-auto ServiceExecutorSynchronous::makeTaskRunner() -> std::unique_ptr<TaskRunner> {
- if (!_sharedState->isRunning())
- iassert(Status(ErrorCodes::ShutdownInProgress, "Executor is not running"));
+// ==========================
+// ServiceExecutorSynchronous
+// ==========================
- /** Schedules on this. */
- class ForwardingTaskRunner : public TaskRunner {
- public:
- explicit ForwardingTaskRunner(ServiceExecutorSynchronous* e) : _e{e} {}
+namespace {
+const auto serviceExecutorSynchronousDecoration =
+ ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
- void schedule(Task task) override {
- _e->_schedule(std::move(task));
- }
+const ServiceContext::ConstructorActionRegisterer serviceExecutorSynchronousRegisterer{
+ "ServiceExecutorSynchronous", [](ServiceContext* ctx) {
+ serviceExecutorSynchronousDecoration(ctx) = std::make_unique<ServiceExecutorSynchronous>();
+ }};
+} // namespace
- void runOnDataAvailable(std::shared_ptr<Session> session, Task task) override {
- _e->_runOnDataAvailable(std::move(session), std::move(task));
- }
+ServiceExecutorSynchronous::ServiceExecutorSynchronous()
+ : ServiceExecutorSyncBase{"passthrough", defaultReserved, cores(), "passthrough"} {}
- private:
- ServiceExecutorSynchronous* _e;
- };
- return std::make_unique<ForwardingTaskRunner>(this);
+ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
+ auto& ref = serviceExecutorSynchronousDecoration(ctx);
+ invariant(ref);
+ return ref.get();
+}
+
+// =======================
+// ServiceExecutorReserved
+// =======================
+
+namespace {
+const auto serviceExecutorReservedDecoration =
+ ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
+
+const ServiceContext::ConstructorActionRegisterer serviceExecutorReservedRegisterer{
+ "ServiceExecutorReserved", [](ServiceContext* ctx) {
+ auto threads = serverGlobalParams.reservedAdminThreads;
+ if (threads)
+ serviceExecutorReservedDecoration(ctx) =
+ std::make_unique<ServiceExecutorReserved>("admin connections", threads, cores());
+ }};
+} // namespace
+
+ServiceExecutorReserved::ServiceExecutorReserved(std::string name,
+ size_t reservedThreads,
+ size_t maxIdleThreads)
+ : ServiceExecutorSyncBase{std::move(name), reservedThreads, maxIdleThreads, "reserved"} {}
+
+ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) {
+ return serviceExecutorReservedDecoration(ctx).get();
}
} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 89a31492be2..7ada4a0b0a7 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -29,69 +29,109 @@
#pragma once
-#include <deque>
+#include <memory>
+#include <string>
#include "mongo/base/status.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/service_context.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/stdx/condition_variable.h"
#include "mongo/transport/service_executor.h"
-#include "mongo/util/hierarchical_acquisition.h"
+#include "mongo/util/duration.h"
-namespace mongo {
-namespace transport {
+namespace mongo::transport {
-/** Transitional for differential benchmarking of ServiceExecutorSynchronous refactor */
-#define TRANSITIONAL_SERVICE_EXECUTOR_SYNCHRONOUS_HAS_RESERVE 0
+/**
+ * Transitional for differential benchmarking of ServiceExecutorSynchronous refactor.
+ * Can be removed when it's no longer necessary to benchmark the refactor.
+ * Picked up by service_executor_bm.cpp to detect this newer API.
+ */
+#define TRANSITIONAL_SERVICE_EXECUTOR_SYNCHRONOUS_HAS_RESERVE 1
/**
- * Creates a fresh worker thread for each top-level scheduled task. Any tasks
- * scheduled during the execution of that top-level task as it runs on such a
- * worker thread are pushed to the queue of that worker thread.
+ * ServiceExecutorSynchronous is just a special case of ServiceExecutorReserved,
+ * Which just happens to have `reservedTheads == 0`. Both are implemented by
+ * this base class.
*
- * Thus, the top-level task is expected to represent a chain of operations, each
- * of which schedules its successor before returning. The entire chain of
- * operations, and nothing else, executes on the same worker thread.
+ * It will start `reservedThreads` on start, and create enough new threads every
+ * time it gives out a lease, that there are at least `reservedThreads`
+ * available for work (as long as spawning is possible). This means that even
+ * when we hit the NPROC ulimit, and spawning is temporarily failing, there will
+ * still be threads ready to accept work.
+ *
+ * When a lease is released, the worker will go back to waiting for work if
+ * there are no more than `reservedThreads + maxIdleThreads` available.
+ * Otherwise it will be destroyed. This means that `reservedThreads` are for
+ * emergency use only, while the `maxIdleThreads` specifies the strength of an
+ * optimization that minimizes the churn of worker threads. With
+ * `maxIdleThreads==0`, every worker lease would spawn a new thread, which is
+ * very wasteful.
*/
-class ServiceExecutorSynchronous final : public ServiceExecutor {
+class ServiceExecutorSyncBase : public ServiceExecutor {
public:
- /** Returns the ServiceExecutorSynchronous decoration on `ctx`. */
- static ServiceExecutorSynchronous* get(ServiceContext* ctx);
-
- explicit ServiceExecutorSynchronous(ServiceContext*);
-
- ~ServiceExecutorSynchronous();
+ ~ServiceExecutorSyncBase() override;
Status start() override;
Status shutdown(Milliseconds timeout) override;
- std::unique_ptr<TaskRunner> makeTaskRunner() override;
+ std::unique_ptr<Executor> makeTaskRunner() override;
size_t getRunningThreads() const override;
void appendStats(BSONObjBuilder* bob) const override;
-private:
- class SharedState;
-
+protected:
/**
- * The behavior of `schedule` depends on whether the calling thread is a
- * worker thread spawned by a previous `schedule` call.
- *
- * If a nonworker thread schedules a task, a worker thread is spawned, and
- * the task is transferred to the new worker thread's queue.
+ * `reservedThreads` - At `start` time and at `makeTaskRunner` time, workers
+ * will be spawned until this number of available threads is reached. This
+ * is intended as a minimum bulwark against periods during which workers
+ * cannot spawn.
*
- * If a worker thread schedules a task, the task is pushed to the back of its
- * queue. The worker thread exits when the queue becomes empty.
+ * `maxIdleThreads` - When a worker lease is released, the corresponding
+ * worker can be destroyed, or it can be kept for use by the next lease.
+ * This parameter specifies the upper limit on the number of such kept
+ * threads, not counting reserved threads.
*/
- void _schedule(Task task);
+ ServiceExecutorSyncBase(std::string name,
+ size_t reservedThreads,
+ size_t maxIdleThreads,
+ std::string statLabel);
+
+private:
+ class Impl;
+
+ std::shared_ptr<Impl> _impl;
+};
- void _runOnDataAvailable(const SessionHandle& session, Task onCompletionCallback);
+/**
+ * Provides access to a pool of dedicated worker threads via `makeTaskRunner`,
+ * which retuns a worker lease. A worker lease is a handle that acts as an
+ * executor, accepting tasks. The leased worker thread is dedicated, meaning it
+ * will only work through the lease handle: all tasks scheduled on that
+ * lease will be run on that worker, and no other tasks.
+ *
+ * A worker thread may be reused and leased many times, but only serially so.
+ */
+class ServiceExecutorSynchronous : public ServiceExecutorSyncBase {
+public:
+ static inline size_t defaultReserved = 0; /** For testing */
+ /** Returns the ServiceExecutorSynchronous decoration on `ctx`. */
+ static ServiceExecutorSynchronous* get(ServiceContext* ctx);
+
+ ServiceExecutorSynchronous();
+};
+
+/**
+ * Same as ServiceExecutorSynchronous (implemented by the same base class), but
+ * ServiceExecutorReserved has the additional property of having configurable
+ * `reservedThreads` and `maxIdleThreads` counts. See base class docs.
+ */
+class ServiceExecutorReserved : public ServiceExecutorSyncBase {
+public:
+ /** Null result is possible, as ServiceExecutorReserved could be absent. */
+ static ServiceExecutorReserved* get(ServiceContext* ctx);
- std::shared_ptr<SharedState> _sharedState;
+ ServiceExecutorReserved(std::string name, size_t reservedThreads, size_t maxIdleThreads);
};
-} // namespace transport
-} // namespace mongo
+} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_synchronous_synchronization.cpp b/src/mongo/transport/service_executor_synchronous_synchronization.cpp
new file mode 100644
index 00000000000..6fd0207f7ac
--- /dev/null
+++ b/src/mongo/transport/service_executor_synchronous_synchronization.cpp
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/transport/service_executor_synchronous_synchronization.h"
+
+#include "mongo/logv2/log.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
+
+namespace mongo::transport {
+
+void SyncDomain::_findCycle(std::vector<const ThreadRecord*>& seekers) const {
+ auto&& rec = *seekers.back();
+ if (!rec.wants)
+ return; // Tail is not seeking anything.
+ // See if any seeker in the chain, so far, holds what I want.
+ for (auto&& [_, hRec] : _threads) {
+ auto&& h = hRec.holds;
+ if (std::find_if(h.begin(), h.end(), [&](auto&& el) { return el.name == *rec.wants; }) ==
+ h.end())
+ continue;
+ // A node is holding what the tail of the chain wants.
+ if (std::find(seekers.begin(), seekers.end(), &hRec) != seekers.end()) {
+ // ... and that node is on the seeker chain.
+ LOGV2_FATAL(7015116, "Deadlock", "cycle"_attr = [&] {
+ BSONArrayBuilder cycle;
+ for (auto&& seeker : seekers) {
+ BSONObjBuilder b{cycle.subobjStart()};
+ b.append("thread", seeker->threadName);
+ b.append("wants", *seeker->wants);
+ BSONArrayBuilder holdsArr{b.subarrayStart("holds")};
+ for (auto&& h : seeker->holds)
+ BSONObjBuilder{holdsArr.subobjStart()}
+ .append("name", h.name)
+ .append("bt", h.backtrace);
+ }
+ return cycle.obj();
+ }());
+ }
+ seekers.push_back(&hRec);
+ _findCycle(seekers);
+ seekers.pop_back();
+ }
+}
+
+} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_synchronous_synchronization.h b/src/mongo/transport/service_executor_synchronous_synchronization.h
new file mode 100644
index 00000000000..af6885a1454
--- /dev/null
+++ b/src/mongo/transport/service_executor_synchronous_synchronization.h
@@ -0,0 +1,283 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <algorithm>
+#include <boost/optional.hpp>
+#include <fmt/format.h>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/logv2/log.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/stacktrace.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
+
+// Augment synchronization operations with verbose logging, deadlock detection, backtraces?
+#ifndef MONGO_SERVICE_EXECUTOR_SYNCHRONOUS_LOCK_DEBUGGING_ENABLE
+#define MONGO_SERVICE_EXECUTOR_SYNCHRONOUS_LOCK_DEBUGGING_ENABLE 0
+#endif
+
+namespace mongo::transport {
+
+/**
+ * All Synchronization objects that interact with each other will share a SyncDomain.
+ * This is a small book keeping structure that has an overall awareness of
+ * the state of all objects, to instrument their interactions.
+ * Its features are only used in instrumented builds.
+ */
+class SyncDomain {
+public:
+ SyncDomain() = default;
+
+ void afterLock(std::string name) {
+ stdx::unique_lock lk{_mutex};
+ auto& rec = _threads[_tid()];
+ rec.holds.push_back(ThreadRecord::Hold{*std::exchange(rec.wants, {}), _trace()});
+ }
+
+ void onUnlock(const std::string& name) {
+ stdx::unique_lock lk{_mutex};
+ auto iter = _threads.find(_tid());
+ invariant(iter != _threads.end());
+ auto&& holds = iter->second.holds;
+ auto holdIter =
+ std::find_if(holds.begin(), holds.end(), [&](auto&& el) { return el.name == name; });
+ invariant(holdIter != holds.end());
+ holds.erase(holdIter);
+ }
+
+ void beforeLock(std::string name) {
+ stdx::unique_lock lk{_mutex};
+ auto iter = _threads.find(_tid());
+ if (iter == _threads.end())
+ iter =
+ _threads.insert({_tid(), ThreadRecord{_tid(), std::string{getThreadName()}}}).first;
+ auto& myRec = iter->second;
+ myRec.wants = std::move(name);
+ std::vector<const ThreadRecord*> seekerChain{&myRec};
+ _findCycle(seekerChain);
+ }
+
+ BSONObj alsoHolding() const {
+ stdx::unique_lock lk{_mutex};
+ auto iter = _threads.find(_tid());
+ if (iter == _threads.end())
+ return {};
+ return iter->second.toBSON();
+ }
+
+private:
+ struct ThreadRecord {
+ struct Hold {
+ explicit Hold(std::string name, std::string backtrace)
+ : name{std::move(name)}, backtrace{std::move(backtrace)} {}
+ std::string name;
+ std::string backtrace;
+ };
+ stdx::thread::id tid;
+ std::string threadName;
+ boost::optional<std::string> wants;
+ std::vector<Hold> holds;
+
+ BSONObj toBSON() const {
+ BSONObjBuilder b;
+ b.append("threadName", threadName);
+ b.append("wants", wants.value_or(""));
+ {
+ BSONArrayBuilder holdsArr{b.subarrayStart("holds")};
+ for (auto&& h : holds)
+ holdsArr.append(h.name);
+ }
+ return b.obj();
+ }
+ };
+
+ std::string _trace() const {
+ std::string backtrace;
+ StringStackTraceSink traceSink{backtrace};
+ printStackTrace(traceSink);
+ return backtrace;
+ }
+
+ void _findCycle(std::vector<const ThreadRecord*>& seekers) const;
+
+ stdx::thread::id _tid() const {
+ return stdx::this_thread::get_id();
+ }
+
+ mutable stdx::mutex _mutex; // NOLINT
+ std::map<stdx::thread::id, ThreadRecord> _threads;
+};
+
+/**
+ * Abstraction to instrument synchronization behavior. This is basically a
+ * mutex and an associated condition variable, but can be built with lots of
+ * debug instrumentation.
+ */
+class Synchronization {
+private:
+ static constexpr bool _instrumented = MONGO_SERVICE_EXECUTOR_SYNCHRONOUS_LOCK_DEBUGGING_ENABLE;
+
+ class InstrumentedMutex {
+ private:
+ struct LockRecord {
+ stdx::thread::id id;
+ std::string name;
+ };
+
+ class HeldBy {
+ public:
+ void set() {
+ stdx::lock_guard lk{_mutex};
+ _lockRecord = LockRecord{stdx::this_thread::get_id(), std::string{getThreadName()}};
+ }
+
+ void reset() {
+ stdx::lock_guard lk{_mutex};
+ _lockRecord.reset();
+ }
+
+ boost::optional<LockRecord> get() const {
+ stdx::lock_guard lk{_mutex};
+ return _lockRecord;
+ }
+
+ std::string toString() const {
+ stdx::lock_guard lk{_mutex};
+ if (_lockRecord)
+ return _lockRecord->name;
+ return {};
+ }
+
+ private:
+ mutable stdx::mutex _mutex; // NOLINT
+ boost::optional<LockRecord> _lockRecord;
+ };
+
+ public:
+ explicit InstrumentedMutex(Synchronization* source) : _source{source} {}
+
+ void lock() {
+ HeldBy& h = _heldBy;
+ _source->_domain->beforeLock(_source->_name);
+ _mutex.lock();
+ _source->_domain->afterLock(_source->_name);
+ h.set();
+ }
+
+ void unlock() {
+ _mutex.unlock();
+ _source->_domain->onUnlock(_source->_name);
+ _heldBy.reset();
+ }
+
+ private:
+ Synchronization* _source;
+ Mutex _mutex;
+ HeldBy _heldBy;
+ };
+
+ class BareMutex {
+ public:
+ explicit BareMutex(Synchronization*) {}
+ void lock() {
+ _mutex.lock();
+ }
+ void unlock() {
+ _mutex.unlock();
+ }
+
+ private:
+ Mutex _mutex;
+ };
+
+ using MyMutex = std::conditional_t<_instrumented, InstrumentedMutex, BareMutex>;
+
+ MyMutex _initMutex() {
+ return MyMutex{this};
+ }
+
+ std::shared_ptr<SyncDomain> _initRootDomain() {
+ if constexpr (_instrumented)
+ return std::make_shared<SyncDomain>();
+ else
+ return nullptr;
+ }
+
+public:
+ using Lock = stdx::unique_lock<MyMutex>;
+
+ explicit Synchronization(std::string name)
+ : Synchronization{std::move(name), _initRootDomain()} {}
+ Synchronization(std::string name, std::shared_ptr<SyncDomain> domain)
+ : _name{std::move(name)}, _domain{std::move(domain)} {}
+
+ Lock acquireLock() {
+ return Lock(_mutex);
+ }
+
+ void notifyOne() {
+ _cv.notify_one();
+ }
+
+ void notifyAll() {
+ _cv.notify_all();
+ }
+
+ template <typename P>
+ void wait(Lock& lk, const P& p) {
+ _cv.wait(lk, p);
+ }
+
+ template <typename Dur, typename P>
+ bool waitFor(Lock& lk, const Dur& dur, const P& p) {
+ return _cv.wait_for(lk, dur.toSystemDuration(), p);
+ }
+
+ const std::shared_ptr<SyncDomain>& domain() const {
+ return _domain;
+ }
+
+private:
+ std::string _name;
+ MyMutex _mutex{_initMutex()};
+ stdx::condition_variable_any _cv;
+ std::shared_ptr<SyncDomain> _domain;
+};
+
+} // namespace mongo::transport
+
+#undef MONGO_LOGV2_DEFAULT_COMPONENT
diff --git a/src/mongo/transport/service_executor_synchronous_thread.cpp b/src/mongo/transport/service_executor_synchronous_thread.cpp
new file mode 100644
index 00000000000..2d095abfd22
--- /dev/null
+++ b/src/mongo/transport/service_executor_synchronous_thread.cpp
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <fmt/format.h>
+
+#include "mongo/base/status.h"
+#include "mongo/logv2/log.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/functional.h"
+#include "mongo/util/thread_safety_context.h"
+
+#if !defined(_WIN32)
+#include <sys/resource.h>
+#include <unistd.h>
+#endif
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kExecutor
+
+namespace mongo::transport {
+
+namespace {
+
+using namespace fmt::literals;
+
+MONGO_FAIL_POINT_DEFINE(serviceExecutorSynchronousThreadFailToSpawn);
+
+#if !defined(_WIN32)
+
+template <typename T>
+auto roundToNext(T n, T mod) {
+ return (n + mod - 1) / mod * mod;
+}
+
+size_t getStackSizeResourceLimit() {
+ struct rlimit rlim;
+ if (getrlimit(RLIMIT_STACK, &rlim)) {
+ auto ec = lastPosixError();
+ iasserted(ErrorCodes::InternalError, "getrlimit:{}"_format(errorMessage(ec)));
+ }
+ return rlim.rlim_cur;
+}
+
+size_t getSystemPageSize() {
+ auto r = sysconf(_SC_PAGE_SIZE);
+ if (r == -1) {
+ auto ec = lastPosixError();
+ iasserted(ErrorCodes::InternalError, "sysconf:{}"_format(errorMessage(ec)));
+ }
+ return r;
+}
+
+void configureStackSize(pthread_attr_t* attrs) {
+ static constexpr size_t kSuggested = 1 << 20; // 1MiB
+ auto rlim = getStackSizeResourceLimit();
+ iassert(ErrorCodes::InternalError,
+ "Small stack size resource limit. rlim={}, suggested={}"_format(rlim, kSuggested),
+ rlim >= kSuggested);
+ size_t sz = kSuggested;
+
+#if __SANITIZE_ADDRESS__ || __has_feature(address_sanitizer)
+ // If we are using address sanitizer, we set the stack at
+ // ~75% (rounded up to a multiple of the page size) of our
+ // usual desired. Since ASAN is known to use stack more
+ // aggressively and should positively detect stack overflow,
+ // this gives us increased confidence during testing that we
+ // aren't flirting with our real 1MB limit for any tested
+ // workloads. Note: This calculation only works on POSIX
+ // platforms. If we ever decide to use the MSVC
+ // implementation of ASAN, we will need to revisit it.
+ sz = roundToNext(sz * 3 / 4, getSystemPageSize());
+#endif
+
+ if (int failed = pthread_attr_setstacksize(attrs, sz)) {
+ auto ec = posixError(failed);
+ iasserted(ErrorCodes::InternalError,
+ "pthread_attr_setstacksize: {}"_format(errorMessage(ec)));
+ }
+}
+
+#endif // !_WIN32
+
+} // namespace
+
+Status launchServiceWorkerThread(unique_function<void()> task) try {
+ if (serviceExecutorSynchronousThreadFailToSpawn.shouldFail())
+ iasserted(7015118, "Injected spawn failure");
+#if defined(_WIN32)
+ stdx::thread([task = std::move(task)]() mutable { task(); }).detach();
+#else
+ pthread_attr_t attrs;
+ pthread_attr_init(&attrs);
+ ScopeGuard attrsGuard([&attrs] { pthread_attr_destroy(&attrs); });
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+ try {
+ configureStackSize(&attrs);
+ } catch (const DBException& ex) {
+ LOGV2_WARNING(5017170, "Failed to configure stack size. Using default", "error"_attr = ex);
+ }
+
+ // Wrap the user-specified `task` so it runs with an installed `sigaltstack`.
+ task = [sigAltStackController = std::make_shared<stdx::support::SigAltStackController>(),
+ f = std::move(task)]() mutable {
+ auto sigAltStackGuard = sigAltStackController->makeInstallGuard();
+ f();
+ };
+
+ struct ThreadData {
+ ThreadData(unique_function<void()> f) : f{std::move(f)} {}
+ unique_function<void()> f;
+ };
+ auto td = std::make_unique<ThreadData>(std::move(task));
+ auto threadBody = +[](void* arg) -> void* {
+ std::unique_ptr<ThreadData>(static_cast<ThreadData*>(arg))->f();
+ return nullptr;
+ };
+
+ pthread_t thread;
+ ThreadSafetyContext::getThreadSafetyContext()->onThreadCreate();
+ if (int failed = pthread_create(&thread, &attrs, threadBody, td.get()); failed > 0) {
+ LOGV2_ERROR_OPTIONS(4850900,
+ {logv2::UserAssertAfterLog()},
+ "pthread_create failed",
+ "error"_attr = errorMessage(posixError(failed)));
+ } else if (failed < 0) {
+ auto ec = lastPosixError();
+ LOGV2_ERROR_OPTIONS(4850901,
+ {logv2::UserAssertAfterLog()},
+ "pthread_create failed with a negative return code",
+ "code"_attr = failed,
+ "errno"_attr = ec.value(),
+ "error"_attr = errorMessage(ec));
+ }
+ td.release();
+#endif
+
+ return Status::OK();
+} catch (const std::exception& e) {
+ LOGV2_ERROR(22948, "Thread creation failed", "error"_attr = e.what());
+ return {ErrorCodes::InternalError,
+ "Failed to create service entry worker thread: {}"_format(e.what())};
+}
+
+} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_utils.h b/src/mongo/transport/service_executor_synchronous_thread.h
index 3874a6ce653..2a5666e5276 100644
--- a/src/mongo/transport/service_executor_utils.h
+++ b/src/mongo/transport/service_executor_synchronous_thread.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2022-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -27,14 +27,16 @@
* it in the license file.
*/
-#pragma once
-
-#include "mongo/transport/service_executor.h"
-#include "mongo/transport/session.h"
+#include "mongo/base/status.h"
#include "mongo/util/functional.h"
namespace mongo::transport {
+/**
+ * The ServiceExecutorSynchronous worker threads are specially made, and (except on Windows)
+ * different from the `stdx::thread` used elsewhere in the system.
+ * They're POSIX threads, they're detached, and they have a custom stack size.
+ */
Status launchServiceWorkerThread(unique_function<void()> task);
} // namespace mongo::transport
diff --git a/src/mongo/transport/service_executor_test.cpp b/src/mongo/transport/service_executor_test.cpp
index 7bf605cc28a..70605762783 100644
--- a/src/mongo/transport/service_executor_test.cpp
+++ b/src/mongo/transport/service_executor_test.cpp
@@ -50,10 +50,12 @@
#include "mongo/unittest/matcher.h"
#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/future.h"
#include "mongo/util/scopeguard.h"
+#include "mongo/util/synchronized_value.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
@@ -63,10 +65,125 @@ namespace {
namespace m = unittest::match;
+using unittest::stringify::stringifyForAssert;
+
constexpr auto kWorkerThreadRunTime = Milliseconds{1000};
// Run time + generous scheduling time slice
constexpr auto kShutdownTime = Milliseconds{kWorkerThreadRunTime.count() + 50};
+template <typename M>
+class AtomicWordLoadIs : public m::Matcher {
+public:
+ explicit AtomicWordLoadIs(M m) : _m{std::move(m)} {}
+
+ std::string describe() const {
+ return "AtomicWordLoadIs({})"_format(_m.describe());
+ }
+
+ template <typename T>
+ m::MatchResult match(const T& x) const {
+ auto r = x.load();
+ if (auto mr = _m.match(r); !mr)
+ return {false,
+ "{} failed {}{}"_format(stringifyForAssert(r), _m.describe(), mr.message())};
+ return {};
+ }
+
+private:
+ M _m;
+};
+
+/** Matches a ServiceExecutor or the BSONObj it produces with its `appendStats`. */
+template <typename ThreadMatch, typename ClientMatch>
+class ExecStatsIs : public m::Matcher {
+public:
+ ExecStatsIs(std::string execStatsLabel, ThreadMatch tm, ClientMatch cm)
+ : _execStatsLabel{std::move(execStatsLabel)}, _tm{std::move(tm)}, _cm{std::move(cm)} {}
+
+ std::string describe() const {
+ return "ExecStatsIs({},{})"_format(_tm.describe(), _cm.describe());
+ }
+
+ m::MatchResult match(const BSONObj& x) const {
+ unittest::stringify::Joiner joiner;
+ bool ok = true;
+ auto obj = x[_execStatsLabel].Obj();
+
+ auto tIn = obj["threadsRunning"].Int();
+ if (auto tmr = _tm.match(tIn); !tmr) {
+ joiner("threadsRunning={} failed {}{}"_format(
+ stringifyForAssert(tIn), _tm.describe(), tmr.message()));
+ ok = false;
+ }
+
+ auto cIn = obj["clientsInTotal"].Int();
+ if (auto cmr = _cm.match(cIn); !cmr) {
+ joiner("clientsInTotal={} failed {}{}"_format(
+ stringifyForAssert(cIn), _cm.describe(), cmr.message()));
+ ok = false;
+ }
+ return {ok, std::string{joiner}};
+ }
+
+ m::MatchResult match(const ServiceExecutor& exec) const {
+ BSONObjBuilder bob;
+ exec.appendStats(&bob);
+ BSONObj obj = bob.done();
+ if (auto mr = match(obj); !mr)
+ return {false, "obj={}, message={}"_format(obj.toString(), mr.message())};
+ return {};
+ }
+
+private:
+ std::string _execStatsLabel;
+ ThreadMatch _tm;
+ ClientMatch _cm;
+};
+
+/**
+ * Match is re-evaluated repeatedly with an exponential backoff, up to some
+ * limit, at which time this enclosing matcher fails.
+ */
+template <typename M>
+class SoonMatches : public m::Matcher {
+public:
+ explicit SoonMatches(M&& m, int retries = 16) : _m{std::forward<M>(m)}, _retries{retries} {}
+
+ std::string describe() const {
+ return "SoonMatches({},{})"_format(_m.describe(), _retries);
+ }
+
+ template <typename X>
+ m::MatchResult match(const X& x) const {
+ // Fibonacci generator for slow integral exponential backoff.
+ auto fib = [seq = std::array<int64_t, 2>{0, 1}]() mutable {
+ auto r = seq[0];
+ seq[0] = seq[1];
+ seq[1] = r + seq[0];
+ return r;
+ };
+ m::MatchResult mr;
+ for (int retries = _retries; retries--;) {
+ if (mr = _m.match(x); mr)
+ return mr;
+ Milliseconds backoff{fib()};
+ LOGV2_DEBUG(1715120,
+ 1,
+ "Retry",
+ "matcher"_attr = describe(),
+ "retries"_attr = retries,
+ "backoff"_attr = backoff,
+ "message"_attr = mr.message());
+ sleepFor(backoff);
+ }
+ return {false, "No result matched after {} tries: {}"_format(_retries, mr.message())};
+ }
+
+private:
+ M _m;
+ int _retries;
+};
+
class JoinThread : public stdx::thread {
public:
using stdx::thread::thread;
@@ -142,30 +259,252 @@ private:
asio::io_context _ioContext;
};
-class ServiceExecutorSynchronousTest : public unittest::Test {
+/**
+ * ServiceExecutorSynchronous and ServiceExecutorReserved are closely related.
+ * This is a common basis for the fixtures that test them.
+ */
+template <typename Derived>
+class ServiceExecutorSynchronousTestBase : public unittest::Test {
public:
- void setUp() override {
- setGlobalServiceContext(ServiceContext::make());
+ auto execStatsElementMatcher(int threads, int clients) {
+ return ExecStatsIs(getStatsLabel(), m::Eq(threads), m::Eq(clients));
+ }
+
+ void testCreateDestroy() {
+ makeExecutor();
+ }
+
+ void testStartStop() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ ASSERT_OK(executor.shutdown(kShutdownTime));
+ }
+
+ void testMakeTaskRunnerFailsBeforeStartup() {
+ auto executor = makeExecutor();
+ ASSERT_THROWS(executor.makeTaskRunner(), DBException);
}
- void tearDown() override {
- setGlobalServiceContext(nullptr);
+ void testMakeTaskRunner() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ executor.makeTaskRunner();
+ ASSERT_OK(executor.shutdown(kShutdownTime));
+ }
+
+ void testMakeTaskRunnerMultiple() {
+ auto reserved = getReserved();
+ auto executor = makeExecutor();
+#define LOCAL_CHECK_STATS(threads, clients) \
+ ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(threads, clients)))
+ ASSERT_OK(executor.start());
+ LOCAL_CHECK_STATS(reserved, 0);
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> runners;
+ // Add a few more beyond the reserve.
+ for (size_t i = 0; i < reserved + 3; ++i) {
+ runners.push_back(executor.makeTaskRunner());
+ LOCAL_CHECK_STATS(runners.size() + reserved, runners.size()) << ", i:" << i;
+ }
+ ASSERT_OK(executor.shutdown(kShutdownTime));
+ LOCAL_CHECK_STATS(0, 0);
+#undef LOCAL_CHECK_STATS
+ }
+
+ void testBasicTaskRuns() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ PromiseAndFuture<void> pf;
+ auto runner = executor.makeTaskRunner();
+ runner->schedule([&](Status st) { pf.promise.setFrom(st); });
+ ASSERT_DOES_NOT_THROW(pf.future.get());
+ ASSERT_OK(executor.shutdown(kShutdownTime));
+ }
+
+ void testShutdownTimeout() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ auto runner = executor.makeTaskRunner();
+ PromiseAndFuture<void> taskStarted;
+ runner->schedule([&](Status st) {
+ taskStarted.promise.setFrom(st);
+ sleepFor(Milliseconds{2000});
+ });
+ taskStarted.future.get();
+ ASSERT_THAT(executor.shutdown(Milliseconds{1000}),
+ m::StatusIs(m::Eq(ErrorCodes::ExceededTimeLimit), m::Any()));
+ }
+
+ // Should tolerate the failure to spawn all these reserved threads.
+ void testManyLeases() {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ for (size_t i = 0; i < 10; ++i) {
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+ for (size_t j = 0; j < 20; ++j)
+ leases.push_back(executor.makeTaskRunner());
+ }
+ }
+
+ decltype(auto) makeExecutor() {
+ return _d().makeExecutor();
+ }
+
+ virtual std::string getStatsLabel() const = 0;
+
+ virtual size_t getReserved() const = 0;
+
+ std::unique_ptr<FailPointEnableBlock> makeFailSpawnBlock() {
+ return std::make_unique<FailPointEnableBlock>(
+ "serviceExecutorSynchronousThreadFailToSpawn");
+ }
+
+private:
+ decltype(auto) _d() const {
+ return static_cast<const Derived&>(*this);
+ }
+ decltype(auto) _d() {
+ return static_cast<Derived&>(*this);
+ }
+};
+
+class ServiceExecutorSynchronousTest
+ : public ServiceExecutorSynchronousTestBase<ServiceExecutorSynchronousTest> {
+public:
+ ServiceExecutorSynchronous makeExecutor() const {
+ return {};
+ }
+ std::string getStatsLabel() const override {
+ return "passthrough";
+ }
+ size_t getReserved() const override {
+ return 0;
}
};
-TEST_F(ServiceExecutorSynchronousTest, BasicTaskRuns) {
- ServiceExecutorSynchronous executor(getGlobalServiceContext());
+class ServiceExecutorReservedTest
+ : public ServiceExecutorSynchronousTestBase<ServiceExecutorReservedTest> {
+public:
+ ServiceExecutorReserved makeExecutor() const {
+ return {"testReserved", reserved, maxIdleThreads};
+ }
+ std::string getStatsLabel() const override {
+ return "reserved";
+ }
+ size_t getReserved() const override {
+ return reserved;
+ }
+
+protected:
+ size_t reserved = 5;
+ size_t maxIdleThreads = 0;
+};
+
+#define SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, case) \
+ TEST_F(fixture, case) { \
+ test##case (); \
+ }
+
+/**
+ * Expand this macro to instantiate the test cases for each of the corresponding
+ * member functions of the fixture base class. These are tests that
+ * ServiceExecutorSynchronous and ServiceExecutorReserved should pass.
+ */
+#define SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(fixture) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, CreateDestroy) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, StartStop) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, BasicTaskRuns) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunnerFailsBeforeStartup) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunner) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, MakeTaskRunnerMultiple) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, ShutdownTimeout) \
+ SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASE(fixture, ManyLeases) \
+ /**/
+
+SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(ServiceExecutorSynchronousTest)
+SERVICE_EXECUTOR_SYNCHRONOUS_COMMON_TEST_CASES(ServiceExecutorReservedTest)
+
+#define SERVICE_EXECUTOR_RESERVED_TEST_CHECK_EXEC_STATS(exec, threads, clients) \
+ ASSERT_THAT(exec, SoonMatches(execStatsElementMatcher(threads, clients)))
+
+// Create leases until leases exceed the reserve count of threads, and check
+// that the executor keeps its thread count above the number of leases by a
+// margin of `reserved` as it goes.
+TEST_F(ServiceExecutorReservedTest, CreateLeaseBeyondReserve) {
+#define LOCAL_CHECK_STATS(threads, clients) \
+ SERVICE_EXECUTOR_RESERVED_TEST_CHECK_EXEC_STATS(executor, threads, clients)
+ reserved = 5;
+ auto executor = makeExecutor();
ASSERT_OK(executor.start());
- PromiseAndFuture<void> pf;
- auto runner = executor.makeTaskRunner();
- runner->schedule([&](Status st) { pf.promise.setFrom(st); });
- ASSERT_DOES_NOT_THROW(pf.future.get());
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+ while (leases.size() < reserved + 1) {
+ leases.push_back(executor.makeTaskRunner());
+ LOCAL_CHECK_STATS(leases.size() + reserved, leases.size());
+ }
+ while (!leases.empty()) {
+ leases.pop_back();
+ LOCAL_CHECK_STATS(leases.size() + reserved, leases.size());
+ }
ASSERT_OK(executor.shutdown(kShutdownTime));
+ LOCAL_CHECK_STATS(0, 0);
+#undef LOCAL_CHECK_STATS
+}
+
+TEST_F(ServiceExecutorReservedTest, ImmediateThrowFromNoReserveSpawnFailure) {
+ reserved = 0;
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ auto failSpawns = makeFailSpawnBlock();
+ ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(reserved, 0)));
+ ASSERT_THROWS(executor.makeTaskRunner(), ExceptionFor<ErrorCodes::InternalError>);
+ failSpawns = {};
+ ASSERT_DOES_NOT_THROW(executor.makeTaskRunner());
}
-TEST_F(ServiceExecutorSynchronousTest, MakeTaskRunnerFailsBeforeStartup) {
- ServiceExecutorSynchronous executor{getGlobalServiceContext()};
- ASSERT_THROWS(executor.makeTaskRunner(), DBException);
+// The basic point of the "reserved" ServiceExecutor is to allow new connections
+// during time periods in which spawns are failing. Verify this fundamental
+// requirement of the reserved ServiceExecutor.
+TEST_F(ServiceExecutorReservedTest, ReserveMitigatesSpawnFailures) {
+ reserved = 5;
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ ASSERT_THAT(executor, execStatsElementMatcher(reserved, 0));
+ auto failSpawns = makeFailSpawnBlock();
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+ while (leases.size() < reserved)
+ leases.push_back(executor.makeTaskRunner());
+ // One worker is in the starting state while it unsuccesfully attempts to spawn.
+ // After the failure, we expect it to be removed from the starting bucket.
+ ASSERT_THAT(executor, SoonMatches(execStatsElementMatcher(reserved, leases.size())))
+ << "Should be sufficient reserve threads for demand during setup";
+ ASSERT_THROWS(executor.makeTaskRunner(), ExceptionFor<ErrorCodes::InternalError>)
+ << "Should throw when out of reserve threads";
+ failSpawns = {};
+ ASSERT_DOES_NOT_THROW(executor.makeTaskRunner());
+}
+
+// Check that workers are kept alive after their leases expire according to maxIdleThreads.
+TEST_F(ServiceExecutorReservedTest, MaxIdleThreads) {
+ for (reserved = 0; reserved != 5; ++reserved) {
+ for (maxIdleThreads = 0; maxIdleThreads != 5; ++maxIdleThreads) {
+ for (size_t leaseCount = 0; leaseCount != reserved + maxIdleThreads; ++leaseCount) {
+ auto executor = makeExecutor();
+ ASSERT_OK(executor.start());
+ ASSERT_THAT(executor, execStatsElementMatcher(reserved, 0));
+
+ std::vector<std::unique_ptr<ServiceExecutor::Executor>> leases;
+ while (leases.size() < leaseCount)
+ leases.push_back(executor.makeTaskRunner());
+ leases.clear();
+
+ ASSERT_THAT(executor,
+ SoonMatches(execStatsElementMatcher(
+ reserved + std::min(maxIdleThreads, leaseCount), 0)))
+ << ", reserved=" << reserved //
+ << ", maxIdleThreads=" << maxIdleThreads //
+ << ", leaseCount=" << leaseCount;
+ }
+ }
+ }
}
class ServiceExecutorFixedTest : public unittest::Test {
diff --git a/src/mongo/transport/service_executor_utils.cpp b/src/mongo/transport/service_executor_utils.cpp
deleted file mode 100644
index 317e4261598..00000000000
--- a/src/mongo/transport/service_executor_utils.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/transport/service_executor_utils.h"
-
-#include <fmt/format.h>
-#include <functional>
-#include <memory>
-
-#include "mongo/logv2/log.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_executor.h"
-#include "mongo/util/assert_util.h"
-#include "mongo/util/debug_util.h"
-#include "mongo/util/scopeguard.h"
-#include "mongo/util/thread_safety_context.h"
-
-#if !defined(_WIN32)
-#include <sys/resource.h>
-#include <unistd.h>
-#endif
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
-
-
-#if !defined(__has_feature)
-#define __has_feature(x) 0
-#endif
-
-namespace mongo::transport {
-
-namespace {
-using namespace fmt::literals;
-
-void* runFunc(void* ctx) {
- auto taskPtr =
- std::unique_ptr<unique_function<void()>>(static_cast<unique_function<void()>*>(ctx));
- (*taskPtr)();
-
- return nullptr;
-}
-} // namespace
-
-Status launchServiceWorkerThread(unique_function<void()> task) {
-
- try {
-#if defined(_WIN32)
- stdx::thread([task = std::move(task)]() mutable { task(); }).detach();
-#else
- pthread_attr_t attrs;
- pthread_attr_init(&attrs);
- ScopeGuard attrsGuard([&attrs] { pthread_attr_destroy(&attrs); });
- pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
-
- static const rlim_t kStackSize =
- 1024 * 1024; // if we change this we need to update the warning
-
- struct rlimit limits;
- invariant(getrlimit(RLIMIT_STACK, &limits) == 0);
- if (limits.rlim_cur >= kStackSize) {
-
- size_t stackSizeToSet = kStackSize;
-
-#if !defined(_WIN32) && (__SANITIZE_ADDRESS__ || __has_feature(address_sanitizer))
- // If we are using address sanitizer, we set the stack at
- // ~75% (rounded up to a multiple of the page size) of our
- // usual desired. Since ASAN is known to use stack more
- // aggressively and should positively detect stack overflow,
- // this gives us increased confidence during testing that we
- // aren't flirting with our real 1MB limit for any tested
- // workloads. Note: This calculation only works on POSIX
- // platforms. If we ever decide to use the MSVC
- // implementation of ASAN, we will need to revisit it.
- long page_size = sysconf(_SC_PAGE_SIZE);
- stackSizeToSet =
- ((((stackSizeToSet * 3) >> 2) + page_size - 1) / page_size) * page_size;
-#endif
- int failed = pthread_attr_setstacksize(&attrs, stackSizeToSet);
- if (failed) {
- LOGV2_WARNING(22949,
- "pthread_attr_setstacksize failed: {error}",
- "pthread_attr_setstacksize failed",
- "error"_attr = errorMessage(posixError(failed)));
- }
- } else {
- LOGV2_WARNING(22950,
- "Stack size set to {stackSizeKiB}KiB. We suggest 1024KiB",
- "Stack size not set to suggested 1024KiB",
- "stackSizeKiB"_attr = (limits.rlim_cur / 1024));
- }
-
- // Wrap the user-specified `task` so it runs with an installed `sigaltstack`.
- task = [sigAltStackController = std::make_shared<stdx::support::SigAltStackController>(),
- f = std::move(task)]() mutable {
- auto sigAltStackGuard = sigAltStackController->makeInstallGuard();
- f();
- };
-
- pthread_t thread;
- auto ctx = std::make_unique<unique_function<void()>>(std::move(task));
- ThreadSafetyContext::getThreadSafetyContext()->onThreadCreate();
-
- int failed = pthread_create(&thread, &attrs, runFunc, ctx.get());
- if (failed > 0) {
- LOGV2_ERROR_OPTIONS(4850900,
- {logv2::UserAssertAfterLog()},
- "pthread_create failed: error: {error}",
- "pthread_create failed",
- "error"_attr = errorMessage(posixError(failed)));
- } else if (failed < 0) {
- auto ec = lastPosixError();
- LOGV2_ERROR_OPTIONS(4850901,
- {logv2::UserAssertAfterLog()},
- "pthread_create failed with a negative return code: {code}, errno: "
- "{errno}, error: {error}",
- "pthread_create failed with a negative return code",
- "code"_attr = failed,
- "errno"_attr = ec.value(),
- "error"_attr = errorMessage(ec));
- }
-
- ctx.release();
-#endif
-
- } catch (const std::exception& e) {
- LOGV2_ERROR(22948, "Thread creation failed", "error"_attr = e.what());
- return {ErrorCodes::InternalError,
- format(FMT_STRING("Failed to create service entry worker thread: {}"), e.what())};
- }
-
- return Status::OK();
-}
-
-} // namespace mongo::transport
diff --git a/src/mongo/transport/session_workflow.cpp b/src/mongo/transport/session_workflow.cpp
index 080d1565912..32b49603221 100644
--- a/src/mongo/transport/session_workflow.cpp
+++ b/src/mongo/transport/session_workflow.cpp
@@ -416,7 +416,7 @@ public:
return ServiceExecutorContext::get(client())->getServiceExecutor();
}
- std::shared_ptr<ServiceExecutor::TaskRunner> taskRunner() {
+ std::shared_ptr<ServiceExecutor::Executor> taskRunner() {
auto exec = executor();
// Allows switching the executor between iterations of the workflow.
if (MONGO_unlikely(!_taskRunner.source || _taskRunner.source != exec))
@@ -436,10 +436,22 @@ private:
class WorkItem;
struct RunnerAndSource {
- std::shared_ptr<ServiceExecutor::TaskRunner> runner;
+ std::shared_ptr<ServiceExecutor::Executor> runner;
ServiceExecutor* source = nullptr;
};
+ /**
+ * Notify the task runner that this would be a good time to yield. It might
+ * not actually yield, depending on implementation and on overall system
+ * state.
+ *
+ * Yielding at certain points in a command's processing pipeline has been
+ * considered to be beneficial to performance.
+ */
+ void _yieldPointReached() {
+ taskRunner()->yieldPointReached();
+ }
+
/** Alias: refers to this Impl, but holds a ref to the enclosing workflow. */
std::shared_ptr<Impl> shared_from_this() {
return {_workflow->shared_from_this(), this};
@@ -686,6 +698,7 @@ void SessionWorkflow::Impl::scheduleNewLoop(Status status) try {
// If we're in exhaust, we're not expecting more data.
taskRunner()->schedule(std::move(cb));
} else {
+ _yieldPointReached();
taskRunner()->runOnDataAvailable(session(), std::move(cb));
}
} catch (const DBException& ex) {
@@ -723,13 +736,7 @@ void SessionWorkflow::Impl::startNewLoop(const Status& executorStatus) {
if (_work->hasOut()) {
sendMessage();
_metrics.sent(*session());
-
- // Performance testing showed a significant benefit from yielding here.
- // TODO SERVER-57531: Once we enable the use of a fixed-size thread pool
- // for handling client connection handshaking, we should only yield here if
- // we're on a dedicated thread.
- executor()->yieldIfAppropriate();
-
+ _yieldPointReached();
_metrics.yielded();
}
})
diff --git a/src/mongo/transport/session_workflow_test.cpp b/src/mongo/transport/session_workflow_test.cpp
index df264e372a2..b98fa310b04 100644
--- a/src/mongo/transport/session_workflow_test.cpp
+++ b/src/mongo/transport/session_workflow_test.cpp
@@ -52,7 +52,6 @@
#include "mongo/transport/service_entry_point.h"
#include "mongo/transport/service_entry_point_impl.h"
#include "mongo/transport/service_executor.h"
-#include "mongo/transport/service_executor_utils.h"
#include "mongo/transport/session_workflow.h"
#include "mongo/transport/session_workflow_test_util.h"
#include "mongo/transport/transport_layer_mock.h"