 src/mongo/db/commands/server_status_servers.cpp      |  4
 src/mongo/db/mongod_main.cpp                         | 29
 src/mongo/db/service_context.cpp                     |  8
 src/mongo/db/service_context.h                       | 13
 src/mongo/s/mongos_main.cpp                          | 26
 src/mongo/tools/bridge.cpp                           | 15
 src/mongo/transport/SConscript                       |  3
 src/mongo/transport/service_entry_point_impl.cpp     | 94
 src/mongo/transport/service_entry_point_impl.h       |  4
 src/mongo/transport/service_executor_fixed.cpp       | 16
 src/mongo/transport/service_executor_fixed.h         |  3
 src/mongo/transport/service_executor_reserved.cpp    | 25
 src/mongo/transport/service_executor_reserved.h      |  3
 src/mongo/transport/service_executor_synchronous.cpp | 15
 src/mongo/transport/service_executor_synchronous.h   |  3
 src/mongo/transport/service_state_machine.cpp        |  5
 src/mongo/transport/service_state_machine.h          |  2
 src/mongo/transport/transport_layer_manager.cpp      |  2
18 files changed, 154 insertions(+), 116 deletions(-)
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index 85a57202370..cc39b6da7d8 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -33,6 +33,7 @@
 #include "mongo/db/commands/server_status.h"
 #include "mongo/transport/message_compressor_registry.h"
 #include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/util/net/hostname_canonicalization.h"
 #include "mongo/util/net/socket_utils.h"
 #include "mongo/util/net/ssl_manager.h"
@@ -77,12 +78,13 @@ public:
         return true;
     }
 
+    // TODO: need to track connections in server stats (see SERVER-49073)
     BSONObj generateSection(OperationContext* opCtx,
                             const BSONElement& configElement) const override {
         BSONObjBuilder b;
         networkCounter.append(b);
         appendMessageCompressionStats(&b);
-        auto executor = opCtx->getServiceContext()->getServiceExecutor();
+        auto executor = transport::ServiceExecutorSynchronous::get(opCtx->getServiceContext());
         if (executor) {
             BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
             executor->appendStats(&section);
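The serverStatus section above keeps its output shape and only changes how the executor is looked up. For readers unfamiliar with the subdocument idiom it relies on, here is a minimal sketch of nesting a section with BSONObjBuilder; the field name and value are illustrative, not what the command actually reports.

    BSONObjBuilder b;
    {
        // The nested builder writes into the parent's buffer; the subdocument
        // is closed when `section` is destroyed at the end of this scope.
        BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
        section.append("threadsRunning", 4);
    }
    BSONObj doc = b.obj();  // {serviceExecutorTaskStats: {threadsRunning: 4}}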
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 05c9ed67511..b355becdf2a 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -736,16 +736,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
     // operation context anymore
     startupOpCtx.reset();
 
-    auto start = serviceContext->getServiceExecutor()->start();
-    if (!start.isOK()) {
-        LOGV2_ERROR(20570,
-                    "Error starting service executor: {error}",
-                    "Error starting service executor",
-                    "error"_attr = start);
-        return EXIT_NET_ERROR;
-    }
-
-    start = serviceContext->getServiceEntryPoint()->start();
+    auto start = serviceContext->getServiceEntryPoint()->start();
     if (!start.isOK()) {
         LOGV2_ERROR(20571,
                     "Error starting service entry point: {error}",
@@ -1295,11 +1286,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
         CatalogCacheLoader::get(serviceContext).shutDown();
     }
 
-#if __has_feature(address_sanitizer) || __has_feature(thread_sanitizer)
-    // When running under address sanitizer, we get false positive leaks due to disorder around
-    // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
-    // harder to dry up the server from active connections before going on to really shut down.
-    // Shutdown the Service Entry Point and its sessions and give it a grace period to complete.
     if (auto sep = serviceContext->getServiceEntryPoint()) {
         LOGV2_OPTIONS(4784923, {LogComponent::kCommand}, "Shutting down the ServiceEntryPoint");
@@ -1310,19 +1296,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
         }
     }
 
-    // Shutdown and wait for the service executor to exit
-    if (auto svcExec = serviceContext->getServiceExecutor()) {
-        LOGV2_OPTIONS(4784924, {LogComponent::kExecutor}, "Shutting down the service executor");
-        Status status = svcExec->shutdown(Seconds(10));
-        if (!status.isOK()) {
-            LOGV2_OPTIONS(20564,
-                          {LogComponent::kNetwork},
-                          "Service executor did not shutdown within the time limit",
-                          "error"_attr = status);
-        }
-    }
-#endif
-
     LOGV2(4784925, "Shutting down free monitoring");
     stopFreeMonitoring();
diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp
index c8a6a6f60bd..8796d7fdae6 100644
--- a/src/mongo/db/service_context.cpp
+++ b/src/mongo/db/service_context.cpp
@@ -193,10 +193,6 @@ ServiceEntryPoint* ServiceContext::getServiceEntryPoint() const {
     return _serviceEntryPoint.get();
 }
 
-transport::ServiceExecutor* ServiceContext::getServiceExecutor() const {
-    return _serviceExecutor.get();
-}
-
 void ServiceContext::setStorageEngine(std::unique_ptr<StorageEngine> engine) {
     invariant(engine);
     invariant(!_storageEngine);
@@ -227,10 +223,6 @@ void ServiceContext::setTransportLayer(std::unique_ptr<transport::TransportLayer
     _transportLayer = std::move(tl);
 }
 
-void ServiceContext::setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec) {
-    _serviceExecutor = std::move(exec);
-}
-
 void ServiceContext::ClientDeleter::operator()(Client* client) const {
     ServiceContext* const service = client->getServiceContext();
     {
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 4b4156a78bf..a3ee1c646b2 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -490,14 +490,6 @@ public:
     ServiceEntryPoint* getServiceEntryPoint() const;
 
     /**
-     * Get the service executor for the service context.
-     *
-     * See ServiceStateMachine for how this is used. Some configurations may not have a service
-     * executor registered and this will return a nullptr.
-     */
-    transport::ServiceExecutor* getServiceExecutor() const;
-
-    /**
      * Waits for the ServiceContext to be fully initialized and for all TransportLayers to have been
      * added/started.
      *
@@ -582,11 +574,6 @@ public:
     void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl);
 
     /**
-     * Binds the service executor to the service context
-     */
-    void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec);
-
-    /**
      * Creates a delayed execution baton with basic functionality
      */
    BatonHandle makeBaton(OperationContext* opCtx) const;
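With getServiceExecutor()/setServiceExecutor() removed from ServiceContext, each executor now hangs off the ServiceContext as a decoration and exposes its own static get(). A minimal sketch of that pattern follows; MyService is a hypothetical type standing in for the executors wired up in the transport files below.

    namespace {
    // Reserve a slot on every ServiceContext for a unique_ptr<MyService>.
    const auto getMyService =
        ServiceContext::declareDecoration<std::unique_ptr<MyService>>();

    // Populate the slot when the ServiceContext itself is constructed, so
    // the object exists before any consumer can ask for it.
    const auto myServiceRegisterer = ServiceContext::ConstructorActionRegisterer{
        "MyService", [](ServiceContext* ctx) {
            getMyService(ctx) = std::make_unique<MyService>();
        }};
    }  // namespace

    MyService* MyService::get(ServiceContext* ctx) {
        auto& ref = getMyService(ctx);
        invariant(ref);  // unconditionally constructed by the registerer above
        return ref.get();
    }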
diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp
index fdc7cbfa819..b2a7cb15b77 100644
--- a/src/mongo/s/mongos_main.cpp
+++ b/src/mongo/s/mongos_main.cpp
@@ -358,11 +358,6 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
         CatalogCacheLoader::get(serviceContext).shutDown();
     }
 
-#if __has_feature(address_sanitizer)
-    // When running under address sanitizer, we get false positive leaks due to disorder around
-    // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
-    // harder to dry up the server from active connections before going on to really shut down.
-    // Shutdown the Service Entry Point and its sessions and give it a grace period to complete.
     if (auto sep = serviceContext->getServiceEntryPoint()) {
         if (!sep->shutdown(Seconds(10))) {
@@ -372,18 +367,6 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
         }
     }
 
-    // Shutdown and wait for the service executor to exit
-    if (auto svcExec = serviceContext->getServiceExecutor()) {
-        Status status = svcExec->shutdown(Seconds(5));
-        if (!status.isOK()) {
-            LOGV2_OPTIONS(22845,
-                          {LogComponent::kNetwork},
-                          "Service executor did not shutdown within the time limit",
-                          "error"_attr = status);
-        }
-    }
-#endif
-
     // Shutdown Full-Time Data Capture
     stopMongoSFTDC();
 }
@@ -798,15 +781,6 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
         std::make_unique<SessionsCollectionSharded>(),
         RouterSessionCatalog::reapSessionsOlderThan));
 
-    status = serviceContext->getServiceExecutor()->start();
-    if (!status.isOK()) {
-        LOGV2_ERROR(22859,
-                    "Error starting service executor: {error}",
-                    "Error starting service executor",
-                    "error"_attr = redact(status));
-        return EXIT_NET_ERROR;
-    }
-
     status = serviceContext->getServiceEntryPoint()->start();
     if (!status.isOK()) {
         LOGV2_ERROR(22860,
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index 986114093ed..1c2f7d46bc9 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -485,10 +485,9 @@ int bridgeMain(int argc, char** argv) {
     setGlobalServiceContext(ServiceContext::make());
     auto serviceContext = getGlobalServiceContext();
     serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointBridge>(serviceContext));
-    serviceContext->setServiceExecutor(
-        std::make_unique<transport::ServiceExecutorSynchronous>(serviceContext));
-
-    fassert(50766, serviceContext->getServiceExecutor()->start());
+    if (auto status = serviceContext->getServiceEntryPoint()->start(); !status.isOK()) {
+        LOGV2(4907203, "Error starting service entry point", "error"_attr = status);
+    }
 
     transport::TransportLayerASIO::Options opts;
     opts.ipList.emplace_back("0.0.0.0");
@@ -497,13 +496,13 @@ int bridgeMain(int argc, char** argv) {
     serviceContext->setTransportLayer(std::make_unique<mongo::transport::TransportLayerASIO>(
         opts, serviceContext->getServiceEntryPoint()));
     auto tl = serviceContext->getTransportLayer();
-    if (!tl->setup().isOK()) {
-        LOGV2(22922, "Error setting up transport layer");
+    if (auto status = tl->setup(); !status.isOK()) {
+        LOGV2(22922, "Error setting up transport layer", "error"_attr = status);
         return EXIT_NET_ERROR;
     }
 
-    if (!tl->start().isOK()) {
-        LOGV2(22923, "Error starting transport layer");
+    if (auto status = tl->start(); !status.isOK()) {
+        LOGV2(22923, "Error starting transport layer", "error"_attr = status);
         return EXIT_NET_ERROR;
     }
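The bridge changes also tighten the error paths: instead of discarding the Status from setup()/start(), each call now uses a C++17 if-statement with an initializer, so the Status can be logged and stays scoped to the branch. A generic sketch of that shape, with doStep() as a hypothetical stand-in for calls like tl->setup(), and an illustrative log id:

    if (auto status = doStep(); !status.isOK()) {
        // `status` is visible only inside this if/else, so the surrounding
        // function keeps no dead variables around.
        LOGV2(1234567, "Step failed", "error"_attr = status);
        return EXIT_NET_ERROR;
    }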
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 1783a1cd9b8..abd2e210518 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -180,7 +180,8 @@ tlEnv.CppUnitTest(
         'transport_layer_asio_test.cpp',
         'service_executor_test.cpp',
         'max_conns_override_test.cpp',
-        'service_state_machine_test.cpp',
+        # TODO: service_state_machine test to be re-written in SERVER-50141.
+        # 'service_state_machine_test.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/base',
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 83bbe73c43a..ee83e1e48a0 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -48,6 +48,10 @@
 #include <sys/resource.h>
 #endif
 
+#if !defined(__has_feature)
+#define __has_feature(x) 0
+#endif
+
 namespace mongo {
 
 bool shouldOverrideMaxConns(const transport::SessionHandle& session,
@@ -114,20 +118,29 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
     }
 
     _maxNumConnections = supportedMax;
-
-    if (serverGlobalParams.reservedAdminThreads) {
-        _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
-            _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
-    }
 }
 
 Status ServiceEntryPointImpl::start() {
-    if (_adminInternalPool)
-        return _adminInternalPool->start();
-    else
-        return Status::OK();
+    if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
+        !status.isOK()) {
+        return status;
+    }
+
+    if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+        if (auto status = exec->start(); !status.isOK()) {
+            return status;
+        }
+    }
+
+    // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
+    // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
+    //     return status;
+    // }
+
+    return Status::OK();
 }
 
+// TODO: explicitly start on the fixed executor
 void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     // Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
     const auto& remoteAddr = session->remoteAddr();
@@ -140,7 +153,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     const bool quiet = serverGlobalParams.quiet.load();
 
     size_t connectionCount;
-    auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
+    auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode();
 
     auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
     auto usingMaxConnOverride = false;
@@ -168,8 +181,9 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
                       "connectionCount"_attr = connectionCount);
         }
         return;
-    } else if (usingMaxConnOverride && _adminInternalPool) {
-        ssm->setServiceExecutor(_adminInternalPool.get());
+    } else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx);
+               usingMaxConnOverride && exec) {
+        ssm->setServiceExecutor(exec);
     }
 
     if (!quiet) {
@@ -219,8 +233,14 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
 }
 
 bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
+#if __has_feature(address_sanitizer) || __has_feature(thread_sanitizer)
+    // When running under address sanitizer, we get false positive leaks due to disorder around
+    // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
+    // harder to dry up the server from active connections before going on to really shut down.
+    using logv2::LogComponent;
+    auto start = _svcCtx->getPreciseClockSource()->now();
     stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
 
     // Request that all sessions end, while holding the _sessionsMutex, loop over all the current
@@ -257,7 +277,37 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
             "shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
             "workers"_attr = numOpenSessions());
     }
+
+    lk.unlock();
+
+    // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
+    // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+    // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+    // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
+    //     !status.isOK()) {
+    //     LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
+    // }
+
+    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+    if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+        if (auto status = exec->shutdown(timeout); !status.isOK()) {
+            LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
+        }
+    }
+
+    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+    if (auto status =
+            transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
+        !status.isOK()) {
+        LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
+    }
+
     return result;
+#else
+    return true;
+#endif
 }
 
 void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
@@ -267,18 +317,18 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
     bob->append("current", static_cast<int>(sessionCount));
     bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
     bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
-    if (auto sc = getGlobalServiceContext()) {
-        bob->append("active", static_cast<int>(sc->getActiveClientOperations()));
-        bob->append("exhaustIsMaster",
-                    static_cast<int>(HelloMetrics::get(sc)->getNumExhaustIsMaster()));
-        bob->append("exhaustHello", static_cast<int>(HelloMetrics::get(sc)->getNumExhaustHello()));
-        bob->append("awaitingTopologyChanges",
-                    static_cast<int>(HelloMetrics::get(sc)->getNumAwaitingTopologyChanges()));
-    }
-    if (_adminInternalPool) {
+    invariant(_svcCtx);
+    bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
+    bob->append("exhaustIsMaster",
+                static_cast<int>(HelloMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
+    bob->append("exhaustHello", static_cast<int>(HelloMetrics::get(_svcCtx)->getNumExhaustHello()));
+    bob->append("awaitingTopologyChanges",
+                static_cast<int>(HelloMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges()));
+
+    if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) {
         BSONObjBuilder section(bob->subobjStart("adminConnections"));
-        _adminInternalPool->appendStats(&section);
+        adminExec->appendStats(&section);
     }
 }
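shutdown() now drains sessions first and then walks the executors, carving each stage's timeout out of one overall budget measured from `start`. A compact way to express the same accounting is with a fixed deadline, as in this sketch; Exec, shutdownAll, and the vector of executors are hypothetical stand-ins, not MongoDB types.

    #include <algorithm>
    #include <chrono>
    #include <vector>

    using namespace std::chrono;

    struct Exec {
        // Stand-in for ServiceExecutor::shutdown(Milliseconds).
        bool shutdown(milliseconds /*budget*/) { return true; }
    };

    void shutdownAll(const std::vector<Exec*>& execs, milliseconds total) {
        const auto deadline = steady_clock::now() + total;
        for (auto* e : execs) {
            // Whatever earlier stages consumed comes out of the shared budget;
            // clamp at zero so a late stage still gets a well-formed timeout.
            const auto remaining =
                std::max(milliseconds{0},
                         duration_cast<milliseconds>(deadline - steady_clock::now()));
            e->shutdown(remaining);
        }
    }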
"mongo/transport/service_executor_fixed.h" #include "mongo/transport/service_executor_reserved.h" +#include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/service_state_machine.h" #include "mongo/util/hierarchical_acquisition.h" #include "mongo/util/net/cidr.h" @@ -90,8 +92,6 @@ private: size_t _maxNumConnections{DEFAULT_MAX_CONN}; AtomicWord<size_t> _currentConnections{0}; AtomicWord<size_t> _createdConnections{0}; - - std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool; }; /* diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp index 2a1fc737cc1..b068e487126 100644 --- a/src/mongo/transport/service_executor_fixed.cpp +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -37,6 +37,7 @@ #include "mongo/transport/session.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" +#include "mongo/util/thread_safety_context.h" namespace mongo { @@ -49,6 +50,15 @@ namespace { constexpr auto kThreadsRunning = "threadsRunning"_sd; constexpr auto kExecutorLabel = "executor"_sd; constexpr auto kExecutorName = "fixed"_sd; + +const auto getServiceExecutorFixed = + ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>(); + +const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{ + "ServiceExecutorFixed", [](ServiceContext* ctx) { + getServiceExecutorFixed(ctx) = + std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{}); + }}; } // namespace ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options) @@ -86,6 +96,12 @@ Status ServiceExecutorFixed::start() { return Status::OK(); } +ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) { + auto& ref = getServiceExecutorFixed(ctx); + invariant(ref); + return ref.get(); +} + Status ServiceExecutorFixed::shutdown(Milliseconds timeout) { auto waitForShutdown = [&]() mutable -> Status { stdx::unique_lock<Latch> lk(_mutex); diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h index 82be057e1f5..6c540576a78 100644 --- a/src/mongo/transport/service_executor_fixed.h +++ b/src/mongo/transport/service_executor_fixed.h @@ -33,6 +33,7 @@ #include <memory> #include "mongo/base/status.h" +#include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" @@ -55,6 +56,8 @@ public: explicit ServiceExecutorFixed(ThreadPool::Options options); virtual ~ServiceExecutorFixed(); + static ServiceExecutorFixed* get(ServiceContext* ctx); + Status start() override; Status shutdown(Milliseconds timeout) override; Status scheduleTask(Task task, ScheduleFlags flags) override; diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp index cae9653e60f..d81ad5be200 100644 --- a/src/mongo/transport/service_executor_reserved.cpp +++ b/src/mongo/transport/service_executor_reserved.cpp @@ -33,11 +33,13 @@ #include "mongo/transport/service_executor_reserved.h" +#include "mongo/db/server_options.h" #include "mongo/logv2/log.h" #include "mongo/stdx/thread.h" #include "mongo/transport/service_executor_gen.h" #include "mongo/transport/service_executor_utils.h" #include "mongo/util/processinfo.h" +#include "mongo/util/thread_safety_context.h" namespace mongo { namespace transport { @@ -48,6 +50,19 @@ constexpr auto kExecutorLabel = "executor"_sd; constexpr auto kExecutorName = "reserved"_sd; 
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index cae9653e60f..d81ad5be200 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -33,11 +33,13 @@
 
 #include "mongo/transport/service_executor_reserved.h"
 
+#include "mongo/db/server_options.h"
 #include "mongo/logv2/log.h"
 #include "mongo/stdx/thread.h"
 #include "mongo/transport/service_executor_gen.h"
 #include "mongo/transport/service_executor_utils.h"
 #include "mongo/util/processinfo.h"
+#include "mongo/util/thread_safety_context.h"
 
 namespace mongo {
 namespace transport {
@@ -48,6 +50,19 @@ constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "reserved"_sd;
 constexpr auto kReadyThreads = "readyThreads"_sd;
 constexpr auto kStartingThreads = "startingThreads"_sd;
+
+const auto getServiceExecutorReserved =
+    ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
+
+const auto serviceExecutorReservedRegisterer = ServiceContext::ConstructorActionRegisterer{
+    "ServiceExecutorReserved", [](ServiceContext* ctx) {
+        if (!serverGlobalParams.reservedAdminThreads) {
+            return;
+        }
+
+        getServiceExecutorReserved(ctx) = std::make_unique<transport::ServiceExecutorReserved>(
+            ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
+    }};
 }  // namespace
 
 thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
@@ -147,6 +162,12 @@ Status ServiceExecutorReserved::_startWorker() {
     });
 }
 
+ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) {
+    auto& ref = getServiceExecutorReserved(ctx);
+
+    // The ServiceExecutorReserved could be absent, so nullptr is okay.
+    return ref.get();
+}
 
 Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
     LOGV2_DEBUG(22980, 3, "Shutting down reserved executor");
@@ -173,8 +194,8 @@ Status ServiceExecutorReserved::scheduleTask(Task task, ScheduleFlags flags) {
     if (!_localWorkQueue.empty()) {
         // Execute task directly (recurse) if allowed by the caller as it produced better
         // performance in testing. Try to limit the amount of recursion so we don't blow up the
-        // stack, even though this shouldn't happen with this executor that uses blocking network
-        // I/O.
+        // stack, even though this shouldn't happen with this executor that uses blocking
+        // network I/O.
         if ((flags & ScheduleFlags::kMayRecurse) &&
             (_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
             ++_localRecursionDepth;
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index e3acf6febd9..60de4bc2993 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -32,6 +32,7 @@
 #include <deque>
 
 #include "mongo/base/status.h"
+#include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
@@ -54,6 +55,8 @@ class ServiceExecutorReserved final : public ServiceExecutor {
 public:
     explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
 
+    static ServiceExecutorReserved* get(ServiceContext* ctx);
+
     Status start() override;
     Status shutdown(Milliseconds timeout) override;
     Status scheduleTask(Task task, ScheduleFlags flags) override;
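Unlike the fixed and synchronous executors, the reserved executor is only constructed when serverGlobalParams.reservedAdminThreads is nonzero, so its get() deliberately returns nullptr instead of firing an invariant. Call sites therefore guard the lookup, as the service entry point does above; the caller-side shape, with svcCtx as a hypothetical ServiceContext pointer, is:

    if (auto exec = transport::ServiceExecutorReserved::get(svcCtx)) {
        // Present only when reservedAdminThreads was set at the time the
        // ServiceContext was constructed.
        if (auto status = exec->start(); !status.isOK()) {
            return status;
        }
    }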
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index d034e208954..c75bdbb0952 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -38,6 +38,7 @@
 #include "mongo/transport/service_executor_gen.h"
 #include "mongo/transport/service_executor_utils.h"
 #include "mongo/util/processinfo.h"
+#include "mongo/util/thread_safety_context.h"
 
 namespace mongo {
 namespace transport {
@@ -45,6 +46,14 @@ namespace {
 constexpr auto kThreadsRunning = "threadsRunning"_sd;
 constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "passthrough"_sd;
+
+const auto getServiceExecutorSynchronous =
+    ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
+
+const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{
+    "ServiceExecutorSynchronous", [](ServiceContext* ctx) {
+        getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx);
+    }};
 }  // namespace
 
 thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {};
@@ -78,6 +87,12 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
                   "passthrough executor couldn't shutdown all worker threads within time limit.");
 }
 
+ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
+    auto& ref = getServiceExecutorSynchronous(ctx);
+    invariant(ref);
+    return ref.get();
+}
+
 Status ServiceExecutorSynchronous::scheduleTask(Task task, ScheduleFlags flags) {
     if (!_stillRunning.load()) {
         return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 940382b53e8..840dc702d4c 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -32,6 +32,7 @@
 #include <deque>
 
 #include "mongo/base/status.h"
+#include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
@@ -49,6 +50,8 @@ class ServiceExecutorSynchronous final : public ServiceExecutor {
 public:
     explicit ServiceExecutorSynchronous(ServiceContext* ctx);
 
+    static ServiceExecutorSynchronous* get(ServiceContext* ctx);
+
     Status start() override;
     Status shutdown(Milliseconds timeout) override;
     Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 373a362d0df..3bfd208d1d1 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -45,6 +45,7 @@
 #include "mongo/rpc/op_msg.h"
 #include "mongo/transport/message_compressor_manager.h"
 #include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/transport/session.h"
 #include "mongo/transport/transport_layer.h"
 #include "mongo/util/assert_util.h"
@@ -300,11 +301,11 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
       _sep{svcContext->getServiceEntryPoint()},
       _transportMode(transportMode),
       _serviceContext(svcContext),
-      _serviceExecutor(_serviceContext->getServiceExecutor()),
       _sessionHandle(session),
       _threadName{str::stream() << "conn" << _session()->id()},
       _dbClient{svcContext->makeClient(_threadName, std::move(session))},
-      _dbClientPtr{_dbClient.get()} {}
+      _dbClientPtr{_dbClient.get()},
+      _serviceExecutor(transport::ServiceExecutorSynchronous::get(_serviceContext)) {}
 
 const transport::SessionHandle& ServiceStateMachine::_session() const {
     return _sessionHandle;
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 0572c4f6ac8..22e0b8aa752 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -230,12 +230,12 @@ private:
     transport::Mode _transportMode;
 
     ServiceContext* const _serviceContext;
-    transport::ServiceExecutor* _serviceExecutor;
     transport::SessionHandle _sessionHandle;
     const std::string _threadName;
     ServiceContext::UniqueClient _dbClient;
     const Client* _dbClientPtr;
+    transport::ServiceExecutor* _serviceExecutor;
 
     std::function<void()> _cleanupHook;
     bool _inExhaust = false;
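The ServiceStateMachine change moves `_serviceExecutor` below `_dbClientPtr` so the declaration order matches the constructor's new initializer list. C++ initializes non-static members in declaration order regardless of the order they appear in the initializer list, so keeping the two aligned avoids surprises (and -Wreorder warnings). A toy illustration:

    struct Widget {
        int a;
        int b;
        // Members are initialized in declaration order (a, then b), even
        // though the initializer list names b first; compilers flag the
        // mismatch with -Wreorder.
        Widget() : b(2), a(1) {}
    };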
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index b291671eb9a..272a9ac391d 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -138,8 +138,6 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
     transport::TransportLayerASIO::Options opts(config);
     opts.transportMode = transport::Mode::kSynchronous;
 
-    ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));
-
     std::vector<std::unique_ptr<TransportLayer>> retVector;
     retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
     return std::make_unique<TransportLayerManager>(std::move(retVector));