From 827a25eb01bc5ddf766b3a543ef0ba5112953e1b Mon Sep 17 00:00:00 2001
From: Ben Caimano
Date: Fri, 11 Sep 2020 20:09:59 +0000
Subject: SERVER-50867 Roll back ServiceStateMachine changes temporarily

This reverts these commits:

b039b24746e1d1fb10a32e1ca4831423c01d4cd7: SERVER-48980
97e16187ff3065d242a61a52e7b6edd4d439fb30: SERVER-49072
0607a6c291bf4cf4580a4444d826ed3c3ac3df47: SERVER-49104
---
 src/mongo/db/SConscript                             |   1 -
 src/mongo/db/commands/server_status_servers.cpp     |   4 +-
 src/mongo/db/curop.cpp                              |   7 -
 src/mongo/db/mongod_main.cpp                        |  29 ++-
 src/mongo/db/service_context.cpp                    |   8 +
 src/mongo/db/service_context.h                      |  19 ++
 src/mongo/s/mongos_main.cpp                         |  26 +++
 src/mongo/tools/bridge.cpp                          |  15 +-
 src/mongo/transport/SConscript                      |   4 +-
 src/mongo/transport/service_entry_point.h           |   9 -
 src/mongo/transport/service_entry_point_impl.cpp    | 163 +++++--------
 src/mongo/transport/service_entry_point_impl.h      |  12 +-
 src/mongo/transport/service_executor.cpp            | 135 -----------
 src/mongo/transport/service_executor.h              |  77 +------
 src/mongo/transport/service_executor_fixed.cpp      |  16 --
 src/mongo/transport/service_executor_fixed.h        |   3 -
 src/mongo/transport/service_executor_reserved.cpp   |  25 +-
 src/mongo/transport/service_executor_reserved.h     |   3 -
 .../transport/service_executor_synchronous.cpp      |  15 --
 src/mongo/transport/service_executor_synchronous.h  |   3 -
 src/mongo/transport/service_state_machine.cpp       | 251 ++++++++++++---------
 src/mongo/transport/service_state_machine.h         |  74 ++++--
 src/mongo/transport/transport_layer_manager.cpp     |   2 +
 23 files changed, 349 insertions(+), 552 deletions(-)
 delete mode 100644 src/mongo/transport/service_executor.cpp

diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index d39b484ce99..701e82a657f 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -152,7 +152,6 @@ env.Library(
         '$BUILD_DIR/mongo/db/service_context',
         '$BUILD_DIR/mongo/db/stats/timer_stats',
         '$BUILD_DIR/mongo/rpc/client_metadata',
-        '$BUILD_DIR/mongo/transport/service_executor',
         '$BUILD_DIR/mongo/util/diagnostic_info' if get_option('use-diagnostic-latches') == 'on' else [],
         '$BUILD_DIR/mongo/util/fail_point',
         '$BUILD_DIR/mongo/util/net/network',
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index cc39b6da7d8..85a57202370 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -33,7 +33,6 @@
 #include "mongo/db/commands/server_status.h"
 #include "mongo/transport/message_compressor_registry.h"
 #include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/util/net/hostname_canonicalization.h"
 #include "mongo/util/net/socket_utils.h"
 #include "mongo/util/net/ssl_manager.h"
@@ -78,13 +77,12 @@ public:
         return true;
     }

-    // TODO: need to track connections in server stats (see SERVER-49073)
     BSONObj generateSection(OperationContext* opCtx,
                             const BSONElement& configElement) const override {
         BSONObjBuilder b;
         networkCounter.append(b);
         appendMessageCompressionStats(&b);
-        auto executor = transport::ServiceExecutorSynchronous::get(opCtx->getServiceContext());
+        auto executor = opCtx->getServiceContext()->getServiceExecutor();
         if (executor) {
             BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
             executor->appendStats(&section);
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index 06c7218adbb..26d598c6b08 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -54,7 +54,6 @@
 #include "mongo/rpc/metadata/client_metadata.h"
"mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/rpc/metadata/impersonated_user_metadata.h" -#include "mongo/transport/service_executor.h" #include "mongo/util/hex.h" #include "mongo/util/log_with_sampling.h" #include "mongo/util/net/socket_utils.h" @@ -305,12 +304,6 @@ void CurOp::reportCurrentOpForClient(OperationContext* opCtx, serializeAuthenticatedUsers("effectiveUsers"_sd); } - if (const auto seCtx = transport::ServiceExecutorContext::get(client)) { - bool isDedicated = (seCtx->getThreadingModel() == - transport::ServiceExecutorContext::ThreadingModel::kDedicated); - infoBuilder->append("threaded"_sd, isDedicated); - } - if (clientOpCtx) { infoBuilder->append("opid", static_cast(clientOpCtx->getOpID())); diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 8e69eea713a..8d107a27e54 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -723,7 +723,16 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { // operation context anymore startupOpCtx.reset(); - auto start = serviceContext->getServiceEntryPoint()->start(); + auto start = serviceContext->getServiceExecutor()->start(); + if (!start.isOK()) { + LOGV2_ERROR(20570, + "Error starting service executor: {error}", + "Error starting service executor", + "error"_attr = start); + return EXIT_NET_ERROR; + } + + start = serviceContext->getServiceEntryPoint()->start(); if (!start.isOK()) { LOGV2_ERROR(20571, "Error starting service entry point: {error}", @@ -1260,6 +1269,11 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { CatalogCacheLoader::get(serviceContext).shutDown(); } +#if __has_feature(address_sanitizer) + // When running under address sanitizer, we get false positive leaks due to disorder around + // the lifecycle of a connection and request. When we are running under ASAN, we try a lot + // harder to dry up the server from active connections before going on to really shut down. + // Shutdown the Service Entry Point and its sessions and give it a grace period to complete. 
     if (auto sep = serviceContext->getServiceEntryPoint()) {
         LOGV2_OPTIONS(4784923, {LogComponent::kCommand}, "Shutting down the ServiceEntryPoint");
@@ -1270,6 +1284,19 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
         }
     }

+    // Shutdown and wait for the service executor to exit
+    if (auto svcExec = serviceContext->getServiceExecutor()) {
+        LOGV2_OPTIONS(4784924, {LogComponent::kExecutor}, "Shutting down the service executor");
+        Status status = svcExec->shutdown(Seconds(10));
+        if (!status.isOK()) {
+            LOGV2_OPTIONS(20564,
+                          {LogComponent::kNetwork},
+                          "Service executor did not shutdown within the time limit",
+                          "error"_attr = status);
+        }
+    }
+#endif
+
     LOGV2(4784925, "Shutting down free monitoring");
     stopFreeMonitoring();

diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp
index 1b8628ff1dd..4e01b46c3b4 100644
--- a/src/mongo/db/service_context.cpp
+++ b/src/mongo/db/service_context.cpp
@@ -193,6 +193,10 @@ ServiceEntryPoint* ServiceContext::getServiceEntryPoint() const {
     return _serviceEntryPoint.get();
 }

+transport::ServiceExecutor* ServiceContext::getServiceExecutor() const {
+    return _serviceExecutor.get();
+}
+
 void ServiceContext::setStorageEngine(std::unique_ptr<StorageEngine> engine) {
     invariant(engine);
     invariant(!_storageEngine);
@@ -223,6 +227,10 @@ void ServiceContext::setTransportLayer(std::unique_ptr<transport::TransportLayer> tl) {
+void ServiceContext::setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec) {
+    _serviceExecutor = std::move(exec);
+}
+
 void ServiceContext::ClientDeleter::operator()(Client* client) const {
     ServiceContext* const service = client->getServiceContext();
     {
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 08e93df9cfb..bad89b6cc58 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -42,6 +42,7 @@
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/unordered_set.h"
+#include "mongo/transport/service_executor.h"
 #include "mongo/transport/session.h"
 #include "mongo/util/clock_source.h"
 #include "mongo/util/concurrency/with_lock.h"
@@ -486,6 +487,14 @@ public:
      */
     ServiceEntryPoint* getServiceEntryPoint() const;

+    /**
+     * Get the service executor for the service context.
+     *
+     * See ServiceStateMachine for how this is used. Some configurations may not have a service
+     * executor registered and this will return a nullptr.
+     */
+    transport::ServiceExecutor* getServiceExecutor() const;
+
     /**
      * Waits for the ServiceContext to be fully initialized and for all TransportLayers to have been
      * added/started.
@@ -570,6 +579,11 @@ public:
     */
    void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl);

+    /**
+     * Binds the service executor to the service context
+     */
+    void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec);
+
    /**
     * Creates a delayed execution baton with basic functionality
     */
@@ -632,6 +646,11 @@ private:
     */
    std::unique_ptr<ServiceEntryPoint> _serviceEntryPoint;

+    /**
+     * The ServiceExecutor
+     */
+    std::unique_ptr<transport::ServiceExecutor> _serviceExecutor;
+
    /**
     * The storage engine, if any.
     */
diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp
index a761c881b6f..e55cdb0d7d7 100644
--- a/src/mongo/s/mongos_main.cpp
+++ b/src/mongo/s/mongos_main.cpp
@@ -359,6 +359,11 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
         CatalogCacheLoader::get(serviceContext).shutDown();
     }

+#if __has_feature(address_sanitizer)
+    // When running under address sanitizer, we get false positive leaks due to disorder around
+    // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
+    // harder to dry up the server from active connections before going on to really shut down.
+
+    // Shutdown the Service Entry Point and its sessions and give it a grace period to complete.
     if (auto sep = serviceContext->getServiceEntryPoint()) {
         if (!sep->shutdown(Seconds(10))) {
@@ -368,6 +373,18 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
         }
     }

+    // Shutdown and wait for the service executor to exit
+    if (auto svcExec = serviceContext->getServiceExecutor()) {
+        Status status = svcExec->shutdown(Seconds(5));
+        if (!status.isOK()) {
+            LOGV2_OPTIONS(22845,
+                          {LogComponent::kNetwork},
+                          "Service executor did not shutdown within the time limit",
+                          "error"_attr = status);
+        }
+    }
+#endif
+
     // Shutdown Full-Time Data Capture
     stopMongoSFTDC();
 }
@@ -778,6 +795,15 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
         std::make_unique<SessionsCollectionSharded>(),
         RouterSessionCatalog::reapSessionsOlderThan));

+    status = serviceContext->getServiceExecutor()->start();
+    if (!status.isOK()) {
+        LOGV2_ERROR(22859,
+                    "Error starting service executor: {error}",
+                    "Error starting service executor",
+                    "error"_attr = redact(status));
+        return EXIT_NET_ERROR;
+    }
+
     status = serviceContext->getServiceEntryPoint()->start();
     if (!status.isOK()) {
         LOGV2_ERROR(22860,
diff --git a/src/mongo/tools/bridge.cpp b/src/mongo/tools/bridge.cpp
index 1c2f7d46bc9..986114093ed 100644
--- a/src/mongo/tools/bridge.cpp
+++ b/src/mongo/tools/bridge.cpp
@@ -485,9 +485,10 @@ int bridgeMain(int argc, char** argv) {
     setGlobalServiceContext(ServiceContext::make());
     auto serviceContext = getGlobalServiceContext();
     serviceContext->setServiceEntryPoint(std::make_unique<ServiceEntryPointBridge>(serviceContext));
-    if (auto status = serviceContext->getServiceEntryPoint()->start(); !status.isOK()) {
-        LOGV2(4907203, "Error starting service entry point", "error"_attr = status);
-    }
+    serviceContext->setServiceExecutor(
+        std::make_unique<transport::ServiceExecutorSynchronous>(serviceContext));
+
+    fassert(50766, serviceContext->getServiceExecutor()->start());

     transport::TransportLayerASIO::Options opts;
     opts.ipList.emplace_back("0.0.0.0");
@@ -496,13 +497,13 @@ int bridgeMain(int argc, char** argv) {
     serviceContext->setTransportLayer(std::make_unique<transport::TransportLayerASIO>(
         opts, serviceContext->getServiceEntryPoint()));
     auto tl = serviceContext->getTransportLayer();
-    if (auto status = tl->setup(); !status.isOK()) {
-        LOGV2(22922, "Error setting up transport layer", "error"_attr = status);
+    if (!tl->setup().isOK()) {
+        LOGV2(22922, "Error setting up transport layer");
         return EXIT_NET_ERROR;
     }

-    if (auto status = tl->start(); !status.isOK()) {
-        LOGV2(22923, "Error starting transport layer", "error"_attr = status);
+    if (!tl->start().isOK()) {
+        LOGV2(22923, "Error starting transport layer");
         return EXIT_NET_ERROR;
     }

diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 7932abc662d..5326afe5db2 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -86,7 +86,6 @@ env.Library(
 tlEnv.Library(
     target='service_executor',
     source=[
-        'service_executor.cpp',
         'service_executor_fixed.cpp',
         'service_executor_reserved.cpp',
         'service_executor_synchronous.cpp',
@@ -181,8 +180,7 @@ tlEnv.CppUnitTest(
         'transport_layer_asio_test.cpp',
         'service_executor_test.cpp',
         'max_conns_override_test.cpp',
-        # TODO: service_state_machine test to be re-written in SERVER-50141
-        # 'service_state_machine_test.cpp',
+        'service_state_machine_test.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/base',
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index fb1ad895d78..aa970f1ef67 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,8 +29,6 @@

 #pragma once

-#include <limits>
-
 #include "mongo/bson/bsonobjbuilder.h"
 #include "mongo/db/dbmessage.h"
 #include "mongo/transport/session.h"
@@ -82,13 +80,6 @@ public:
     */
    virtual size_t numOpenSessions() const = 0;

-    /**
-     * Returns the maximum number of sessions that can be open.
-     */
-    virtual size_t maxOpenSessions() const {
-        return std::numeric_limits<size_t>::max();
-    }
-
     /**
      * Processes a request and fills out a DbResponse.
      */
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 0090f30a686..1824ccf1c94 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -33,7 +33,6 @@

 #include "mongo/transport/service_entry_point_impl.h"

-#include <fmt/format.h>

 #include "mongo/db/auth/restriction_environment.h"
@@ -49,23 +48,12 @@
 #include <sys/resource.h>
 #endif

-#if !defined(__has_feature)
-#define __has_feature(x) 0
-#endif
-
 namespace mongo {

-using namespace fmt::literals;
-
 bool shouldOverrideMaxConns(const transport::SessionHandle& session,
                             const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
-    if (exemptions.empty()) {
-        return false;
-    }
-
     const auto& remoteAddr = session->remoteAddr();
     const auto& localAddr = session->localAddr();
-
     boost::optional<CIDR> remoteCIDR;

     if (remoteAddr.isValid() && remoteAddr.isIP()) {
@@ -93,7 +81,8 @@ bool shouldOverrideMaxConns(const transport::SessionHandle& session,
     return false;
 }

-size_t getSupportedMax() {
+ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
+    const auto supportedMax = [] {
 #ifdef _WIN32
         return serverGlobalParams.maxConns;
@@ -124,33 +113,21 @@
                       "limit"_attr = supportedMax);
     }

-    return supportedMax;
-}
-
-ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx)
-    : _svcCtx(svcCtx), _maxNumConnections(getSupportedMax()) {}
+    _maxNumConnections = supportedMax;

-Status ServiceEntryPointImpl::start() {
-    if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
-        !status.isOK()) {
-        return status;
+    if (serverGlobalParams.reservedAdminThreads) {
+        _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
+            _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
     }
+}

-    if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
-        if (auto status = exec->start(); !status.isOK()) {
-            return status;
-        }
-    }
-
-    // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
-    // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
-    //     return status;
-    // }
-
-    return Status::OK();
+Status ServiceEntryPointImpl::start() {
+    if (_adminInternalPool)
+        return _adminInternalPool->start();
+    else
+        return Status::OK();
 }

-// TODO: explicitly start on the fixed executor
 void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     // Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
     const auto& remoteAddr = session->remoteAddr();
@@ -159,48 +136,43 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     auto restrictionEnvironment = std::make_unique<RestrictionEnvironment>(remoteAddr, localAddr);
     RestrictionEnvironment::set(session, std::move(restrictionEnvironment));

-    bool canOverrideMaxConns = shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
-
-    auto clientName = "conn{}"_format(session->id());
-    auto client = _svcCtx->makeClient(clientName, session);
-
-    {
-        stdx::lock_guard lk(*client);
-        auto seCtx =
-            transport::ServiceExecutorContext{}
-                .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated)
-                .setCanUseReserved(canOverrideMaxConns);
-
-        transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
-    }
-
-    auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
+    SSMListIterator ssmIt;

     const bool quiet = serverGlobalParams.quiet.load();
     size_t connectionCount;
-    auto ssmIt = [&]() -> boost::optional<SSMListIterator> {
-        stdx::lock_guard lk(_sessionsMutex);
-        connectionCount = _currentConnections.load();
-        if (connectionCount > _maxNumConnections && !canOverrideMaxConns) {
-            return boost::none;
+    auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
+
+    auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
+    auto usingMaxConnOverride = false;
+    {
+        stdx::lock_guard lk(_sessionsMutex);
+        connectionCount = _sessions.size() + 1;
+        if (connectionCount > _maxNumConnections) {
+            usingMaxConnOverride =
+                shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
         }

-        auto it = _sessions.emplace(_sessions.begin(), ssm);
-        connectionCount = _sessions.size();
-        _currentConnections.store(connectionCount);
-        _createdConnections.addAndFetch(1);
-        return it;
-    }();
+        if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
+            ssmIt = _sessions.emplace(_sessions.begin(), ssm);
+            _currentConnections.store(connectionCount);
+            _createdConnections.addAndFetch(1);
+        }
+    }

-    if (!ssmIt) {
+    // Checking if we successfully added a connection above. Separated from the lock so we don't
+    // log while holding it.
+    if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
         if (!quiet) {
             LOGV2(22942,
                   "Connection refused because there are too many open connections",
                   "connectionCount"_attr = connectionCount);
         }
         return;
-    } else if (!quiet) {
+    } else if (usingMaxConnOverride && _adminInternalPool) {
+        ssm->setServiceExecutor(_adminInternalPool.get());
+    }
+
+    if (!quiet) {
         LOGV2(22943,
               "Connection accepted",
               "remote"_attr = session->remote(),
@@ -213,7 +185,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     auto remote = session->remote();
     {
         stdx::lock_guard lk(_sessionsMutex);
-        _sessions.erase(*ssmIt);
+        _sessions.erase(ssmIt);
         connectionCount = _sessions.size();
         _currentConnections.store(connectionCount);
     }
@@ -228,7 +200,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
         }
     });

-    ssm->start();
+    auto ownership = ServiceStateMachine::Ownership::kOwned;
+    if (transportMode == transport::Mode::kSynchronous) {
+        ownership = ServiceStateMachine::Ownership::kStatic;
+    }
+    ssm->start(ownership);
 }

 void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
@@ -243,13 +219,8 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
 }

 bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
-#if __has_feature(address_sanitizer)
-    // When running under address sanitizer, we get false positive leaks due to disorder around
-    // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
-    // harder to dry up the server from active connections before going on to really shut down.
     using logv2::LogComponent;
-    auto start = _svcCtx->getPreciseClockSource()->now();
     stdx::unique_lock lk(_sessionsMutex);

     // Request that all sessions end, while holding the _sessionsMutex, loop over all the current
@@ -286,37 +257,7 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
             "shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
             "workers"_attr = numOpenSessions());
     }
-
-    lk.unlock();
-
-    // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
-    // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
-    //     !status.isOK()) {
-    //     LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
-    // }
-
-    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
-        if (auto status = exec->shutdown(timeout); !status.isOK()) {
-            LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
-        }
-    }
-
-    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    if (auto status =
-            transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
-        !status.isOK()) {
-        LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
-    }
-
     return result;
-#else
-    return true;
-#endif
 }

 void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
@@ -326,17 +267,17 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
     bob->append("current", static_cast<int>(sessionCount));
     bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
     bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
+    if (auto sc = getGlobalServiceContext()) {
+        bob->append("active", static_cast<int>(sc->getActiveClientOperations()));
+        bob->append("exhaustIsMaster",
+                    static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster()));
+        bob->append("awaitingTopologyChanges",
+                    static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges()));
+    }

-    invariant(_svcCtx);
-    bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
-    bob->append("exhaustIsMaster",
-                static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
-    bob->append("awaitingTopologyChanges",
-                static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges()));
-
-    if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+    if (_adminInternalPool) {
         BSONObjBuilder section(bob->subobjStart("adminConnections"));
-        adminExec->appendStats(&section);
+        _adminInternalPool->appendStats(&section);
     }
 }

diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 1693469fcda..38c1319657a 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -36,9 +36,7 @@
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/variant.h"
 #include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_fixed.h"
 #include "mongo/transport/service_executor_reserved.h"
-#include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/transport/service_state_machine.h"
 #include "mongo/util/hierarchical_acquisition.h"
 #include "mongo/util/net/cidr.h"
"mongo/util/net/cidr.h" @@ -77,12 +75,8 @@ public: return _currentConnections.load(); } - size_t maxOpenSessions() const final { - return _maxNumConnections; - } - private: - using SSMList = std::list>; + using SSMList = std::list>; using SSMListIterator = SSMList::iterator; ServiceContext* const _svcCtx; @@ -93,9 +87,11 @@ private: stdx::condition_variable _shutdownCondition; SSMList _sessions; - const size_t _maxNumConnections{DEFAULT_MAX_CONN}; + size_t _maxNumConnections{DEFAULT_MAX_CONN}; AtomicWord _currentConnections{0}; AtomicWord _createdConnections{0}; + + std::unique_ptr _adminInternalPool; }; /* diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp deleted file mode 100644 index 37c3d03e1b1..00000000000 --- a/src/mongo/transport/service_executor.cpp +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_executor.h" - -#include - -#include "mongo/logv2/log.h" -#include "mongo/transport/service_entry_point.h" -#include "mongo/transport/service_executor_fixed.h" -#include "mongo/transport/service_executor_reserved.h" -#include "mongo/transport/service_executor_synchronous.h" - -namespace mongo { -namespace transport { -namespace { -static constexpr auto kDiagnosticLogLevel = 4; - -auto getServiceExecutorContext = - Client::declareDecoration>(); -} // namespace - -StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) { - switch (threadingModel) { - case ServiceExecutorContext::ThreadingModel::kDedicated: - return "Dedicated"_sd; - case ServiceExecutorContext::ThreadingModel::kBorrowed: - return "Borrowed"_sd; - default: - MONGO_UNREACHABLE; - } -} - -ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept { - auto& serviceExecutorContext = getServiceExecutorContext(client); - - if (!serviceExecutorContext) { - // Service worker Clients will never have a ServiceExecutorContext. 
-        return nullptr;
-    }
-
-    return &serviceExecutorContext.get();
-}
-
-void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) noexcept {
-    auto& serviceExecutorContext = getServiceExecutorContext(client);
-    invariant(!serviceExecutorContext);
-
-    seCtx._client = client;
-    seCtx._sep = client->getServiceContext()->getServiceEntryPoint();
-
-    LOGV2_DEBUG(4898000,
-                kDiagnosticLogLevel,
-                "Setting initial ServiceExecutor context for client",
-                "client"_attr = client->desc(),
-                "threadingModel"_attr = seCtx._threadingModel,
-                "canUseReserved"_attr = seCtx._canUseReserved);
-    serviceExecutorContext = std::move(seCtx);
-}
-
-ServiceExecutorContext& ServiceExecutorContext::setThreadingModel(
-    ThreadingModel threadingModel) noexcept {
-    _threadingModel = threadingModel;
-    return *this;
-}
-
-ServiceExecutorContext& ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
-    _canUseReserved = canUseReserved;
-    return *this;
-}
-
-ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
-    invariant(_client);
-
-    switch (_threadingModel) {
-        case ThreadingModel::kBorrowed:
-            return ServiceExecutorFixed::get(_client->getServiceContext());
-        case ThreadingModel::kDedicated: {
-            // Continue on.
-        } break;
-        default:
-            MONGO_UNREACHABLE;
-    }
-
-    auto shouldUseReserved = [&] {
-        // This is at best a naive solution. There could be a world where numOpenSessions() changes
-        // very quickly. We are not taking locks on the ServiceEntryPoint, so we may choose to
-        // schedule onto the ServiceExecutorReserved when it is no longer necessary. The upside is
-        // that we will automatically shift to the ServiceExecutorSynchronous after the first
-        // command loop.
-        return _sep->numOpenSessions() > _sep->maxOpenSessions();
-    };
-
-    if (_canUseReserved && shouldUseReserved()) {
-        if (auto exec = transport::ServiceExecutorReserved::get(_client->getServiceContext())) {
-            // We are allowed to use the reserved executor, we should use it, and it exists.
-            return exec;
-        }
-    }
-
-    return transport::ServiceExecutorSynchronous::get(_client->getServiceContext());
-}
-
-}  // namespace transport
-}  // namespace mongo
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 38116908272..b702198e6b5 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -33,19 +33,20 @@

 #include "mongo/base/status.h"
 #include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/client.h"
-#include "mongo/db/service_context.h"
 #include "mongo/platform/bitwise_enum_operators.h"
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/session.h"
 #include "mongo/transport/transport_mode.h"
 #include "mongo/util/duration.h"
 #include "mongo/util/functional.h"
 #include "mongo/util/out_of_line_executor.h"

 namespace mongo {
+// This needs to be forward declared here because including service_context.h would create a
+// circular dependency.
+class ServiceContext;
+
 namespace transport {

+class Session;
+
 /*
  * This is the interface for all ServiceExecutors.
  */
@@ -122,74 +123,6 @@ public:
     virtual void appendStats(BSONObjBuilder* bob) const = 0;
 };

-/**
- * ServiceExecutorContext determines which ServiceExecutor is used for each Client.
- */
-class ServiceExecutorContext {
-public:
-    enum ThreadingModel {
-        kBorrowed,
-        kDedicated,
-    };
-
-    /**
-     * Get a pointer to the ServiceExecutorContext for a given client.
-     *
-     * This function is valid to invoke either on the Client thread or with the Client lock.
-     */
-    static ServiceExecutorContext* get(Client* client) noexcept;
-
-    /**
-     * Set the ServiceExecutorContext for a given client.
-     *
-     * This function may only be invoked once and only while under the Client lock.
-     */
-    static void set(Client* client, ServiceExecutorContext seCtx) noexcept;
-
-    ServiceExecutorContext() = default;
-
-    /**
-     * Set the ThreadingModel for the associated Client's service execution.
-     *
-     * This function is only valid to invoke with the Client lock or before the Client is set.
-     */
-    ServiceExecutorContext& setThreadingModel(ThreadingModel threadingModel) noexcept;
-
-    /**
-     * Set if reserved resources are available for the associated Client's service execution.
-     *
-     * This function is only valid to invoke with the Client lock or before the Client is set.
-     */
-    ServiceExecutorContext& setCanUseReserved(bool canUseReserved) noexcept;
-
-    /**
-     * Get the ThreadingModel for the associated Client.
-     *
-     * This function is valid to invoke either on the Client thread or with the Client lock.
-     */
-    auto getThreadingModel() const noexcept {
-        return _threadingModel;
-    }
-
-    /**
-     * Get an appropriate ServiceExecutor given the current parameters.
-     *
-     * This function is only valid to invoke from the associated Client thread. This function does
-     * not require the Client lock since all writes must also happen from that thread.
-     */
-    ServiceExecutor* getServiceExecutor() const noexcept;
-
-private:
-    friend StringData toString(ThreadingModel threadingModel);
-
-    Client* _client = nullptr;
-    ServiceEntryPoint* _sep = nullptr;
-
-    ThreadingModel _threadingModel = ThreadingModel::kDedicated;
-    bool _canUseReserved = false;
-};
-
-
 }  // namespace transport

 ENABLE_BITMASK_OPERATORS(transport::ServiceExecutor::ScheduleFlags)
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index b068e487126..2a1fc737cc1 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -37,7 +37,6 @@
 #include "mongo/transport/session.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/fail_point.h"
-#include "mongo/util/thread_safety_context.h"

 namespace mongo {

@@ -50,15 +49,6 @@ namespace {
 constexpr auto kThreadsRunning = "threadsRunning"_sd;
 constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "fixed"_sd;
-
-const auto getServiceExecutorFixed =
-    ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
-
-const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{
-    "ServiceExecutorFixed", [](ServiceContext* ctx) {
-        getServiceExecutorFixed(ctx) =
-            std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{});
-    }};
 }  // namespace

 ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
@@ -96,12 +86,6 @@ Status ServiceExecutorFixed::start() {
     return Status::OK();
 }

-ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
-    auto& ref = getServiceExecutorFixed(ctx);
-    invariant(ref);
-    return ref.get();
-}
-
 Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
     auto waitForShutdown = [&]() mutable -> Status {
         stdx::unique_lock lk(_mutex);
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index 6c540576a78..82be057e1f5 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -33,7 +33,6 @@
 #include

 #include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" @@ -56,8 +55,6 @@ public: explicit ServiceExecutorFixed(ThreadPool::Options options); virtual ~ServiceExecutorFixed(); - static ServiceExecutorFixed* get(ServiceContext* ctx); - Status start() override; Status shutdown(Milliseconds timeout) override; Status scheduleTask(Task task, ScheduleFlags flags) override; diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp index d81ad5be200..cae9653e60f 100644 --- a/src/mongo/transport/service_executor_reserved.cpp +++ b/src/mongo/transport/service_executor_reserved.cpp @@ -33,13 +33,11 @@ #include "mongo/transport/service_executor_reserved.h" -#include "mongo/db/server_options.h" #include "mongo/logv2/log.h" #include "mongo/stdx/thread.h" #include "mongo/transport/service_executor_gen.h" #include "mongo/transport/service_executor_utils.h" #include "mongo/util/processinfo.h" -#include "mongo/util/thread_safety_context.h" namespace mongo { namespace transport { @@ -50,19 +48,6 @@ constexpr auto kExecutorLabel = "executor"_sd; constexpr auto kExecutorName = "reserved"_sd; constexpr auto kReadyThreads = "readyThreads"_sd; constexpr auto kStartingThreads = "startingThreads"_sd; - -const auto getServiceExecutorReserved = - ServiceContext::declareDecoration>(); - -const auto serviceExecutorReservedRegisterer = ServiceContext::ConstructorActionRegisterer{ - "ServiceExecutorReserved", [](ServiceContext* ctx) { - if (!serverGlobalParams.reservedAdminThreads) { - return; - } - - getServiceExecutorReserved(ctx) = std::make_unique( - ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads); - }}; } // namespace thread_local std::deque ServiceExecutorReserved::_localWorkQueue = {}; @@ -162,12 +147,6 @@ Status ServiceExecutorReserved::_startWorker() { }); } -ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) { - auto& ref = getServiceExecutorReserved(ctx); - - // The ServiceExecutorReserved could be absent, so nullptr is okay. - return ref.get(); -} Status ServiceExecutorReserved::shutdown(Milliseconds timeout) { LOGV2_DEBUG(22980, 3, "Shutting down reserved executor"); @@ -194,8 +173,8 @@ Status ServiceExecutorReserved::scheduleTask(Task task, ScheduleFlags flags) { if (!_localWorkQueue.empty()) { // Execute task directly (recurse) if allowed by the caller as it produced better // performance in testing. Try to limit the amount of recursion so we don't blow up the - // stack, even though this shouldn't happen with this executor that uses blocking - // network I/O. + // stack, even though this shouldn't happen with this executor that uses blocking network + // I/O. 
         if ((flags & ScheduleFlags::kMayRecurse) &&
             (_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
             ++_localRecursionDepth;
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index 60de4bc2993..e3acf6febd9 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -32,7 +32,6 @@
 #include <deque>

 #include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
@@ -55,8 +54,6 @@ class ServiceExecutorReserved final : public ServiceExecutor {
 public:
     explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);

-    static ServiceExecutorReserved* get(ServiceContext* ctx);
-
     Status start() override;
     Status shutdown(Milliseconds timeout) override;
     Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index c75bdbb0952..d034e208954 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -38,7 +38,6 @@
 #include "mongo/transport/service_executor_gen.h"
 #include "mongo/transport/service_executor_utils.h"
 #include "mongo/util/processinfo.h"
-#include "mongo/util/thread_safety_context.h"

 namespace mongo {
 namespace transport {
@@ -46,14 +45,6 @@ namespace {
 constexpr auto kThreadsRunning = "threadsRunning"_sd;
 constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "passthrough"_sd;
-
-const auto getServiceExecutorSynchronous =
-    ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
-
-const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{
-    "ServiceExecutorSynchronous", [](ServiceContext* ctx) {
-        getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx);
-    }};
 }  // namespace

 thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {};
@@ -87,12 +78,6 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
         "passthrough executor couldn't shutdown all worker threads within time limit.");
 }

-ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
-    auto& ref = getServiceExecutorSynchronous(ctx);
-    invariant(ref);
-    return ref.get();
-}
-
 Status ServiceExecutorSynchronous::scheduleTask(Task task, ScheduleFlags flags) {
     if (!_stillRunning.load()) {
         return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 840dc702d4c..940382b53e8 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -32,7 +32,6 @@
 #include <deque>

 #include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
@@ -50,8 +49,6 @@ class ServiceExecutorSynchronous final : public ServiceExecutor {
 public:
     explicit ServiceExecutorSynchronous(ServiceContext* ctx);

-    static ServiceExecutorSynchronous* get(ServiceContext* ctx);
-
     Status start() override;
     Status shutdown(Milliseconds timeout) override;
     Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 5124542291b..373a362d0df 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -36,6 +36,7 @@
 #include

 #include "mongo/config.h"
+#include "mongo/db/client.h"
 #include "mongo/db/dbmessage.h"
 #include "mongo/db/stats/counters.h"
 #include "mongo/db/traffic_recorder.h"
@@ -44,8 +45,6 @@
 #include "mongo/rpc/op_msg.h"
 #include "mongo/transport/message_compressor_manager.h"
 #include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_fixed.h"
-#include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/transport/session.h"
 #include "mongo/transport/transport_layer.h"
 #include "mongo/util/assert_util.h"
@@ -59,7 +58,6 @@
 #include "mongo/util/quick_exit.h"

 namespace mongo {
-namespace transport {
 namespace {
 MONGO_FAIL_POINT_DEFINE(doNotSetMoreToCome);
 /**
@@ -161,14 +159,14 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {

     return exhaustMessage;
 }
+
 }  // namespace

 using transport::ServiceExecutor;
 using transport::TransportLayer;

 /*
- * This class wraps up the logic for swapping/unswapping the Client when transitioning
- * between states.
+ * This class wraps up the logic for swapping/unswapping the Client during runNext().
  *
  * In debug builds this also ensures that only one thread is working on the SSM at once.
  */
@@ -194,10 +192,9 @@ public:

         // Set up the thread name
         auto oldThreadName = getThreadName();
-        const auto& threadName = _ssm->_dbClient->desc();
-        if (oldThreadName != threadName) {
-            _oldThreadName = oldThreadName.toString();
-            setThreadName(threadName);
+        if (oldThreadName != _ssm->_threadName) {
+            _ssm->_oldThreadName = getThreadName().toString();
+            setThreadName(_ssm->_threadName);
         }

         // Swap the current Client so calls to cc() work as expected
@@ -259,8 +256,8 @@ public:
             _ssm->_dbClient = Client::releaseCurrent();
         }

-        if (!_oldThreadName.empty()) {
-            setThreadName(_oldThreadName);
+        if (!_ssm->_oldThreadName.empty()) {
+            setThreadName(_ssm->_oldThreadName);
         }
     }

@@ -288,75 +285,75 @@ public:
 private:
     ServiceStateMachine* _ssm;
     bool _haveTakenOwnership = false;
-    std::string _oldThreadName;
 };

-ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
+std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext,
+                                                                 transport::SessionHandle session,
+                                                                 transport::Mode transportMode) {
+    return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), transportMode);
+}
+
+ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
+                                         transport::SessionHandle session,
+                                         transport::Mode transportMode)
     : _state{State::Created},
-      _serviceContext{client->getServiceContext()},
-      _sep{_serviceContext->getServiceEntryPoint()},
-      _dbClient{std::move(client)},
+      _sep{svcContext->getServiceEntryPoint()},
+      _transportMode(transportMode),
+      _serviceContext(svcContext),
+      _serviceExecutor(_serviceContext->getServiceExecutor()),
+      _sessionHandle(session),
+      _threadName{str::stream() << "conn" << _session()->id()},
+      _dbClient{svcContext->makeClient(_threadName, std::move(session))},
       _dbClientPtr{_dbClient.get()} {}

 const transport::SessionHandle& ServiceStateMachine::_session() const {
-    return _dbClientPtr->session();
+    return _sessionHandle;
 }

-ServiceExecutor* ServiceStateMachine::_executor() {
-    return ServiceExecutorContext::get(_dbClientPtr)->getServiceExecutor();
-}
-
-Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
+void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
     invariant(_inMessage.empty());
     invariant(_state.load() == State::Source);
     _state.store(State::SourceWait);
     guard.release();

     auto sourceMsgImpl = [&] {
-        const auto& transportMode = _executor()->transportMode();
-        if (transportMode == transport::Mode::kSynchronous) {
+        if (_transportMode == transport::Mode::kSynchronous) {
             MONGO_IDLE_THREAD_BLOCK;
             return Future<Message>::makeReady(_session()->sourceMessage());
         } else {
-            invariant(transportMode == transport::Mode::kAsynchronous);
+            invariant(_transportMode == transport::Mode::kAsynchronous);
             return _session()->asyncSourceMessage();
         }
     };

-    return sourceMsgImpl().onCompletion([this](StatusWith<Message> msg) -> Future<void> {
+    sourceMsgImpl().getAsync([this](StatusWith<Message> msg) {
         if (msg.isOK()) {
             _inMessage = std::move(msg.getValue());
             invariant(!_inMessage.empty());
         }
         _sourceCallback(msg.getStatus());
-        return Status::OK();
     });
 }

-Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
+void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {
     // Sink our response to the client
     invariant(_state.load() == State::Process);
     _state.store(State::SinkWait);
     guard.release();

-    auto toSink = std::exchange(_outMessage, {});
     auto sinkMsgImpl = [&] {
-        const auto& transportMode = _executor()->transportMode();
-        if (transportMode == transport::Mode::kSynchronous) {
+        if (_transportMode == transport::Mode::kSynchronous) {
             // We don't consider ourselves idle while sending the reply since we are still doing
             // work on behalf of the client. Contrast that with sourceMessage() where we are waiting
             // for the client to send us more work to do.
             return Future<void>::makeReady(_session()->sinkMessage(std::move(toSink)));
         } else {
-            invariant(transportMode == transport::Mode::kAsynchronous);
+            invariant(_transportMode == transport::Mode::kAsynchronous);
             return _session()->asyncSinkMessage(std::move(toSink));
         }
     };

-    return sinkMsgImpl().onCompletion([this](Status status) {
-        _sinkCallback(std::move(status));
-        return Status::OK();
-    });
+    sinkMsgImpl().getAsync([this](Status status) { _sinkCallback(std::move(status)); });
 }

 void ServiceStateMachine::_sourceCallback(Status status) {
@@ -371,11 +368,14 @@ void ServiceStateMachine::_sourceCallback(Status status) {
     if (status.isOK()) {
         _state.store(State::Process);

-        // If the sourceMessage succeeded then we can move on to process the message. We
-        // simply return from here and the future chain in _runOnce() will continue to the
-        // next state normally.
+        // Since we know that we're going to process a message, call scheduleNext() immediately
+        // to schedule the call to processMessage() on the serviceExecutor (or just unwind the
+        // stack)

-        // If any other issues arise, close the session.
+        // If this callback doesn't own the ThreadGuard, then we're being called recursively,
+        // and the executor shouldn't start a new thread to process the message - it can use this
+        // one just after this returns.
+        return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse);
     } else if (ErrorCodes::isInterruption(status.code()) ||
                ErrorCodes::isNetworkError(status.code())) {
         LOGV2_DEBUG(
@@ -401,7 +401,10 @@ void ServiceStateMachine::_sourceCallback(Status status) {
             "connectionId"_attr = _session()->id());
         _state.store(State::EndSession);
     }
-    uassertStatusOK(status);
+
+    // There was an error receiving a message from the client and we've already printed the error
+    // so call runNextInGuard() to clean up the session without waiting.
+    _runNextInGuard(std::move(guard));
 }

 void ServiceStateMachine::_sinkCallback(Status status) {
@@ -412,10 +415,10 @@ void ServiceStateMachine::_sinkCallback(Status status) {
     dassert(state() == State::SinkWait);

     // If there was an error sinking the message to the client, then we should print an error and
-    // end the session.
+    // end the session. No need to unwind the stack, so this will runNextInGuard() and return.
     //
-    // Otherwise, update the current state depending on whether we're in exhaust or not and return
-    // from this function to let _runOnce continue the future chaining of state transitions.
+    // Otherwise, update the current state depending on whether we're in exhaust or not, and call
+    // scheduleNext() to unwind the stack and do the next step.
     if (!status.isOK()) {
         LOGV2(22989,
               "Error sending response to client. Ending connection from remote",
               "error"_attr = status,
               "remote"_attr = _session()->remote(),
               "connectionId"_attr = _session()->id());
         _state.store(State::EndSession);
-        uassertStatusOK(status);
+        return _runNextInGuard(std::move(guard));
     } else if (_inExhaust) {
         _state.store(State::Process);
+        return _scheduleNextWithGuard(std::move(guard),
+                                      ServiceExecutor::kDeferredTask |
+                                          ServiceExecutor::kMayYieldBeforeSchedule);
     } else {
         _state.store(State::Source);
+        return _scheduleNextWithGuard(std::move(guard),
+                                      ServiceExecutor::kDeferredTask |
+                                          ServiceExecutor::kMayYieldBeforeSchedule);
     }
 }

-Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
+void ServiceStateMachine::_processMessage(ThreadGuard guard) {
     invariant(!_inMessage.empty());

     TrafficRecorder::get(_serviceContext)
-        .observe(_session(), _serviceContext->getPreciseClockSource()->now(), _inMessage);
+        .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage);

     auto& compressorMgr = MessageCompressorManager::forSession(_session());

@@ -458,7 +467,7 @@

     // The handleRequest is implemented in a subclass for mongod/mongos and actually does all the
     // database work for this request.
-    return _sep->handleRequest(opCtx.get(), _inMessage)
+    _sep->handleRequest(opCtx.get(), _inMessage)
         .then([this,
                &compressorMgr = compressorMgr,
                opCtx = std::move(opCtx),
@@ -507,77 +516,106 @@
             }

             TrafficRecorder::get(_serviceContext)
-                .observe(_session(), _serviceContext->getPreciseClockSource()->now(), toSink);
+                .observe(
+                    _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
+
+            _sinkMessage(std::move(guard), std::move(toSink));

-            _outMessage = std::move(toSink);
         } else {
             _state.store(State::Source);
             _inMessage.reset();
-            _outMessage.reset();
             _inExhaust = false;
+            return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask);
         }
-    });
+    })
+        .get();
 }

-void ServiceStateMachine::start() {
-    _executor()->schedule(
-        GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
-            // TODO(SERVER-49109) We can't use static ownership in general with
-            // a ServiceExecutorFixed and async commands. ThreadGuard needs to become smarter.
-            ThreadGuard guard(shared_from_this().get());
-            guard.markStaticOwnership();
+void ServiceStateMachine::runNext() {
+    return _runNextInGuard(ThreadGuard(this));
+}

-            // If this is the first run of the SSM, then update its state to Source
-            if (state() == State::Created) {
-                _state.store(State::Source);
-            }
+void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
+    auto curState = state();
+    dassert(curState != State::Ended);

-            _runOnce();
-        }));
-}
+    // If this is the first run of the SSM, then update its state to Source
+    if (curState == State::Created) {
+        curState = State::Source;
+        _state.store(curState);
+    }

-void ServiceStateMachine::_runOnce() {
-    makeReadyFutureWith([&]() -> Future<void> {
-        if (_inExhaust) {
-            return Status::OK();
-        } else {
-            return _sourceMessage(ThreadGuard(this));
+    // Destroy the opCtx (already killed) here, to potentially use the delay between clients'
+    // requests to hide the destruction cost.
+    if (MONGO_likely(_killedOpCtx)) {
+        _killedOpCtx.reset();
+    }
+
+    // Make sure the current Client got set correctly
+    dassert(Client::getCurrent() == _dbClientPtr);
+    try {
+        switch (curState) {
+            case State::Source:
+                _sourceMessage(std::move(guard));
+                break;
+            case State::Process:
+                _processMessage(std::move(guard));
+                break;
+            case State::EndSession:
+                _cleanupSession(std::move(guard));
+                break;
+            default:
+                MONGO_UNREACHABLE;
         }
-    })
-        .then([this]() { return _processMessage(ThreadGuard(this)); })
-        .then([this]() -> Future<void> {
-            if (_outMessage.empty()) {
-                return Status::OK();
-            }
-            return _sinkMessage(ThreadGuard(this));
-        })
-        .getAsync([this, anchor = shared_from_this()](Status status) {
-            // Destroy the opCtx (already killed) here, to potentially use the delay between
-            // clients' requests to hide the destruction cost.
-            if (MONGO_likely(_killedOpCtx)) {
-                _killedOpCtx.reset();
-            }
-            if (!status.isOK()) {
-                _state.store(State::EndSession);
-                // The service executor failed to schedule the task. This could for example be that
-                // we failed to start a worker thread. Terminate this connection to leave the system
-                // in a valid state.
-                LOGV2_WARNING_OPTIONS(4910400,
-                                      {logv2::LogComponent::kExecutor},
-                                      "Terminating session due to error: {error}",
-                                      "Terminating session due to error",
-                                      "error"_attr = status);
-                terminate();
-
-                ThreadGuard terminateGuard(this);
-                _cleanupSession(std::move(terminateGuard));
-                return;
-            }
+        return;
+    } catch (const DBException& e) {
+        LOGV2(22990,
+              "DBException handling request, closing client connection: {error}",
+              "DBException handling request, closing client connection",
+              "error"_attr = redact(e));
+    }
+    // No need to catch std::exception, as std::terminate will be called when the exception bubbles
+    // to the top of the stack

-            _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
-                [this, anchor = shared_from_this()](Status status) { _runOnce(); }));
-        });
+    if (!guard) {
+        guard = ThreadGuard(this);
+    }
+    _state.store(State::EndSession);
+    _cleanupSession(std::move(guard));
+}
+
+void ServiceStateMachine::start(Ownership ownershipModel) {
+    _scheduleNextWithGuard(
+        ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel);
+}
+
+void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) {
+    _serviceExecutor = executor;
+}
+
+void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
+                                                 transport::ServiceExecutor::ScheduleFlags flags,
+                                                 Ownership ownershipModel) {
+    auto func = [ssm = shared_from_this(), ownershipModel] {
+        ThreadGuard guard(ssm.get());
+        if (ownershipModel == Ownership::kStatic)
+            guard.markStaticOwnership();
+        ssm->_runNextInGuard(std::move(guard));
+    };
+    guard.release();
+    Status status = _serviceExecutor->scheduleTask(std::move(func), flags);
+    if (status.isOK()) {
+        return;
+    }
+
+    // We've had an error, reacquire the ThreadGuard and destroy the SSM
+    ThreadGuard terminateGuard(this);
+
+    // The service executor failed to schedule the task. This could for example be that we failed
+    // to start a worker thread. Terminate this connection to leave the system in a valid state.
+    _terminateAndLogIfError(status);
+    _cleanupSession(std::move(terminateGuard));
 }

 void ServiceStateMachine::terminate() {
@@ -659,12 +697,9 @@ void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {

     _inMessage.reset();

-    _outMessage.reset();
-
     // By ignoring the return value of Client::releaseCurrent() we destroy the session.
     // _dbClient is now nullptr and _dbClientPtr is invalid and should never be accessed.
     Client::releaseCurrent();
 }

-}  // namespace transport
 }  // namespace mongo
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 7794cdfbfce..0572c4f6ac8 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -35,7 +35,6 @@

 #include "mongo/base/status.h"
 #include "mongo/config.h"
-#include "mongo/db/client.h"
 #include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
@@ -45,11 +44,9 @@
 #include "mongo/transport/service_executor.h"
 #include "mongo/transport/session.h"
 #include "mongo/transport/transport_mode.h"
-#include "mongo/util/future.h"
 #include "mongo/util/net/ssl_manager.h"

 namespace mongo {
-namespace transport {

 /*
 * The ServiceStateMachine holds the state of a single client connection and represents the
@@ -66,9 +63,17 @@ public:
     ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;

     /*
-     * Construct a ServiceStateMachine for a given Client.
+     * Creates a new ServiceStateMachine for a given session/service context. If sync is true,
+     * then calls into the transport layer will block while they complete, otherwise they will
+     * be handled asynchronously.
      */
-    ServiceStateMachine(ServiceContext::UniqueClient client);
+    static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
+                                                       transport::SessionHandle session,
+                                                       transport::Mode transportMode);
+
+    ServiceStateMachine(ServiceContext* svcContext,
+                        transport::SessionHandle session,
+                        transport::Mode transportMode);

     /*
      * Any state may transition to EndSession in case of an error, otherwise the valid state
@@ -101,9 +106,31 @@ public:
     enum class Ownership { kUnowned, kOwned, kStatic };

     /*
-     * start() schedules a call to _runOnce() in the future.
+     * runNext() will run the current state of the state machine. It also handles all the error
+     * handling and state management for requests.
+     *
+     * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack
+     * if they have just completed a database operation to make sure that this doesn't infinitely
+     * recurse.
+     *
+     * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take
+     * ownership of the SSM, it will call scheduleNext() and return immediately.
+     */
+    void runNext();
+
+    /*
+     * start() schedules a call to runNext() in the future.
+     *
+     * It is guaranteed to unwind the stack, and not call runNext() recursively, but it is not
+     * guaranteed that runNext() will run after this returns.
+     */
+    void start(Ownership ownershipModel);
+
+    /*
+     * Set the executor to be used for the next call to runNext(). This allows switching between
+     * thread models after the SSM has started.
      */
-    void start();
+    void setServiceExecutor(transport::ServiceExecutor* executor);

     /*
      * Gets the current state of connection for testing/diagnostic purposes.
@@ -133,8 +160,7 @@ public:

 private:
     /*
-     * A class that wraps up lifetime management of the _dbClient and _threadName for
-     * each step in _runOnce();
+     * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
      */
     class ThreadGuard;
     friend class ThreadGuard;
@@ -162,15 +188,18 @@ private:
     const transport::SessionHandle& _session() const;

     /*
-     * Gets the transport::ServiceExecutor associated with this connection.
+     * This is the actual implementation of runNext() that gets called after the ThreadGuard
+     * has been successfully created. If any callbacks (like sourceCallback()) need to call
+     * runNext() and already own a ThreadGuard, they should call this with that guard as the
+     * argument.
      */
-    ServiceExecutor* _executor();
+    void _runNextInGuard(ThreadGuard guard);

     /*
      * This function actually calls into the database and processes a request. It's broken out
      * into its own inline function for better readability.
      */
-    Future<void> _processMessage(ThreadGuard guard);
+    inline void _processMessage(ThreadGuard guard);

     /*
      * These get called by the TransportLayer when requested network I/O has completed.
@@ -182,20 +211,14 @@ private:
      * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
      * before waiting on the TL.
      */
-    Future<void> _sourceMessage(ThreadGuard guard);
-    Future<void> _sinkMessage(ThreadGuard guard);
+    void _sourceMessage(ThreadGuard guard);
+    void _sinkMessage(ThreadGuard guard, Message toSink);

     /*
      * Releases all the resources associated with the session and calls the cleanupHook.
      */
     void _cleanupSession(ThreadGuard guard);

-    /*
-     * This is the initial function called at the beginning of a thread's lifecycle in the
-     * TransportLayer.
-     */
-    void _runOnce();
-
     /*
      * Releases all the resources associated with the exhaust request.
      */
@@ -203,18 +226,21 @@ private:

     AtomicWord<State> _state{State::Created};

+    ServiceEntryPoint* _sep;
+    transport::Mode _transportMode;
+
     ServiceContext* const _serviceContext;
-    ServiceEntryPoint* const _sep;
+    transport::ServiceExecutor* _serviceExecutor;

     transport::SessionHandle _sessionHandle;
+    const std::string _threadName;
     ServiceContext::UniqueClient _dbClient;
-    Client* _dbClientPtr;
+    const Client* _dbClientPtr;
     std::function<void()> _cleanupHook;

     bool _inExhaust = false;
     boost::optional<MessageCompressorId> _compressorId;
     Message _inMessage;
-    Message _outMessage;

     // Allows delegating destruction of opCtx to another function to potentially remove its cost
     // from the critical path. This is currently only used in `_processMessage()`.
@@ -224,6 +250,7 @@ private:
 #if MONGO_CONFIG_DEBUG_BUILD
     AtomicWord<stdx::thread::id> _owningThread;
 #endif
+    std::string _oldThreadName;
 };

 template <typename T>
 T& operator<<(T& stream, const ServiceStateMachine::State& state) {
     return stream;
 }

-}  // namespace transport
 }  // namespace mongo
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 8b89b64516e..e0ff202bf6c 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -136,6 +136,8 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
     transport::TransportLayerASIO::Options opts(config);
     opts.transportMode = transport::Mode::kSynchronous;

+    ctx->setServiceExecutor(std::make_unique<transport::ServiceExecutorSynchronous>(ctx));
+
     std::vector<std::unique_ptr<TransportLayer>> retVector;
     retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
     return std::make_unique<TransportLayerManager>(std::move(retVector));
--
cgit v1.2.1
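
Editor's note (not part of the patch): the hunks above restore a wiring in which a single ServiceExecutor hangs off the ServiceContext and must be started before, and shut down after, the ServiceEntryPoint. A minimal sketch of that startup order follows; startNetworking() is a hypothetical helper name introduced only for illustration, but each call it makes mirrors the bridge.cpp, mongos_main.cpp, mongod_main.cpp and transport_layer_manager.cpp hunks above.

    #include <memory>

    #include "mongo/base/status.h"
    #include "mongo/db/service_context.h"
    #include "mongo/transport/service_executor_synchronous.h"

    namespace {
    // Hypothetical helper, for illustration only.
    mongo::Status startNetworking(mongo::ServiceContext* svcCtx) {
        // Bind a synchronous (thread-per-connection) executor to the context,
        // as transport_layer_manager.cpp now does in createWithConfig().
        svcCtx->setServiceExecutor(
            std::make_unique<mongo::transport::ServiceExecutorSynchronous>(svcCtx));

        // Start order matters: the executor starts before the entry point;
        // mongod and mongos treat a failure here as EXIT_NET_ERROR.
        if (auto status = svcCtx->getServiceExecutor()->start(); !status.isOK())
            return status;
        return svcCtx->getServiceEntryPoint()->start();
    }
    }  // namespace

Shutdown runs in the reverse order under ASAN builds, per the shutdownTask/cleanupTask hunks: sep->shutdown(Seconds(10)) first to drain sessions, then svcExec->shutdown(...) with its own grace period.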