summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/commands/server_status_servers.cpp4
-rw-r--r--src/mongo/db/mongod_main.cpp29
-rw-r--r--src/mongo/db/service_context.cpp8
-rw-r--r--src/mongo/db/service_context.h13
-rw-r--r--src/mongo/s/mongos_main.cpp26
-rw-r--r--src/mongo/tools/bridge.cpp15
-rw-r--r--src/mongo/transport/SConscript3
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp94
-rw-r--r--src/mongo/transport/service_entry_point_impl.h4
-rw-r--r--src/mongo/transport/service_executor_fixed.cpp16
-rw-r--r--src/mongo/transport/service_executor_fixed.h3
-rw-r--r--src/mongo/transport/service_executor_reserved.cpp25
-rw-r--r--src/mongo/transport/service_executor_reserved.h3
-rw-r--r--src/mongo/transport/service_executor_synchronous.cpp15
-rw-r--r--src/mongo/transport/service_executor_synchronous.h3
-rw-r--r--src/mongo/transport/service_state_machine.cpp5
-rw-r--r--src/mongo/transport/service_state_machine.h2
-rw-r--r--src/mongo/transport/transport_layer_manager.cpp2
18 files changed, 154 insertions, 116 deletions
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index 85a57202370..cc39b6da7d8 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/commands/server_status.h"
#include "mongo/transport/message_compressor_registry.h"
#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/util/net/hostname_canonicalization.h"
#include "mongo/util/net/socket_utils.h"
#include "mongo/util/net/ssl_manager.h"
@@ -77,12 +78,13 @@ public:
return true;
}
+ // TODO: need to track connections in server stats (see SERVER-49073)
BSONObj generateSection(OperationContext* opCtx,
const BSONElement& configElement) const override {
BSONObjBuilder b;
networkCounter.append(b);
appendMessageCompressionStats(&b);
- auto executor = opCtx->getServiceContext()->getServiceExecutor();
+ auto executor = transport::ServiceExecutorSynchronous::get(opCtx->getServiceContext());
if (executor) {
BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
executor->appendStats(&section);
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 05c9ed67511..b355becdf2a 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -736,16 +736,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
// operation context anymore
startupOpCtx.reset();
- auto start = serviceContext->getServiceExecutor()->start();
- if (!start.isOK()) {
- LOGV2_ERROR(20570,
- "Error starting service executor: {error}",
- "Error starting service executor",
- "error"_attr = start);
- return EXIT_NET_ERROR;
- }
-
- start = serviceContext->getServiceEntryPoint()->start();
+ auto start = serviceContext->getServiceEntryPoint()->start();
if (!start.isOK()) {
LOGV2_ERROR(20571,
"Error starting service entry point: {error}",
@@ -1295,11 +1286,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
CatalogCacheLoader::get(serviceContext).shutDown();
}
-#if __has_feature(address_sanitizer) || __has_feature(thread_sanitizer)
- // When running under address sanitizer, we get false positive leaks due to disorder around
- // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
- // harder to dry up the server from active connections before going on to really shut down.
-
// Shutdown the Service Entry Point and its sessions and give it a grace period to complete.
if (auto sep = serviceContext->getServiceEntryPoint()) {
LOGV2_OPTIONS(4784923, {LogComponent::kCommand}, "Shutting down the ServiceEntryPoint");
@@ -1310,19 +1296,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
}
}
- // Shutdown and wait for the service executor to exit
- if (auto svcExec = serviceContext->getServiceExecutor()) {
- LOGV2_OPTIONS(4784924, {LogComponent::kExecutor}, "Shutting down the service executor");
- Status status = svcExec->shutdown(Seconds(10));
- if (!status.isOK()) {
- LOGV2_OPTIONS(20564,
- {LogComponent::kNetwork},
- "Service executor did not shutdown within the time limit",
- "error"_attr = status);
- }
- }
-#endif
-
LOGV2(4784925, "Shutting down free monitoring");
stopFreeMonitoring();
diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp
index c8a6a6f60bd..8796d7fdae6 100644
--- a/src/mongo/db/service_context.cpp
+++ b/src/mongo/db/service_context.cpp
@@ -193,10 +193,6 @@ ServiceEntryPoint* ServiceContext::getServiceEntryPoint() const {
return _serviceEntryPoint.get();
}
-transport::ServiceExecutor* ServiceContext::getServiceExecutor() const {
- return _serviceExecutor.get();
-}
-
void ServiceContext::setStorageEngine(std::unique_ptr<StorageEngine> engine) {
invariant(engine);
invariant(!_storageEngine);
@@ -227,10 +223,6 @@ void ServiceContext::setTransportLayer(std::unique_ptr<transport::TransportLayer
_transportLayer = std::move(tl);
}
-void ServiceContext::setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec) {
- _serviceExecutor = std::move(exec);
-}
-
void ServiceContext::ClientDeleter::operator()(Client* client) const {
ServiceContext* const service = client->getServiceContext();
{
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 4b4156a78bf..a3ee1c646b2 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -490,14 +490,6 @@ public:
ServiceEntryPoint* getServiceEntryPoint() const;
/**
- * Get the service executor for the service context.
- *
- * See ServiceStateMachine for how this is used. Some configurations may not have a service
- * executor registered and this will return a nullptr.
- */
- transport::ServiceExecutor* getServiceExecutor() const;
-
- /**
* Waits for the ServiceContext to be fully initialized and for all TransportLayers to have been
* added/started.
*
@@ -582,11 +574,6 @@ public:
void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl);
/**
- * Binds the service executor to the service context
- */
- void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec);
-
- /**
* Creates a delayed execution baton with basic functionality
*/
BatonHandle makeBaton(OperationContext* opCtx) const;
diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp
index fdc7cbfa819..b2a7cb15b77 100644
--- a/src/mongo/s/mongos_main.cpp
+++ b/src/mongo/s/mongos_main.cpp
@@ -358,11 +358,6 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
CatalogCacheLoader::get(serviceContext).shutDown();
}
-#if __has_feature(address_sanitizer)
- // When running under address sanitizer, we get false positive leaks due to disorder around
- // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
- // harder to dry up the server from active connections before going on to really shut down.
-
// Shutdown the Service Entry Point and its sessions and give it a grace period to complete.
if (auto sep = serviceContext->getServiceEntryPoint()) {
if (!sep->shutdown(Seconds(10))) {
@@ -372,18 +367,6 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
}
}
- // Shutdown and wait for the service executor to exit
- if (auto svcExec = serviceContext->getServiceExecutor()) {
- Status status = svcExec->shutdown(Seconds(5));
- if (!status.isOK()) {
- LOGV2_OPTIONS(22845,
- {LogComponent::kNetwork},
- "Service executor did not shutdown within the time limit",
- "error"_attr = status);
- }
- }
-#endif
-
// Shutdown Full-Time Data Capture
stopMongoSFTDC();
}
@@ -798,15 +781,6 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
std::make_unique<SessionsCollectionSharded>(),
RouterSessionCatalog::reapSessionsOlderThan));
- status = serviceContext->getServiceExecutor()->start();
- if (!status.isOK()) {
- LOGV2_ERROR(22859,
- "Error starting service executor: {error}",
- "Error starting service executor",
- "error"_attr = redact(status));
- return EXIT_NET_ERROR;
- }
-
status = serviceContext->getServiceEntryPoint()->start();
if (!status.isOK()) {
LOGV2_ERROR(22860,
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index 986114093ed..1c2f7d46bc9 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -485,10 +485,9 @@ int bridgeMain(int argc, char** argv) {
setGlobalServiceContext(ServiceContext::make());
auto serviceContext = getGlobalServiceContext();
serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointBridge>(serviceContext));
- serviceContext->setServiceExecutor(
- std::make_unique<transport::ServiceExecutorSynchronous>(serviceContext));
-
- fassert(50766, serviceContext->getServiceExecutor()->start());
+ if (auto status = serviceContext->getServiceEntryPoint()->start(); !status.isOK()) {
+ LOGV2(4907203, "Error starting service entry point", "error"_attr = status);
+ }
transport::TransportLayerASIO::Options opts;
opts.ipList.emplace_back("0.0.0.0");
@@ -497,13 +496,13 @@ int bridgeMain(int argc, char** argv) {
serviceContext->setTransportLayer(std::make_unique<mongo::transport::TransportLayerASIO>(
opts, serviceContext->getServiceEntryPoint()));
auto tl = serviceContext->getTransportLayer();
- if (!tl->setup().isOK()) {
- LOGV2(22922, "Error setting up transport layer");
+ if (auto status = tl->setup(); !status.isOK()) {
+ LOGV2(22922, "Error setting up transport layer", "error"_attr = status);
return EXIT_NET_ERROR;
}
- if (!tl->start().isOK()) {
- LOGV2(22923, "Error starting transport layer");
+ if (auto status = tl->start(); !status.isOK()) {
+ LOGV2(22923, "Error starting transport layer", "error"_attr = status);
return EXIT_NET_ERROR;
}
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 1783a1cd9b8..abd2e210518 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -180,7 +180,8 @@ tlEnv.CppUnitTest(
'transport_layer_asio_test.cpp',
'service_executor_test.cpp',
'max_conns_override_test.cpp',
- 'service_state_machine_test.cpp',
+ # TODO: service_state_machine test to be re-written in SERVER-50141.
+ # 'service_state_machine_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 83bbe73c43a..ee83e1e48a0 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -48,6 +48,10 @@
#include <sys/resource.h>
#endif
+#if !defined(__has_feature)
+#define __has_feature(x) 0
+#endif
+
namespace mongo {
bool shouldOverrideMaxConns(const transport::SessionHandle& session,
@@ -114,20 +118,29 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
}
_maxNumConnections = supportedMax;
-
- if (serverGlobalParams.reservedAdminThreads) {
- _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
- _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
- }
}
Status ServiceEntryPointImpl::start() {
- if (_adminInternalPool)
- return _adminInternalPool->start();
- else
- return Status::OK();
+ if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
+ !status.isOK()) {
+ return status;
+ }
+
+ if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+ if (auto status = exec->start(); !status.isOK()) {
+ return status;
+ }
+ }
+
+ // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
+ // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
+ // return status;
+ // }
+
+ return Status::OK();
}
+// TODO: explicitly start on the fixed executor
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
const auto& remoteAddr = session->remoteAddr();
@@ -140,7 +153,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
const bool quiet = serverGlobalParams.quiet.load();
size_t connectionCount;
- auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
+ auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode();
auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
auto usingMaxConnOverride = false;
@@ -168,8 +181,9 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
"connectionCount"_attr = connectionCount);
}
return;
- } else if (usingMaxConnOverride && _adminInternalPool) {
- ssm->setServiceExecutor(_adminInternalPool.get());
+ } else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx);
+ usingMaxConnOverride && exec) {
+ ssm->setServiceExecutor(exec);
}
if (!quiet) {
@@ -219,8 +233,14 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
}
bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
+#if __has_feature(address_sanitizer) || __has_feature(thread_sanitizer)
+ // When running under address sanitizer, we get false positive leaks due to disorder around
+ // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
+ // harder to dry up the server from active connections before going on to really shut down.
+
using logv2::LogComponent;
+ auto start = _svcCtx->getPreciseClockSource()->now();
stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
// Request that all sessions end, while holding the _sesionsMutex, loop over all the current
@@ -257,7 +277,37 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
"shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
"workers"_attr = numOpenSessions());
}
+
+ lk.unlock();
+
+ // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
+ // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+ // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+ // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
+ // !status.isOK()) {
+ // LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
+ // }
+
+ timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+ timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+ if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+ if (auto status = exec->shutdown(timeout); !status.isOK()) {
+ LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
+ }
+ }
+
+ timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+ timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+ if (auto status =
+ transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
+ !status.isOK()) {
+ LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
+ }
+
return result;
+#else
+ return true;
+#endif
}
void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
@@ -267,18 +317,18 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
bob->append("current", static_cast<int>(sessionCount));
bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
- if (auto sc = getGlobalServiceContext()) {
- bob->append("active", static_cast<int>(sc->getActiveClientOperations()));
- bob->append("exhaustIsMaster",
- static_cast<int>(HelloMetrics::get(sc)->getNumExhaustIsMaster()));
- bob->append("exhaustHello", static_cast<int>(HelloMetrics::get(sc)->getNumExhaustHello()));
- bob->append("awaitingTopologyChanges",
- static_cast<int>(HelloMetrics::get(sc)->getNumAwaitingTopologyChanges()));
- }
- if (_adminInternalPool) {
+ invariant(_svcCtx);
+ bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
+ bob->append("exhaustIsMaster",
+ static_cast<int>(HelloMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
+ bob->append("exhaustHello", static_cast<int>(HelloMetrics::get(_svcCtx)->getNumExhaustHello()));
+ bob->append("awaitingTopologyChanges",
+ static_cast<int>(HelloMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges()));
+
+ if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) {
BSONObjBuilder section(bob->subobjStart("adminConnections"));
- _adminInternalPool->appendStats(&section);
+ adminExec->appendStats(&section);
}
}
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 38c1319657a..0528a19fbdb 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -36,7 +36,9 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/variant.h"
#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_fixed.h"
#include "mongo/transport/service_executor_reserved.h"
+#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/net/cidr.h"
@@ -90,8 +92,6 @@ private:
size_t _maxNumConnections{DEFAULT_MAX_CONN};
AtomicWord<size_t> _currentConnections{0};
AtomicWord<size_t> _createdConnections{0};
-
- std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool;
};
/*
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index 2a1fc737cc1..b068e487126 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -37,6 +37,7 @@
#include "mongo/transport/session.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
+#include "mongo/util/thread_safety_context.h"
namespace mongo {
@@ -49,6 +50,15 @@ namespace {
constexpr auto kThreadsRunning = "threadsRunning"_sd;
constexpr auto kExecutorLabel = "executor"_sd;
constexpr auto kExecutorName = "fixed"_sd;
+
+const auto getServiceExecutorFixed =
+ ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
+
+const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{
+ "ServiceExecutorFixed", [](ServiceContext* ctx) {
+ getServiceExecutorFixed(ctx) =
+ std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{});
+ }};
} // namespace
ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
@@ -86,6 +96,12 @@ Status ServiceExecutorFixed::start() {
return Status::OK();
}
+ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
+ auto& ref = getServiceExecutorFixed(ctx);
+ invariant(ref);
+ return ref.get();
+}
+
Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
auto waitForShutdown = [&]() mutable -> Status {
stdx::unique_lock<Latch> lk(_mutex);
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index 82be057e1f5..6c540576a78 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -33,6 +33,7 @@
#include <memory>
#include "mongo/base/status.h"
+#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -55,6 +56,8 @@ public:
explicit ServiceExecutorFixed(ThreadPool::Options options);
virtual ~ServiceExecutorFixed();
+ static ServiceExecutorFixed* get(ServiceContext* ctx);
+
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index cae9653e60f..d81ad5be200 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -33,11 +33,13 @@
#include "mongo/transport/service_executor_reserved.h"
+#include "mongo/db/server_options.h"
#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/service_executor_utils.h"
#include "mongo/util/processinfo.h"
+#include "mongo/util/thread_safety_context.h"
namespace mongo {
namespace transport {
@@ -48,6 +50,19 @@ constexpr auto kExecutorLabel = "executor"_sd;
constexpr auto kExecutorName = "reserved"_sd;
constexpr auto kReadyThreads = "readyThreads"_sd;
constexpr auto kStartingThreads = "startingThreads"_sd;
+
+const auto getServiceExecutorReserved =
+ ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
+
+const auto serviceExecutorReservedRegisterer = ServiceContext::ConstructorActionRegisterer{
+ "ServiceExecutorReserved", [](ServiceContext* ctx) {
+ if (!serverGlobalParams.reservedAdminThreads) {
+ return;
+ }
+
+ getServiceExecutorReserved(ctx) = std::make_unique<transport::ServiceExecutorReserved>(
+ ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
+ }};
} // namespace
thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
@@ -147,6 +162,12 @@ Status ServiceExecutorReserved::_startWorker() {
});
}
+ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) {
+ auto& ref = getServiceExecutorReserved(ctx);
+
+ // The ServiceExecutorReserved could be absent, so nullptr is okay.
+ return ref.get();
+}
Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
LOGV2_DEBUG(22980, 3, "Shutting down reserved executor");
@@ -173,8 +194,8 @@ Status ServiceExecutorReserved::scheduleTask(Task task, ScheduleFlags flags) {
if (!_localWorkQueue.empty()) {
// Execute task directly (recurse) if allowed by the caller as it produced better
// performance in testing. Try to limit the amount of recursion so we don't blow up the
- // stack, even though this shouldn't happen with this executor that uses blocking network
- // I/O.
+ // stack, even though this shouldn't happen with this executor that uses blocking
+ // network I/O.
if ((flags & ScheduleFlags::kMayRecurse) &&
(_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
++_localRecursionDepth;
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index e3acf6febd9..60de4bc2993 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -32,6 +32,7 @@
#include <deque>
#include "mongo/base/status.h"
+#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -54,6 +55,8 @@ class ServiceExecutorReserved final : public ServiceExecutor {
public:
explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
+ static ServiceExecutorReserved* get(ServiceContext* ctx);
+
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index d034e208954..c75bdbb0952 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -38,6 +38,7 @@
#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/service_executor_utils.h"
#include "mongo/util/processinfo.h"
+#include "mongo/util/thread_safety_context.h"
namespace mongo {
namespace transport {
@@ -45,6 +46,14 @@ namespace {
constexpr auto kThreadsRunning = "threadsRunning"_sd;
constexpr auto kExecutorLabel = "executor"_sd;
constexpr auto kExecutorName = "passthrough"_sd;
+
+const auto getServiceExecutorSynchronous =
+ ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
+
+const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{
+ "ServiceExecutorSynchronous", [](ServiceContext* ctx) {
+ getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx);
+ }};
} // namespace
thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {};
@@ -78,6 +87,12 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
"passthrough executor couldn't shutdown all worker threads within time limit.");
}
+ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
+ auto& ref = getServiceExecutorSynchronous(ctx);
+ invariant(ref);
+ return ref.get();
+}
+
Status ServiceExecutorSynchronous::scheduleTask(Task task, ScheduleFlags flags) {
if (!_stillRunning.load()) {
return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 940382b53e8..840dc702d4c 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -32,6 +32,7 @@
#include <deque>
#include "mongo/base/status.h"
+#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -49,6 +50,8 @@ class ServiceExecutorSynchronous final : public ServiceExecutor {
public:
explicit ServiceExecutorSynchronous(ServiceContext* ctx);
+ static ServiceExecutorSynchronous* get(ServiceContext* ctx);
+
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 373a362d0df..3bfd208d1d1 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -45,6 +45,7 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
@@ -300,11 +301,11 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
_sep{svcContext->getServiceEntryPoint()},
_transportMode(transportMode),
_serviceContext(svcContext),
- _serviceExecutor(_serviceContext->getServiceExecutor()),
_sessionHandle(session),
_threadName{str::stream() << "conn" << _session()->id()},
_dbClient{svcContext->makeClient(_threadName, std::move(session))},
- _dbClientPtr{_dbClient.get()} {}
+ _dbClientPtr{_dbClient.get()},
+ _serviceExecutor(transport::ServiceExecutorSynchronous::get(_serviceContext)) {}
const transport::SessionHandle& ServiceStateMachine::_session() const {
return _sessionHandle;
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 0572c4f6ac8..22e0b8aa752 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -230,12 +230,12 @@ private:
transport::Mode _transportMode;
ServiceContext* const _serviceContext;
- transport::ServiceExecutor* _serviceExecutor;
transport::SessionHandle _sessionHandle;
const std::string _threadName;
ServiceContext::UniqueClient _dbClient;
const Client* _dbClientPtr;
+ transport::ServiceExecutor* _serviceExecutor;
std::function<void()> _cleanupHook;
bool _inExhaust = false;
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index b291671eb9a..272a9ac391d 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -138,8 +138,6 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
transport::TransportLayerASIO::Options opts(config);
opts.transportMode = transport::Mode::kSynchronous;
- ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));
-
std::vector<std::unique_ptr<TransportLayer>> retVector;
retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
return std::make_unique<TransportLayerManager>(std::move(retVector));