author     Ben Caimano <ben.caimano@10gen.com>               2020-09-11 20:09:59 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-09-14 04:57:01 +0000
commit     827a25eb01bc5ddf766b3a543ef0ba5112953e1b (patch)
tree       504935db5b1292818dcd9b862bf8b5c3b4352fb5 /src/mongo/transport
parent     bd320bc2d10cff75756a2c95986cc81ec8a5e7c7 (diff)
download   mongo-827a25eb01bc5ddf766b3a543ef0ba5112953e1b.tar.gz
SERVER-50867 Roll back ServiceStateMachine changes temporarily
This reverts these commits:
b039b24746e1d1fb10a32e1ca4831423c01d4cd7: SERVER-48980
97e16187ff3065d242a61a52e7b6edd4d439fb30: SERVER-49072
0607a6c291bf4cf4580a4444d826ed3c3ac3df47: SERVER-49104
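For orientation before reading the diff: the flow this rollback restores in ServiceEntryPointImpl::startSession() can be condensed into a short C++ sketch. This is illustrative only, assembled from the restored (+) side of the hunks below; it is not a verbatim excerpt from the tree.

    // Per-session flow after the revert: the state machine is built against the
    // single ServiceContext-wide executor and started with an explicit ownership
    // model, instead of a per-Client ServiceExecutorContext.
    auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
    auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);

    auto ownership = ServiceStateMachine::Ownership::kOwned;
    if (transportMode == transport::Mode::kSynchronous) {
        // A synchronous transport dedicates its worker thread to this session.
        ownership = ServiceStateMachine::Ownership::kStatic;
    }
    ssm->start(ownership);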
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--  src/mongo/transport/SConscript                        |   4
-rw-r--r--  src/mongo/transport/service_entry_point.h             |   9
-rw-r--r--  src/mongo/transport/service_entry_point_impl.cpp      | 163
-rw-r--r--  src/mongo/transport/service_entry_point_impl.h        |  12
-rw-r--r--  src/mongo/transport/service_executor.cpp              | 135
-rw-r--r--  src/mongo/transport/service_executor.h                |  77
-rw-r--r--  src/mongo/transport/service_executor_fixed.cpp        |  16
-rw-r--r--  src/mongo/transport/service_executor_fixed.h          |   3
-rw-r--r--  src/mongo/transport/service_executor_reserved.cpp     |  25
-rw-r--r--  src/mongo/transport/service_executor_reserved.h       |   3
-rw-r--r--  src/mongo/transport/service_executor_synchronous.cpp  |  15
-rw-r--r--  src/mongo/transport/service_executor_synchronous.h    |   3
-rw-r--r--  src/mongo/transport/service_state_machine.cpp         | 251
-rw-r--r--  src/mongo/transport/service_state_machine.h           |  74
-rw-r--r--  src/mongo/transport/transport_layer_manager.cpp       |   2
15 files changed, 259 insertions(+), 533 deletions(-)
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 7932abc662d..5326afe5db2 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -86,7 +86,6 @@ env.Library(
 tlEnv.Library(
     target='service_executor',
     source=[
-        'service_executor.cpp',
         'service_executor_fixed.cpp',
         'service_executor_reserved.cpp',
         'service_executor_synchronous.cpp',
@@ -181,8 +180,7 @@ tlEnv.CppUnitTest(
         'transport_layer_asio_test.cpp',
         'service_executor_test.cpp',
         'max_conns_override_test.cpp',
-        # TODO: service_state_machine test to be re-written in SERVER-50141
-        # 'service_state_machine_test.cpp',
+        'service_state_machine_test.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/base',
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index fb1ad895d78..aa970f1ef67 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,8 +29,6 @@

 #pragma once

-#include <limits>
-
 #include "mongo/bson/bsonobjbuilder.h"
 #include "mongo/db/dbmessage.h"
 #include "mongo/transport/session.h"
@@ -83,13 +81,6 @@ public:
     virtual size_t numOpenSessions() const = 0;

     /**
-     * Returns the maximum number of sessions that can be open.
-     */
-    virtual size_t maxOpenSessions() const {
-        return std::numeric_limits<size_t>::max();
-    }
-
-    /**
      * Processes a request and fills out a DbResponse.
      */
     virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 0090f30a686..1824ccf1c94 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -33,7 +33,6 @@

 #include "mongo/transport/service_entry_point_impl.h"

-#include <fmt/format.h>
 #include <vector>

 #include "mongo/db/auth/restriction_environment.h"
@@ -49,23 +48,12 @@
 #include <sys/resource.h>
 #endif

-#if !defined(__has_feature)
-#define __has_feature(x) 0
-#endif
-
 namespace mongo {

-using namespace fmt::literals;
-
 bool shouldOverrideMaxConns(const transport::SessionHandle& session,
                             const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
-    if (exemptions.empty()) {
-        return false;
-    }
-
     const auto& remoteAddr = session->remoteAddr();
     const auto& localAddr = session->localAddr();
-
     boost::optional<CIDR> remoteCIDR;

     if (remoteAddr.isValid() && remoteAddr.isIP()) {
@@ -93,7 +81,8 @@ bool shouldOverrideMaxConns(const transport::SessionHandle& session,
     return false;
 }

-size_t getSupportedMax() {
+ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
+    const auto supportedMax = [] {
 #ifdef _WIN32
         return serverGlobalParams.maxConns;
@@ -124,33 +113,21 @@ size_t getSupportedMax() {
                   "limit"_attr = supportedMax);
     }

-    return supportedMax;
-}
-
-ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx)
-    : _svcCtx(svcCtx), _maxNumConnections(getSupportedMax()) {}
+    _maxNumConnections = supportedMax;

-Status ServiceEntryPointImpl::start() {
-    if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
-        !status.isOK()) {
-        return status;
+    if (serverGlobalParams.reservedAdminThreads) {
+        _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
+            _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
     }
+}

-    if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
-        if (auto status = exec->start(); !status.isOK()) {
-            return status;
-        }
-    }
-
-    // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
-    // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
-    //     return status;
-    // }
-
-    return Status::OK();
+Status ServiceEntryPointImpl::start() {
+    if (_adminInternalPool)
+        return _adminInternalPool->start();
+    else
+        return Status::OK();
 }

-// TODO: explicitly start on the fixed executor
 void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     // Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
     const auto& remoteAddr = session->remoteAddr();
@@ -159,48 +136,43 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     auto restrictionEnvironment = std::make_unique<RestrictionEnvironment>(remoteAddr, localAddr);
     RestrictionEnvironment::set(session, std::move(restrictionEnvironment));

-    bool canOverrideMaxConns = shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
-
-    auto clientName = "conn{}"_format(session->id());
-    auto client = _svcCtx->makeClient(clientName, session);
-
-    {
-        stdx::lock_guard lk(*client);
-        auto seCtx =
-            transport::ServiceExecutorContext{}
-                .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated)
-                .setCanUseReserved(canOverrideMaxConns);
-
-        transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
-    }
-
-    auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
+    SSMListIterator ssmIt;

     const bool quiet = serverGlobalParams.quiet.load();

     size_t connectionCount;
-    auto ssmIt = [&]() -> boost::optional<SSMListIterator> {
-        stdx::lock_guard lk(_sessionsMutex);
-        connectionCount = _currentConnections.load();
-        if (connectionCount > _maxNumConnections && !canOverrideMaxConns) {
-            return boost::none;
+    auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
+
+    auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
+    auto usingMaxConnOverride = false;
+    {
+        stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
+        connectionCount = _sessions.size() + 1;
+        if (connectionCount > _maxNumConnections) {
+            usingMaxConnOverride =
+                shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
         }
-        auto it = _sessions.emplace(_sessions.begin(), ssm);
-        connectionCount = _sessions.size();
-        _currentConnections.store(connectionCount);
-        _createdConnections.addAndFetch(1);
-        return it;
-    }();
+        if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
+            ssmIt = _sessions.emplace(_sessions.begin(), ssm);
+            _currentConnections.store(connectionCount);
+            _createdConnections.addAndFetch(1);
+        }
+    }

-    if (!ssmIt) {
+    // Checking if we successfully added a connection above. Separated from the lock so we don't
+    // log while holding it.
+    if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
         if (!quiet) {
             LOGV2(22942,
                   "Connection refused because there are too many open connections",
                   "connectionCount"_attr = connectionCount);
         }
         return;
-    } else if (!quiet) {
+    } else if (usingMaxConnOverride && _adminInternalPool) {
+        ssm->setServiceExecutor(_adminInternalPool.get());
+    }
+
+    if (!quiet) {
         LOGV2(22943,
               "Connection accepted",
               "remote"_attr = session->remote(),
@@ -213,7 +185,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
         auto remote = session->remote();
         {
             stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
-            _sessions.erase(*ssmIt);
+            _sessions.erase(ssmIt);
             connectionCount = _sessions.size();
             _currentConnections.store(connectionCount);
         }
@@ -228,7 +200,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
         }
     });

-    ssm->start();
+    auto ownership = ServiceStateMachine::Ownership::kOwned;
+    if (transportMode == transport::Mode::kSynchronous) {
+        ownership = ServiceStateMachine::Ownership::kStatic;
+    }
+    ssm->start(ownership);
 }

 void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
@@ -243,13 +219,8 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
 }

 bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
-#if __has_feature(address_sanitizer)
-    // When running under address sanitizer, we get false positive leaks due to disorder around
-    // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
-    // harder to dry up the server from active connections before going on to really shut down.
     using logv2::LogComponent;

-    auto start = _svcCtx->getPreciseClockSource()->now();
     stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);

     // Request that all sessions end, while holding the _sesionsMutex, loop over all the current
@@ -286,37 +257,7 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
             "shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
             "workers"_attr = numOpenSessions());
     }
-
-    lk.unlock();
-
-    // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
-    // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
-    //     !status.isOK()) {
-    //     LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
-    // }
-
-    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
-        if (auto status = exec->shutdown(timeout); !status.isOK()) {
-            LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
-        }
-    }
-
-    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    if (auto status =
-            transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
-        !status.isOK()) {
-        LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
-    }
-
     return result;
-#else
-    return true;
-#endif
 }

 void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
@@ -326,17 +267,17 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {

     bob->append("current", static_cast<int>(sessionCount));
     bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
     bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
+
+    if (auto sc = getGlobalServiceContext()) {
+        bob->append("active", static_cast<int>(sc->getActiveClientOperations()));
+        bob->append("exhaustIsMaster",
+                    static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster()));
+        bob->append("awaitingTopologyChanges",
+                    static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges()));
+    }

-    invariant(_svcCtx);
-    bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
-    bob->append("exhaustIsMaster",
-                static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
-    bob->append("awaitingTopologyChanges",
-                static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges()));
-
-    if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+    if (_adminInternalPool) {
         BSONObjBuilder section(bob->subobjStart("adminConnections"));
-        adminExec->appendStats(&section);
+        _adminInternalPool->appendStats(&section);
     }
 }
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 1693469fcda..38c1319657a 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -36,9 +36,7 @@
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/variant.h"
 #include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_fixed.h"
 #include "mongo/transport/service_executor_reserved.h"
-#include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/transport/service_state_machine.h"
 #include "mongo/util/hierarchical_acquisition.h"
 #include "mongo/util/net/cidr.h"
@@ -77,12 +75,8 @@ public:
         return _currentConnections.load();
     }

-    size_t maxOpenSessions() const final {
-        return _maxNumConnections;
-    }
-
 private:
-    using SSMList = std::list<std::shared_ptr<transport::ServiceStateMachine>>;
+    using SSMList = std::list<std::shared_ptr<ServiceStateMachine>>;
     using SSMListIterator = SSMList::iterator;

     ServiceContext* const _svcCtx;
@@ -93,9 +87,11 @@ private:
     stdx::condition_variable _shutdownCondition;

     SSMList _sessions;
-    const size_t _maxNumConnections{DEFAULT_MAX_CONN};
+    size_t _maxNumConnections{DEFAULT_MAX_CONN};
     AtomicWord<size_t> _currentConnections{0};
     AtomicWord<size_t> _createdConnections{0};
+
+    std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool;
 };

 /*
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
deleted file mode 100644
index 37c3d03e1b1..00000000000
--- a/src/mongo/transport/service_executor.cpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- *    Copyright (C) 2020-present MongoDB, Inc.
- *
- *    This program is free software: you can redistribute it and/or modify
- *    it under the terms of the Server Side Public License, version 1,
- *    as published by MongoDB, Inc.
- *
- *    This program is distributed in the hope that it will be useful,
- *    but WITHOUT ANY WARRANTY; without even the implied warranty of
- *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *    Server Side Public License for more details.
- *
- *    You should have received a copy of the Server Side Public License
- *    along with this program. If not, see
- *    <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- *    As a special exception, the copyright holders give permission to link the
- *    code of portions of this program with the OpenSSL library under certain
- *    conditions as described in each individual source file and distribute
- *    linked combinations including the program with the OpenSSL library. You
- *    must comply with the Server Side Public License in all respects for
- *    all of the code used other than as permitted herein. If you modify file(s)
- *    with this exception, you may extend this exception to your version of the
- *    file(s), but you are not obligated to do so. If you do not wish to do so,
- *    delete this exception statement from your version. If you delete this
- *    exception statement from all source files in the program, then also delete
- *    it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/transport/service_executor.h"
-
-#include <boost/optional.hpp>
-
-#include "mongo/logv2/log.h"
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_fixed.h"
-#include "mongo/transport/service_executor_reserved.h"
-#include "mongo/transport/service_executor_synchronous.h"
-
-namespace mongo {
-namespace transport {
-namespace {
-static constexpr auto kDiagnosticLogLevel = 4;
-
-auto getServiceExecutorContext =
-    Client::declareDecoration<boost::optional<ServiceExecutorContext>>();
-}  // namespace
-
-StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) {
-    switch (threadingModel) {
-        case ServiceExecutorContext::ThreadingModel::kDedicated:
-            return "Dedicated"_sd;
-        case ServiceExecutorContext::ThreadingModel::kBorrowed:
-            return "Borrowed"_sd;
-        default:
-            MONGO_UNREACHABLE;
-    }
-}
-
-ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept {
-    auto& serviceExecutorContext = getServiceExecutorContext(client);
-
-    if (!serviceExecutorContext) {
-        // Service worker Clients will never have a ServiceExecutorContext.
-        return nullptr;
-    }
-
-    return &serviceExecutorContext.get();
-}
-
-void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) noexcept {
-    auto& serviceExecutorContext = getServiceExecutorContext(client);
-    invariant(!serviceExecutorContext);
-
-    seCtx._client = client;
-    seCtx._sep = client->getServiceContext()->getServiceEntryPoint();
-
-    LOGV2_DEBUG(4898000,
-                kDiagnosticLogLevel,
-                "Setting initial ServiceExecutor context for client",
-                "client"_attr = client->desc(),
-                "threadingModel"_attr = seCtx._threadingModel,
-                "canUseReserved"_attr = seCtx._canUseReserved);
-    serviceExecutorContext = std::move(seCtx);
-}
-
-ServiceExecutorContext& ServiceExecutorContext::setThreadingModel(
-    ThreadingModel threadingModel) noexcept {
-    _threadingModel = threadingModel;
-    return *this;
-}
-
-ServiceExecutorContext& ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
-    _canUseReserved = canUseReserved;
-    return *this;
-}
-
-ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
-    invariant(_client);
-
-    switch (_threadingModel) {
-        case ThreadingModel::kBorrowed:
-            return ServiceExecutorFixed::get(_client->getServiceContext());
-        case ThreadingModel::kDedicated: {
-            // Continue on.
-        } break;
-        default:
-            MONGO_UNREACHABLE;
-    }
-
-    auto shouldUseReserved = [&] {
-        // This is at best a naive solution. There could be a world where numOpenSessions() changes
-        // very quickly. We are not taking locks on the ServiceEntryPoint, so we may chose to
-        // schedule onto the ServiceExecutorReserved when it is no longer necessary. The upside is
-        // that we will automatically shift to the ServiceExecutorSynchronous after the first
-        // command loop.
-        return _sep->numOpenSessions() > _sep->maxOpenSessions();
-    };
-
-    if (_canUseReserved && shouldUseReserved()) {
-        if (auto exec = transport::ServiceExecutorReserved::get(_client->getServiceContext())) {
-            // We are allowed to use the reserved executor, we should use it, and it exists.
-            return exec;
-        }
-    }
-
-    return transport::ServiceExecutorSynchronous::get(_client->getServiceContext());
-}
-
-}  // namespace transport
-}  // namespace mongo
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 38116908272..b702198e6b5 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -33,19 +33,20 @@

 #include "mongo/base/status.h"
 #include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/client.h"
-#include "mongo/db/service_context.h"
 #include "mongo/platform/bitwise_enum_operators.h"
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/session.h"
 #include "mongo/transport/transport_mode.h"
 #include "mongo/util/duration.h"
 #include "mongo/util/functional.h"
 #include "mongo/util/out_of_line_executor.h"

 namespace mongo {
+// This needs to be forward declared here because the service_context.h is a circular dependency.
+class ServiceContext;
+
 namespace transport {

+class Session;
+
 /*
  * This is the interface for all ServiceExecutors.
  */
@@ -122,74 +123,6 @@ public:
     virtual void appendStats(BSONObjBuilder* bob) const = 0;
 };

-/**
- * ServiceExecutorContext determines which ServiceExecutor is used for each Client.
- */
-class ServiceExecutorContext {
-public:
-    enum ThreadingModel {
-        kBorrowed,
-        kDedicated,
-    };
-
-    /**
-     * Get a pointer to the ServiceExecutorContext for a given client.
-     *
-     * This function is valid to invoke either on the Client thread or with the Client lock.
-     */
-    static ServiceExecutorContext* get(Client* client) noexcept;
-
-    /**
-     * Set the ServiceExecutorContext for a given client.
-     *
-     * This function may only be invoked once and only while under the Client lock.
-     */
-    static void set(Client* client, ServiceExecutorContext seCtx) noexcept;
-
-    ServiceExecutorContext() = default;
-
-    /**
-     * Set the ThreadingModel for the associated Client's service execution.
-     *
-     * This function is only valid to invoke with the Client lock or before the Client is set.
-     */
-    ServiceExecutorContext& setThreadingModel(ThreadingModel threadingModel) noexcept;
-
-    /**
-     * Set if reserved resources are available for the associated Client's service execution.
-     *
-     * This function is only valid to invoke with the Client lock or before the Client is set.
-     */
-    ServiceExecutorContext& setCanUseReserved(bool canUseReserved) noexcept;
-
-    /**
-     * Get the ThreadingModel for the associated Client.
-     *
-     * This function is valid to invoke either on the Client thread or with the Client lock.
-     */
-    auto getThreadingModel() const noexcept {
-        return _threadingModel;
-    }
-
-    /**
-     * Get an appropriate ServiceExecutor given the current parameters.
-     *
-     * This function is only valid to invoke from the associated Client thread. This function does
-     * not require the Client lock since all writes must also happen from that thread.
-     */
-    ServiceExecutor* getServiceExecutor() const noexcept;
-
-private:
-    friend StringData toString(ThreadingModel threadingModel);
-
-    Client* _client = nullptr;
-    ServiceEntryPoint* _sep = nullptr;
-
-    ThreadingModel _threadingModel = ThreadingModel::kDedicated;
-    bool _canUseReserved = false;
-};
-
-
 }  // namespace transport

 ENABLE_BITMASK_OPERATORS(transport::ServiceExecutor::ScheduleFlags)
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index b068e487126..2a1fc737cc1 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -37,7 +37,6 @@
 #include "mongo/transport/session.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/fail_point.h"
-#include "mongo/util/thread_safety_context.h"

 namespace mongo {

@@ -50,15 +49,6 @@ namespace {
 constexpr auto kThreadsRunning = "threadsRunning"_sd;
 constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "fixed"_sd;
-
-const auto getServiceExecutorFixed =
-    ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
-
-const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{
-    "ServiceExecutorFixed", [](ServiceContext* ctx) {
-        getServiceExecutorFixed(ctx) =
-            std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{});
-    }};
 }  // namespace

 ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
@@ -96,12 +86,6 @@ Status ServiceExecutorFixed::start() {
     return Status::OK();
 }

-ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
-    auto& ref = getServiceExecutorFixed(ctx);
-    invariant(ref);
-    return ref.get();
-}
-
 Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
     auto waitForShutdown = [&]() mutable -> Status {
         stdx::unique_lock<Latch> lk(_mutex);
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index 6c540576a78..82be057e1f5 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -33,7 +33,6 @@
 #include <memory>

 #include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
@@ -56,8 +55,6 @@ public:
     explicit ServiceExecutorFixed(ThreadPool::Options options);
     virtual ~ServiceExecutorFixed();

-    static ServiceExecutorFixed* get(ServiceContext* ctx);
-
     Status start() override;
     Status shutdown(Milliseconds timeout) override;
     Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index d81ad5be200..cae9653e60f 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -33,13 +33,11 @@

 #include "mongo/transport/service_executor_reserved.h"

-#include "mongo/db/server_options.h"
 #include "mongo/logv2/log.h"
 #include "mongo/stdx/thread.h"
 #include "mongo/transport/service_executor_gen.h"
 #include "mongo/transport/service_executor_utils.h"
 #include "mongo/util/processinfo.h"
-#include "mongo/util/thread_safety_context.h"

 namespace mongo {
 namespace transport {
@@ -50,19 +48,6 @@ constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "reserved"_sd;
 constexpr auto kReadyThreads = "readyThreads"_sd;
 constexpr auto kStartingThreads = "startingThreads"_sd;
-
-const auto getServiceExecutorReserved =
-    ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
-
-const auto serviceExecutorReservedRegisterer = ServiceContext::ConstructorActionRegisterer{
-    "ServiceExecutorReserved", [](ServiceContext* ctx) {
-        if (!serverGlobalParams.reservedAdminThreads) {
-            return;
-        }
-
-        getServiceExecutorReserved(ctx) = std::make_unique<transport::ServiceExecutorReserved>(
-            ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
-    }};
 }  // namespace

 thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
@@ -162,12 +147,6 @@ Status ServiceExecutorReserved::_startWorker() {
     });
 }

-ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) {
-    auto& ref = getServiceExecutorReserved(ctx);
-
-    // The ServiceExecutorReserved could be absent, so nullptr is okay.
-    return ref.get();
-}

 Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
     LOGV2_DEBUG(22980, 3, "Shutting down reserved executor");
@@ -194,8 +173,8 @@ Status ServiceExecutorReserved::scheduleTask(Task task, ScheduleFlags flags) {
     if (!_localWorkQueue.empty()) {
         // Execute task directly (recurse) if allowed by the caller as it produced better
         // performance in testing. Try to limit the amount of recursion so we don't blow up the
-        // stack, even though this shouldn't happen with this executor that uses blocking
-        // network I/O.
+        // stack, even though this shouldn't happen with this executor that uses blocking network
+        // I/O.
         if ((flags & ScheduleFlags::kMayRecurse) &&
             (_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
             ++_localRecursionDepth;
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index 60de4bc2993..e3acf6febd9 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -32,7 +32,6 @@
 #include <deque>

 #include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
@@ -55,8 +54,6 @@ class ServiceExecutorReserved final : public ServiceExecutor {
 public:
     explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);

-    static ServiceExecutorReserved* get(ServiceContext* ctx);
-
     Status start() override;
     Status shutdown(Milliseconds timeout) override;
     Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index c75bdbb0952..d034e208954 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -38,7 +38,6 @@
 #include "mongo/transport/service_executor_gen.h"
 #include "mongo/transport/service_executor_utils.h"
 #include "mongo/util/processinfo.h"
-#include "mongo/util/thread_safety_context.h"

 namespace mongo {
 namespace transport {
@@ -46,14 +45,6 @@ namespace {
 constexpr auto kThreadsRunning = "threadsRunning"_sd;
 constexpr auto kExecutorLabel = "executor"_sd;
 constexpr auto kExecutorName = "passthrough"_sd;
-
-const auto getServiceExecutorSynchronous =
-    ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
-
-const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{
-    "ServiceExecutorSynchronous", [](ServiceContext* ctx) {
-        getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx);
-    }};
 }  // namespace

 thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {};
@@ -87,12 +78,6 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
         "passthrough executor couldn't shutdown all worker threads within time limit.");
 }

-ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
-    auto& ref = getServiceExecutorSynchronous(ctx);
-    invariant(ref);
-    return ref.get();
-}
-
 Status ServiceExecutorSynchronous::scheduleTask(Task task, ScheduleFlags flags) {
     if (!_stillRunning.load()) {
         return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 840dc702d4c..940382b53e8 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -32,7 +32,6 @@
 #include <deque>

 #include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
@@ -50,8 +49,6 @@ class ServiceExecutorSynchronous final : public ServiceExecutor {
 public:
     explicit ServiceExecutorSynchronous(ServiceContext* ctx);

-    static ServiceExecutorSynchronous* get(ServiceContext* ctx);
-
     Status start() override;
     Status shutdown(Milliseconds timeout) override;
     Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 5124542291b..373a362d0df 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -36,6 +36,7 @@
 #include <memory>

 #include "mongo/config.h"
+#include "mongo/db/client.h"
 #include "mongo/db/dbmessage.h"
 #include "mongo/db/stats/counters.h"
 #include "mongo/db/traffic_recorder.h"
@@ -44,8 +45,6 @@
 #include "mongo/rpc/op_msg.h"
 #include "mongo/transport/message_compressor_manager.h"
 #include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_fixed.h"
-#include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/transport/session.h"
 #include "mongo/transport/transport_layer.h"
 #include "mongo/util/assert_util.h"
@@ -59,7 +58,6 @@
 #include "mongo/util/quick_exit.h"

 namespace mongo {
-namespace transport {
 namespace {
 MONGO_FAIL_POINT_DEFINE(doNotSetMoreToCome);
 /**
@@ -161,14 +159,14 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {

     return exhaustMessage;
 }
+
 }  // namespace

 using transport::ServiceExecutor;
 using transport::TransportLayer;

 /*
- * This class wraps up the logic for swapping/unswapping the Client when transitioning
- * between states.
+ * This class wraps up the logic for swapping/unswapping the Client during runNext().
  *
  * In debug builds this also ensures that only one thread is working on the SSM at once.
  */
@@ -194,10 +192,9 @@ public:

         // Set up the thread name
         auto oldThreadName = getThreadName();
-        const auto& threadName = _ssm->_dbClient->desc();
-        if (oldThreadName != threadName) {
-            _oldThreadName = oldThreadName.toString();
-            setThreadName(threadName);
+        if (oldThreadName != _ssm->_threadName) {
+            _ssm->_oldThreadName = getThreadName().toString();
+            setThreadName(_ssm->_threadName);
         }

         // Swap the current Client so calls to cc() work as expected
@@ -259,8 +256,8 @@ public:
             _ssm->_dbClient = Client::releaseCurrent();
         }

-        if (!_oldThreadName.empty()) {
-            setThreadName(_oldThreadName);
+        if (!_ssm->_oldThreadName.empty()) {
+            setThreadName(_ssm->_oldThreadName);
         }
     }

@@ -288,75 +285,75 @@ public:
 private:
     ServiceStateMachine* _ssm;
     bool _haveTakenOwnership = false;
-    std::string _oldThreadName;
 };

-ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
+std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext,
+                                                                 transport::SessionHandle session,
+                                                                 transport::Mode transportMode) {
+    return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), transportMode);
+}
+
+ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
+                                         transport::SessionHandle session,
+                                         transport::Mode transportMode)
     : _state{State::Created},
-      _serviceContext{client->getServiceContext()},
-      _sep{_serviceContext->getServiceEntryPoint()},
-      _dbClient{std::move(client)},
+      _sep{svcContext->getServiceEntryPoint()},
+      _transportMode(transportMode),
+      _serviceContext(svcContext),
+      _serviceExecutor(_serviceContext->getServiceExecutor()),
+      _sessionHandle(session),
+      _threadName{str::stream() << "conn" << _session()->id()},
+      _dbClient{svcContext->makeClient(_threadName, std::move(session))},
       _dbClientPtr{_dbClient.get()} {}

 const transport::SessionHandle& ServiceStateMachine::_session() const {
-    return _dbClientPtr->session();
+    return _sessionHandle;
 }

-ServiceExecutor* ServiceStateMachine::_executor() {
-    return ServiceExecutorContext::get(_dbClientPtr)->getServiceExecutor();
-}
-
-Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
+void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
     invariant(_inMessage.empty());
     invariant(_state.load() == State::Source);
     _state.store(State::SourceWait);
     guard.release();

     auto sourceMsgImpl = [&] {
-        const auto& transportMode = _executor()->transportMode();
-        if (transportMode == transport::Mode::kSynchronous) {
+        if (_transportMode == transport::Mode::kSynchronous) {
             MONGO_IDLE_THREAD_BLOCK;
             return Future<Message>::makeReady(_session()->sourceMessage());
         } else {
-            invariant(transportMode == transport::Mode::kAsynchronous);
+            invariant(_transportMode == transport::Mode::kAsynchronous);
             return _session()->asyncSourceMessage();
         }
     };

-    return sourceMsgImpl().onCompletion([this](StatusWith<Message> msg) -> Future<void> {
+    sourceMsgImpl().getAsync([this](StatusWith<Message> msg) {
         if (msg.isOK()) {
             _inMessage = std::move(msg.getValue());
             invariant(!_inMessage.empty());
         }
         _sourceCallback(msg.getStatus());
-        return Status::OK();
     });
 }

-Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
+void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {
     // Sink our response to the client
     invariant(_state.load() == State::Process);
     _state.store(State::SinkWait);
     guard.release();

-    auto toSink = std::exchange(_outMessage, {});
     auto sinkMsgImpl = [&] {
-        const auto& transportMode = _executor()->transportMode();
-        if (transportMode == transport::Mode::kSynchronous) {
+        if (_transportMode == transport::Mode::kSynchronous) {
             // We don't consider ourselves idle while sending the reply since we are still doing
             // work on behalf of the client. Contrast that with sourceMessage() where we are waiting
             // for the client to send us more work to do.
             return Future<void>::makeReady(_session()->sinkMessage(std::move(toSink)));
         } else {
-            invariant(transportMode == transport::Mode::kAsynchronous);
+            invariant(_transportMode == transport::Mode::kAsynchronous);
             return _session()->asyncSinkMessage(std::move(toSink));
         }
     };

-    return sinkMsgImpl().onCompletion([this](Status status) {
-        _sinkCallback(std::move(status));
-        return Status::OK();
-    });
+    sinkMsgImpl().getAsync([this](Status status) { _sinkCallback(std::move(status)); });
 }

 void ServiceStateMachine::_sourceCallback(Status status) {
@@ -371,11 +368,14 @@ void ServiceStateMachine::_sourceCallback(Status status) {

     if (status.isOK()) {
         _state.store(State::Process);

-        // If the sourceMessage succeeded then we can move to on to process the message. We
-        // simply return from here and the future chain in _runOnce() will continue to the
-        // next state normally.
+        // Since we know that we're going to process a message, call scheduleNext() immediately
+        // to schedule the call to processMessage() on the serviceExecutor (or just unwind the
+        // stack)

-        // If any other issues arise, close the session.
+        // If this callback doesn't own the ThreadGuard, then we're being called recursively,
+        // and the executor shouldn't start a new thread to process the message - it can use this
+        // one just after this returns.
+        return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse);
     } else if (ErrorCodes::isInterruption(status.code()) ||
                ErrorCodes::isNetworkError(status.code())) {
         LOGV2_DEBUG(
@@ -401,7 +401,10 @@ void ServiceStateMachine::_sourceCallback(Status status) {
             "connectionId"_attr = _session()->id());
         _state.store(State::EndSession);
     }
-    uassertStatusOK(status);
+
+    // There was an error receiving a message from the client and we've already printed the error
+    // so call runNextInGuard() to clean up the session without waiting.
+    _runNextInGuard(std::move(guard));
 }

 void ServiceStateMachine::_sinkCallback(Status status) {
@@ -412,10 +415,10 @@ void ServiceStateMachine::_sinkCallback(Status status) {
     dassert(state() == State::SinkWait);

     // If there was an error sinking the message to the client, then we should print an error and
-    // end the session.
+    // end the session. No need to unwind the stack, so this will runNextInGuard() and return.
     //
-    // Otherwise, update the current state depending on whether we're in exhaust or not and return
-    // from this function to let _runOnce continue the future chaining of state transitions.
+    // Otherwise, update the current state depending on whether we're in exhaust or not, and call
+    // scheduleNext() to unwind the stack and do the next step.
     if (!status.isOK()) {
         LOGV2(22989,
               "Error sending response to client. Ending connection from remote",
               "error"_attr = status,
               "remote"_attr = _session()->remote(),
               "connectionId"_attr = _session()->id());
         _state.store(State::EndSession);
-        uassertStatusOK(status);
+        return _runNextInGuard(std::move(guard));
     } else if (_inExhaust) {
         _state.store(State::Process);
+        return _scheduleNextWithGuard(std::move(guard),
+                                      ServiceExecutor::kDeferredTask |
+                                          ServiceExecutor::kMayYieldBeforeSchedule);
     } else {
         _state.store(State::Source);
+        return _scheduleNextWithGuard(std::move(guard),
+                                      ServiceExecutor::kDeferredTask |
+                                          ServiceExecutor::kMayYieldBeforeSchedule);
     }
 }

-Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
+void ServiceStateMachine::_processMessage(ThreadGuard guard) {
     invariant(!_inMessage.empty());

     TrafficRecorder::get(_serviceContext)
-        .observe(_session(), _serviceContext->getPreciseClockSource()->now(), _inMessage);
+        .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage);

     auto& compressorMgr = MessageCompressorManager::forSession(_session());

@@ -458,7 +467,7 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {

     // The handleRequest is implemented in a subclass for mongod/mongos and actually all the
     // database work for this request.
-    return _sep->handleRequest(opCtx.get(), _inMessage)
+    _sep->handleRequest(opCtx.get(), _inMessage)
         .then([this,
                &compressorMgr = compressorMgr,
                opCtx = std::move(opCtx),
@@ -507,77 +516,106 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
             }

             TrafficRecorder::get(_serviceContext)
-                .observe(_session(), _serviceContext->getPreciseClockSource()->now(), toSink);
+                .observe(
+                    _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
+
+            _sinkMessage(std::move(guard), std::move(toSink));

-            _outMessage = std::move(toSink);
         } else {
             _state.store(State::Source);
             _inMessage.reset();
-            _outMessage.reset();
             _inExhaust = false;
+            return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask);
         }
-    });
+    })
+        .get();
 }

-void ServiceStateMachine::start() {
-    _executor()->schedule(
-        GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
-            // TODO(SERVER-49109) We can't use static ownership in general with
-            // a ServiceExecutorFixed and async commands. ThreadGuard needs to become smarter.
-            ThreadGuard guard(shared_from_this().get());
-            guard.markStaticOwnership();
+void ServiceStateMachine::runNext() {
+    return _runNextInGuard(ThreadGuard(this));
+}

-            // If this is the first run of the SSM, then update its state to Source
-            if (state() == State::Created) {
-                _state.store(State::Source);
-            }
+void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
+    auto curState = state();
+    dassert(curState != State::Ended);

-            _runOnce();
-        }));
-}
+    // If this is the first run of the SSM, then update its state to Source
+    if (curState == State::Created) {
+        curState = State::Source;
+        _state.store(curState);
+    }

-void ServiceStateMachine::_runOnce() {
-    makeReadyFutureWith([&]() -> Future<void> {
-        if (_inExhaust) {
-            return Status::OK();
-        } else {
-            return _sourceMessage(ThreadGuard(this));
+    // Destroy the opCtx (already killed) here, to potentially use the delay between clients'
+    // requests to hide the destruction cost.
+    if (MONGO_likely(_killedOpCtx)) {
+        _killedOpCtx.reset();
+    }
+
+    // Make sure the current Client got set correctly
+    dassert(Client::getCurrent() == _dbClientPtr);
+    try {
+        switch (curState) {
+            case State::Source:
+                _sourceMessage(std::move(guard));
+                break;
+            case State::Process:
+                _processMessage(std::move(guard));
+                break;
+            case State::EndSession:
+                _cleanupSession(std::move(guard));
+                break;
+            default:
+                MONGO_UNREACHABLE;
         }
-    })
-        .then([this]() { return _processMessage(ThreadGuard(this)); })
-        .then([this]() -> Future<void> {
-            if (_outMessage.empty()) {
-                return Status::OK();
-            }
-            return _sinkMessage(ThreadGuard(this));
-        })
-        .getAsync([this, anchor = shared_from_this()](Status status) {
-            // Destroy the opCtx (already killed) here, to potentially use the delay between
-            // clients' requests to hide the destruction cost.
-            if (MONGO_likely(_killedOpCtx)) {
-                _killedOpCtx.reset();
-            }
-            if (!status.isOK()) {
-                _state.store(State::EndSession);
-                // The service executor failed to schedule the task. This could for example be that
-                // we failed to start a worker thread. Terminate this connection to leave the system
-                // in a valid state.
-                LOGV2_WARNING_OPTIONS(4910400,
-                                      {logv2::LogComponent::kExecutor},
-                                      "Terminating session due to error: {error}",
-                                      "Terminating session due to error",
-                                      "error"_attr = status);
-                terminate();
-
-                ThreadGuard terminateGuard(this);
-                _cleanupSession(std::move(terminateGuard));
-                return;
-            }
+        return;
+    } catch (const DBException& e) {
+        LOGV2(22990,
+              "DBException handling request, closing client connection: {error}",
+              "DBException handling request, closing client connection",
+              "error"_attr = redact(e));
+    }
+    // No need to catch std::exception, as std::terminate will be called when the exception bubbles
+    // to the top of the stack

-            _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
-                [this, anchor = shared_from_this()](Status status) { _runOnce(); }));
-        });
+    if (!guard) {
+        guard = ThreadGuard(this);
+    }
+    _state.store(State::EndSession);
+    _cleanupSession(std::move(guard));
+}
+
+void ServiceStateMachine::start(Ownership ownershipModel) {
+    _scheduleNextWithGuard(
+        ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel);
+}
+
+void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) {
+    _serviceExecutor = executor;
+}
+
+void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
+                                                 transport::ServiceExecutor::ScheduleFlags flags,
+                                                 Ownership ownershipModel) {
+    auto func = [ssm = shared_from_this(), ownershipModel] {
+        ThreadGuard guard(ssm.get());
+        if (ownershipModel == Ownership::kStatic)
+            guard.markStaticOwnership();
+        ssm->_runNextInGuard(std::move(guard));
+    };
+
+    guard.release();
+    Status status = _serviceExecutor->scheduleTask(std::move(func), flags);
+    if (status.isOK()) {
+        return;
+    }
+
+    // We've had an error, reacquire the ThreadGuard and destroy the SSM
+    ThreadGuard terminateGuard(this);
+
+    // The service executor failed to schedule the task. This could for example be that we failed
+    // to start a worker thread. Terminate this connection to leave the system in a valid state.
+    _terminateAndLogIfError(status);
+    _cleanupSession(std::move(terminateGuard));
 }

 void ServiceStateMachine::terminate() {
@@ -659,12 +697,9 @@ void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {

     _inMessage.reset();

-    _outMessage.reset();
-
     // By ignoring the return value of Client::releaseCurrent() we destroy the session.
     // _dbClient is now nullptr and _dbClientPtr is invalid and should never be accessed.
     Client::releaseCurrent();
 }

-}  // namespace transport
 }  // namespace mongo
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 7794cdfbfce..0572c4f6ac8 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -35,7 +35,6 @@

 #include "mongo/base/status.h"
 #include "mongo/config.h"
-#include "mongo/db/client.h"
 #include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
@@ -45,11 +44,9 @@
 #include "mongo/transport/service_executor.h"
 #include "mongo/transport/session.h"
 #include "mongo/transport/transport_mode.h"
-#include "mongo/util/future.h"
 #include "mongo/util/net/ssl_manager.h"

 namespace mongo {
-namespace transport {

 /*
  * The ServiceStateMachine holds the state of a single client connection and represents the
@@ -66,9 +63,17 @@ public:
     ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;

     /*
-     * Construct a ServiceStateMachine for a given Client.
+     * Creates a new ServiceStateMachine for a given session/service context. If sync is true,
+     * then calls into the transport layer will block while they complete, otherwise they will
+     * be handled asynchronously.
      */
-    ServiceStateMachine(ServiceContext::UniqueClient client);
+    static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
+                                                       transport::SessionHandle session,
+                                                       transport::Mode transportMode);
+
+    ServiceStateMachine(ServiceContext* svcContext,
+                        transport::SessionHandle session,
+                        transport::Mode transportMode);

     /*
      * Any state may transition to EndSession in case of an error, otherwise the valid state
@@ -101,9 +106,31 @@ public:
     enum class Ownership { kUnowned, kOwned, kStatic };

     /*
-     * start() schedules a call to _runOnce() in the future.
+     * runNext() will run the current state of the state machine. It also handles all the error
+     * handling and state management for requests.
+     *
+     * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack
+     * if they have just completed a database operation to make sure that this doesn't infinitely
+     * recurse.
+     *
+     * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take
+     * ownership of the SSM, it will call scheduleNext() and return immediately.
+     */
+    void runNext();
+
+    /*
+     * start() schedules a call to runNext() in the future.
+     *
+     * It is guaranteed to unwind the stack, and not call runNext() recursively, but is not
+     * guaranteed that runNext() will run after this return
+     */
+    void start(Ownership ownershipModel);
+
+    /*
+     * Set the executor to be used for the next call to runNext(). This allows switching between
+     * thread models after the SSM has started.
      */
-    void start();
+    void setServiceExecutor(transport::ServiceExecutor* executor);

     /*
      * Gets the current state of connection for testing/diagnostic purposes.
@@ -133,8 +160,7 @@ public:

 private:
     /*
-     * A class that wraps up lifetime management of the _dbClient and _threadName for
-     * each step in _runOnce();
+     * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
      */
     class ThreadGuard;
     friend class ThreadGuard;
@@ -162,15 +188,18 @@ private:
     const transport::SessionHandle& _session() const;

     /*
-     * Gets the transport::ServiceExecutor associated with this connection.
+     * This is the actual implementation of runNext() that gets called after the ThreadGuard
+     * has been successfully created. If any callbacks (like sourceCallback()) need to call
+     * runNext() and already own a ThreadGuard, they should call this with that guard as the
+     * argument.
      */
-    ServiceExecutor* _executor();
+    void _runNextInGuard(ThreadGuard guard);

     /*
      * This function actually calls into the database and processes a request. It's broken out
      * into its own inline function for better readability.
      */
-    Future<void> _processMessage(ThreadGuard guard);
+    inline void _processMessage(ThreadGuard guard);

     /*
      * These get called by the TransportLayer when requested network I/O has completed.
@@ -182,8 +211,8 @@ private:
      * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
      * before waiting on the TL.
      */
-    Future<void> _sourceMessage(ThreadGuard guard);
-    Future<void> _sinkMessage(ThreadGuard guard);
+    void _sourceMessage(ThreadGuard guard);
+    void _sinkMessage(ThreadGuard guard, Message toSink);

     /*
      * Releases all the resources associated with the session and call the cleanupHook.
@@ -191,30 +220,27 @@ private:
     void _cleanupSession(ThreadGuard guard);

     /*
-     * This is the initial function called at the beginning of a thread's lifecycle in the
-     * TransportLayer.
-     */
-    void _runOnce();
-
-    /*
      * Releases all the resources associated with the exhaust request.
      */
     void _cleanupExhaustResources() noexcept;

     AtomicWord<State> _state{State::Created};

+    ServiceEntryPoint* _sep;
+    transport::Mode _transportMode;
+
     ServiceContext* const _serviceContext;
-    ServiceEntryPoint* const _sep;
+    transport::ServiceExecutor* _serviceExecutor;

     transport::SessionHandle _sessionHandle;
+    const std::string _threadName;
     ServiceContext::UniqueClient _dbClient;
-    Client* _dbClientPtr;
+    const Client* _dbClientPtr;
     std::function<void()> _cleanupHook;

     bool _inExhaust = false;
     boost::optional<MessageCompressorId> _compressorId;
     Message _inMessage;
-    Message _outMessage;

     // Allows delegating destruction of opCtx to another function to potentially remove its cost
     // from the critical path. This is currently only used in `_processMessage()`.
@@ -224,6 +250,7 @@ private:
 #if MONGO_CONFIG_DEBUG_BUILD
     AtomicWord<stdx::thread::id> _owningThread;
 #endif
+    std::string _oldThreadName;
 };

 template <typename T>
@@ -254,5 +281,4 @@ T& operator<<(T& stream, const ServiceStateMachine::State& state) {
     return stream;
 }

-}  // namespace transport
 }  // namespace mongo
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 8b89b64516e..e0ff202bf6c 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -136,6 +136,8 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
     transport::TransportLayerASIO::Options opts(config);
     opts.transportMode = transport::Mode::kSynchronous;

+    ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));
+
     std::vector<std::unique_ptr<TransportLayer>> retVector;
     retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
     return std::make_unique<TransportLayerManager>(std::move(retVector));
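Taken together, the executor wiring after this revert reduces to three steps. The sketch below is assembled from the restored (+) sides of the hunks above, reusing the same names and calls; it is a summary for the reader, not a single file in the tree.

    // 1) Startup: TransportLayerManager installs one synchronous executor on the
    //    ServiceContext (see transport_layer_manager.cpp above).
    ctx->setServiceExecutor(std::make_unique<transport::ServiceExecutorSynchronous>(ctx));

    // 2) Construction: ServiceEntryPointImpl builds the reserved admin pool only
    //    when reservedAdminThreads is configured (service_entry_point_impl.cpp).
    if (serverGlobalParams.reservedAdminThreads) {
        _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
            _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
    }

    // 3) Accept: a session admitted past maxConns via an exemption is switched to
    //    the reserved pool before its state machine starts (startSession above).
    if (usingMaxConnOverride && _adminInternalPool) {
        ssm->setServiceExecutor(_adminInternalPool.get());
    }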