Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/SConscript                            |   1
-rw-r--r-- | src/mongo/db/curop.cpp                             |   7
-rw-r--r-- | src/mongo/db/service_context.h                     |   6
-rw-r--r-- | src/mongo/transport/SConscript                     |   1
-rw-r--r-- | src/mongo/transport/service_entry_point.h          |   9
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.cpp   |  80
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.h     |   8
-rw-r--r-- | src/mongo/transport/service_executor.cpp           | 135
-rw-r--r-- | src/mongo/transport/service_executor.h             |  77
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp      |  77
-rw-r--r-- | src/mongo/transport/service_state_machine.h        |  37
11 files changed, 324 insertions, 114 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 6374268664b..85907307fe7 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -141,6 +141,7 @@ env.Library(
         '$BUILD_DIR/mongo/db/service_context',
         '$BUILD_DIR/mongo/db/stats/timer_stats',
         '$BUILD_DIR/mongo/rpc/client_metadata',
+        '$BUILD_DIR/mongo/transport/service_executor',
         '$BUILD_DIR/mongo/util/diagnostic_info' if get_option('use-diagnostic-latches') == 'on' else [],
         '$BUILD_DIR/mongo/util/fail_point',
         '$BUILD_DIR/mongo/util/net/network',
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index 91aba3c28a7..6905e8ce469 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -53,6 +53,7 @@
 #include "mongo/rpc/metadata/client_metadata.h"
 #include "mongo/rpc/metadata/client_metadata_ismaster.h"
 #include "mongo/rpc/metadata/impersonated_user_metadata.h"
+#include "mongo/transport/service_executor.h"
 #include "mongo/util/hex.h"
 #include "mongo/util/log_with_sampling.h"
 #include "mongo/util/net/socket_utils.h"
@@ -303,6 +304,12 @@ void CurOp::reportCurrentOpForClient(OperationContext* opCtx,
         serializeAuthenticatedUsers("effectiveUsers"_sd);
     }
 
+    if (const auto seCtx = transport::ServiceExecutorContext::get(client)) {
+        bool isDedicated = (seCtx->getThreadingModel() ==
+                            transport::ServiceExecutorContext::ThreadingModel::kDedicated);
+        infoBuilder->append("threaded"_sd, isDedicated);
+    }
+
     if (clientOpCtx) {
         infoBuilder->append("opid", static_cast<int>(clientOpCtx->getOpID()));
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 8d95c13b161..08e93df9cfb 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -42,7 +42,6 @@
 #include "mongo/platform/mutex.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/stdx/unordered_set.h"
-#include "mongo/transport/service_executor.h"
 #include "mongo/transport/session.h"
 #include "mongo/util/clock_source.h"
 #include "mongo/util/concurrency/with_lock.h"
@@ -634,11 +633,6 @@ private:
     std::unique_ptr<ServiceEntryPoint> _serviceEntryPoint;
 
     /**
-     * The ServiceExecutor
-     */
-    std::unique_ptr<transport::ServiceExecutor> _serviceExecutor;
-
-    /**
      * The storage engine, if any.
      */
     std::unique_ptr<StorageEngine> _storageEngine;
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 9ac3bf2c329..7932abc662d 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -86,6 +86,7 @@ env.Library(
 tlEnv.Library(
     target='service_executor',
     source=[
+        'service_executor.cpp',
         'service_executor_fixed.cpp',
         'service_executor_reserved.cpp',
         'service_executor_synchronous.cpp',
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index aa970f1ef67..fb1ad895d78 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,6 +29,8 @@
 
 #pragma once
 
+#include <limits>
+
 #include "mongo/bson/bsonobjbuilder.h"
 #include "mongo/db/dbmessage.h"
 #include "mongo/transport/session.h"
@@ -81,6 +83,13 @@ public:
     virtual size_t numOpenSessions() const = 0;
 
     /**
+     * Returns the maximum number of sessions that can be open.
+     */
+    virtual size_t maxOpenSessions() const {
+        return std::numeric_limits<size_t>::max();
+    }
+
+    /**
      * Processes a request and fills out a DbResponse.
      */
     virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index ec35471ede6..0090f30a686 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -33,6 +33,7 @@
 
 #include "mongo/transport/service_entry_point_impl.h"
 
+#include <fmt/format.h>
 #include <vector>
 
 #include "mongo/db/auth/restriction_environment.h"
@@ -54,10 +55,17 @@
 
 namespace mongo {
 
+using namespace fmt::literals;
+
 bool shouldOverrideMaxConns(const transport::SessionHandle& session,
                             const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
+    if (exemptions.empty()) {
+        return false;
+    }
+
     const auto& remoteAddr = session->remoteAddr();
     const auto& localAddr = session->localAddr();
+
     boost::optional<CIDR> remoteCIDR;
 
     if (remoteAddr.isValid() && remoteAddr.isIP()) {
@@ -85,8 +93,7 @@ bool shouldOverrideMaxConns(const transport::SessionHandle& session,
     return false;
 }
 
-ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
-
+size_t getSupportedMax() {
     const auto supportedMax = [] {
 #ifdef _WIN32
         return serverGlobalParams.maxConns;
@@ -117,9 +124,12 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
                       "limit"_attr = supportedMax);
     }
 
-    _maxNumConnections = supportedMax;
+    return supportedMax;
 }
 
+ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx)
+    : _svcCtx(svcCtx), _maxNumConnections(getSupportedMax()) {}
+
 Status ServiceEntryPointImpl::start() {
     if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
         !status.isOK()) {
@@ -149,44 +159,48 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     auto restrictionEnvironment = std::make_unique<RestrictionEnvironment>(remoteAddr, localAddr);
     RestrictionEnvironment::set(session, std::move(restrictionEnvironment));
 
-    SSMListIterator ssmIt;
+    bool canOverrideMaxConns = shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
 
-    const bool quiet = serverGlobalParams.quiet.load();
-    size_t connectionCount;
-    auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode();
+    auto clientName = "conn{}"_format(session->id());
+    auto client = _svcCtx->makeClient(clientName, session);
 
-    auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
-    auto usingMaxConnOverride = false;
     {
-        stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
-        connectionCount = _sessions.size() + 1;
-        if (connectionCount > _maxNumConnections) {
-            usingMaxConnOverride =
-                shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
-        }
+        stdx::lock_guard lk(*client);
+        auto seCtx =
+            transport::ServiceExecutorContext{}
+                .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated)
+                .setCanUseReserved(canOverrideMaxConns);
 
-        if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
-            ssmIt = _sessions.emplace(_sessions.begin(), ssm);
-            _currentConnections.store(connectionCount);
-            _createdConnections.addAndFetch(1);
-        }
+        transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
     }
 
-    // Checking if we successfully added a connection above. Separated from the lock so we don't log
-    // while holding it.
-    if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
+    auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
+
+    const bool quiet = serverGlobalParams.quiet.load();
+
+    size_t connectionCount;
+    auto ssmIt = [&]() -> boost::optional<SSMListIterator> {
+        stdx::lock_guard lk(_sessionsMutex);
+        connectionCount = _currentConnections.load();
+        if (connectionCount > _maxNumConnections && !canOverrideMaxConns) {
+            return boost::none;
+        }
+
+        auto it = _sessions.emplace(_sessions.begin(), ssm);
+        connectionCount = _sessions.size();
+        _currentConnections.store(connectionCount);
+        _createdConnections.addAndFetch(1);
+        return it;
+    }();
+
+    if (!ssmIt) {
         if (!quiet) {
             LOGV2(22942,
                   "Connection refused because there are too many open connections",
                   "connectionCount"_attr = connectionCount);
         }
         return;
-    } else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx);
-               usingMaxConnOverride && exec) {
-        ssm->setServiceExecutor(exec);
-    }
-
-    if (!quiet) {
+    } else if (!quiet) {
         LOGV2(22943,
               "Connection accepted",
               "remote"_attr = session->remote(),
@@ -199,7 +213,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
         auto remote = session->remote();
         {
             stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
-            _sessions.erase(ssmIt);
+            _sessions.erase(*ssmIt);
             connectionCount = _sessions.size();
             _currentConnections.store(connectionCount);
         }
@@ -214,11 +228,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
         }
     });
 
-    auto ownership = ServiceStateMachine::Ownership::kOwned;
-    if (transportMode == transport::Mode::kSynchronous) {
-        ownership = ServiceStateMachine::Ownership::kStatic;
-    }
-    ssm->start(ownership);
+    ssm->start();
 }
 
 void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 0528a19fbdb..1693469fcda 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -77,8 +77,12 @@ public:
         return _currentConnections.load();
     }
 
+    size_t maxOpenSessions() const final {
+        return _maxNumConnections;
+    }
+
 private:
-    using SSMList = std::list<std::shared_ptr<ServiceStateMachine>>;
+    using SSMList = std::list<std::shared_ptr<transport::ServiceStateMachine>>;
     using SSMListIterator = SSMList::iterator;
 
     ServiceContext* const _svcCtx;
@@ -89,7 +93,7 @@ private:
     stdx::condition_variable _shutdownCondition;
 
     SSMList _sessions;
-    size_t _maxNumConnections{DEFAULT_MAX_CONN};
+    const size_t _maxNumConnections{DEFAULT_MAX_CONN};
     AtomicWord<size_t> _currentConnections{0};
     AtomicWord<size_t> _createdConnections{0};
 };
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
new file mode 100644
index 00000000000..37c3d03e1b1
--- /dev/null
+++ b/src/mongo/transport/service_executor.cpp
@@ -0,0 +1,135 @@
+/**
+ *    Copyright (C) 2020-present MongoDB, Inc.
+ *
+ *    This program is free software: you can redistribute it and/or modify
+ *    it under the terms of the Server Side Public License, version 1,
+ *    as published by MongoDB, Inc.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *    Server Side Public License for more details.
+ *
+ *    You should have received a copy of the Server Side Public License
+ *    along with this program. If not, see
+ *    <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ *    As a special exception, the copyright holders give permission to link the
+ *    code of portions of this program with the OpenSSL library under certain
+ *    conditions as described in each individual source file and distribute
+ *    linked combinations including the program with the OpenSSL library. You
+ *    must comply with the Server Side Public License in all respects for
+ *    all of the code used other than as permitted herein. If you modify file(s)
+ *    with this exception, you may extend this exception to your version of the
+ *    file(s), but you are not obligated to do so. If you do not wish to do so,
+ *    delete this exception statement from your version. If you delete this
+ *    exception statement from all source files in the program, then also delete
+ *    it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_executor.h"
+
+#include <boost/optional.hpp>
+
+#include "mongo/logv2/log.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_fixed.h"
+#include "mongo/transport/service_executor_reserved.h"
+#include "mongo/transport/service_executor_synchronous.h"
+
+namespace mongo {
+namespace transport {
+namespace {
+static constexpr auto kDiagnosticLogLevel = 4;
+
+auto getServiceExecutorContext =
+    Client::declareDecoration<boost::optional<ServiceExecutorContext>>();
+}  // namespace
+
+StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) {
+    switch (threadingModel) {
+        case ServiceExecutorContext::ThreadingModel::kDedicated:
+            return "Dedicated"_sd;
+        case ServiceExecutorContext::ThreadingModel::kBorrowed:
+            return "Borrowed"_sd;
+        default:
+            MONGO_UNREACHABLE;
+    }
+}
+
+ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept {
+    auto& serviceExecutorContext = getServiceExecutorContext(client);
+
+    if (!serviceExecutorContext) {
+        // Service worker Clients will never have a ServiceExecutorContext.
+        return nullptr;
+    }
+
+    return &serviceExecutorContext.get();
+}
+
+void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) noexcept {
+    auto& serviceExecutorContext = getServiceExecutorContext(client);
+    invariant(!serviceExecutorContext);
+
+    seCtx._client = client;
+    seCtx._sep = client->getServiceContext()->getServiceEntryPoint();
+
+    LOGV2_DEBUG(4898000,
+                kDiagnosticLogLevel,
+                "Setting initial ServiceExecutor context for client",
+                "client"_attr = client->desc(),
+                "threadingModel"_attr = seCtx._threadingModel,
+                "canUseReserved"_attr = seCtx._canUseReserved);
+    serviceExecutorContext = std::move(seCtx);
+}
+
+ServiceExecutorContext& ServiceExecutorContext::setThreadingModel(
+    ThreadingModel threadingModel) noexcept {
+    _threadingModel = threadingModel;
+    return *this;
+}
+
+ServiceExecutorContext& ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
+    _canUseReserved = canUseReserved;
+    return *this;
+}
+
+ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
+    invariant(_client);
+
+    switch (_threadingModel) {
+        case ThreadingModel::kBorrowed:
+            return ServiceExecutorFixed::get(_client->getServiceContext());
+        case ThreadingModel::kDedicated: {
+            // Continue on.
+        } break;
+        default:
+            MONGO_UNREACHABLE;
+    }
+
+    auto shouldUseReserved = [&] {
+        // This is at best a naive solution. There could be a world where numOpenSessions() changes
+        // very quickly. We are not taking locks on the ServiceEntryPoint, so we may chose to
+        // schedule onto the ServiceExecutorReserved when it is no longer necessary. The upside is
+        // that we will automatically shift to the ServiceExecutorSynchronous after the first
+        // command loop.
+        return _sep->numOpenSessions() > _sep->maxOpenSessions();
+    };
+
+    if (_canUseReserved && shouldUseReserved()) {
+        if (auto exec = transport::ServiceExecutorReserved::get(_client->getServiceContext())) {
+            // We are allowed to use the reserved executor, we should use it, and it exists.
+            return exec;
+        }
+    }
+
+    return transport::ServiceExecutorSynchronous::get(_client->getServiceContext());
+}
+
+}  // namespace transport
+}  // namespace mongo
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index b702198e6b5..38116908272 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -33,20 +33,19 @@
 
 #include "mongo/base/status.h"
 #include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/client.h"
+#include "mongo/db/service_context.h"
 #include "mongo/platform/bitwise_enum_operators.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/session.h"
 #include "mongo/transport/transport_mode.h"
 #include "mongo/util/duration.h"
 #include "mongo/util/functional.h"
 #include "mongo/util/out_of_line_executor.h"
 
 namespace mongo {
-// This needs to be forward declared here because the service_context.h is a circular dependency.
-class ServiceContext;
-
 namespace transport {
-class Session;
-
 /*
  * This is the interface for all ServiceExecutors.
  */
@@ -123,6 +122,74 @@ public:
     virtual void appendStats(BSONObjBuilder* bob) const = 0;
 };
 
+/**
+ * ServiceExecutorContext determines which ServiceExecutor is used for each Client.
+ */
+class ServiceExecutorContext {
+public:
+    enum ThreadingModel {
+        kBorrowed,
+        kDedicated,
+    };
+
+    /**
+     * Get a pointer to the ServiceExecutorContext for a given client.
+     *
+     * This function is valid to invoke either on the Client thread or with the Client lock.
+     */
+    static ServiceExecutorContext* get(Client* client) noexcept;
+
+    /**
+     * Set the ServiceExecutorContext for a given client.
+     *
+     * This function may only be invoked once and only while under the Client lock.
+     */
+    static void set(Client* client, ServiceExecutorContext seCtx) noexcept;
+
+    ServiceExecutorContext() = default;
+
+    /**
+     * Set the ThreadingModel for the associated Client's service execution.
+     *
+     * This function is only valid to invoke with the Client lock or before the Client is set.
+     */
+    ServiceExecutorContext& setThreadingModel(ThreadingModel threadingModel) noexcept;
+
+    /**
+     * Set if reserved resources are available for the associated Client's service execution.
+     *
+     * This function is only valid to invoke with the Client lock or before the Client is set.
+     */
+    ServiceExecutorContext& setCanUseReserved(bool canUseReserved) noexcept;
+
+    /**
+     * Get the ThreadingModel for the associated Client.
+     *
+     * This function is valid to invoke either on the Client thread or with the Client lock.
+     */
+    auto getThreadingModel() const noexcept {
+        return _threadingModel;
+    }
+
+    /**
+     * Get an appropriate ServiceExecutor given the current parameters.
+     *
+     * This function is only valid to invoke from the associated Client thread. This function does
+     * not require the Client lock since all writes must also happen from that thread.
+     */
+    ServiceExecutor* getServiceExecutor() const noexcept;
+
+private:
+    friend StringData toString(ThreadingModel threadingModel);
+
+    Client* _client = nullptr;
+    ServiceEntryPoint* _sep = nullptr;
+
+    ThreadingModel _threadingModel = ThreadingModel::kDedicated;
+    bool _canUseReserved = false;
+};
+
+
 }  // namespace transport
 
 ENABLE_BITMASK_OPERATORS(transport::ServiceExecutor::ScheduleFlags)
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index b71e6c94abd..5124542291b 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -36,7 +36,6 @@
 #include <memory>
 
 #include "mongo/config.h"
-#include "mongo/db/client.h"
 #include "mongo/db/dbmessage.h"
 #include "mongo/db/stats/counters.h"
 #include "mongo/db/traffic_recorder.h"
@@ -45,6 +44,7 @@
 #include "mongo/rpc/op_msg.h"
 #include "mongo/transport/message_compressor_manager.h"
 #include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_fixed.h"
 #include "mongo/transport/service_executor_synchronous.h"
 #include "mongo/transport/session.h"
 #include "mongo/transport/transport_layer.h"
@@ -59,6 +59,7 @@
 #include "mongo/util/quick_exit.h"
 
 namespace mongo {
+namespace transport {
 namespace {
 MONGO_FAIL_POINT_DEFINE(doNotSetMoreToCome);
 /**
@@ -160,7 +161,6 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
 
     return exhaustMessage;
 }
-
 }  // namespace
 
 using transport::ServiceExecutor;
@@ -194,9 +194,10 @@ public:
 
         // Set up the thread name
         auto oldThreadName = getThreadName();
-        if (oldThreadName != _ssm->_threadName) {
-            _ssm->_oldThreadName = getThreadName().toString();
-            setThreadName(_ssm->_threadName);
+        const auto& threadName = _ssm->_dbClient->desc();
+        if (oldThreadName != threadName) {
+            _oldThreadName = oldThreadName.toString();
+            setThreadName(threadName);
         }
 
         // Swap the current Client so calls to cc() work as expected
@@ -258,8 +259,8 @@ public:
             _ssm->_dbClient = Client::releaseCurrent();
         }
 
-        if (!_ssm->_oldThreadName.empty()) {
-            setThreadName(_ssm->_oldThreadName);
+        if (!_oldThreadName.empty()) {
+            setThreadName(_oldThreadName);
         }
     }
 
@@ -287,29 +288,22 @@ public:
 private:
     ServiceStateMachine* _ssm;
     bool _haveTakenOwnership = false;
+    std::string _oldThreadName;
 };
 
-std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext,
-                                                                 transport::SessionHandle session,
-                                                                 transport::Mode transportMode) {
-    return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), transportMode);
-}
-
-ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
-                                         transport::SessionHandle session,
-                                         transport::Mode transportMode)
+ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
    : _state{State::Created},
-      _sep{svcContext->getServiceEntryPoint()},
-      _transportMode(transportMode),
-      _serviceContext(svcContext),
-      _sessionHandle(session),
-      _threadName{str::stream() << "conn" << _session()->id()},
-      _dbClient{svcContext->makeClient(_threadName, std::move(session))},
-      _dbClientPtr{_dbClient.get()},
-      _serviceExecutor(transport::ServiceExecutorSynchronous::get(_serviceContext)) {}
+      _serviceContext{client->getServiceContext()},
+      _sep{_serviceContext->getServiceEntryPoint()},
+      _dbClient{std::move(client)},
+      _dbClientPtr{_dbClient.get()} {}
 
 const transport::SessionHandle& ServiceStateMachine::_session() const {
-    return _sessionHandle;
+    return _dbClientPtr->session();
+}
+
+ServiceExecutor* ServiceStateMachine::_executor() {
+    return ServiceExecutorContext::get(_dbClientPtr)->getServiceExecutor();
 }
 
 Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
@@ -319,11 +313,12 @@ Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
     guard.release();
 
     auto sourceMsgImpl = [&] {
-        if (_transportMode == transport::Mode::kSynchronous) {
+        const auto& transportMode = _executor()->transportMode();
+        if (transportMode == transport::Mode::kSynchronous) {
             MONGO_IDLE_THREAD_BLOCK;
             return Future<Message>::makeReady(_session()->sourceMessage());
         } else {
-            invariant(_transportMode == transport::Mode::kAsynchronous);
+            invariant(transportMode == transport::Mode::kAsynchronous);
             return _session()->asyncSourceMessage();
         }
     };
@@ -346,13 +341,14 @@ Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
     auto toSink = std::exchange(_outMessage, {});
 
     auto sinkMsgImpl = [&] {
-        if (_transportMode == transport::Mode::kSynchronous) {
+        const auto& transportMode = _executor()->transportMode();
+        if (transportMode == transport::Mode::kSynchronous) {
             // We don't consider ourselves idle while sending the reply since we are still doing
             // work on behalf of the client. Contrast that with sourceMessage() where we are waiting
             // for the client to send us more work to do.
             return Future<void>::makeReady(_session()->sinkMessage(std::move(toSink)));
         } else {
-            invariant(_transportMode == transport::Mode::kAsynchronous);
+            invariant(transportMode == transport::Mode::kAsynchronous);
             return _session()->asyncSinkMessage(std::move(toSink));
         }
     };
@@ -439,7 +435,7 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
     invariant(!_inMessage.empty());
 
     TrafficRecorder::get(_serviceContext)
-        .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage);
+        .observe(_session(), _serviceContext->getPreciseClockSource()->now(), _inMessage);
 
     auto& compressorMgr = MessageCompressorManager::forSession(_session());
@@ -511,8 +507,7 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
             }
 
             TrafficRecorder::get(_serviceContext)
-                .observe(
-                    _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
+                .observe(_session(), _serviceContext->getPreciseClockSource()->now(), toSink);
 
             _outMessage = std::move(toSink);
         } else {
@@ -524,16 +519,13 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
     });
 }
 
-void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) {
-    _serviceExecutor = executor;
-}
-
-void ServiceStateMachine::start(Ownership ownershipModel) {
-    _serviceExecutor->schedule(GuaranteedExecutor::enforceRunOnce(
-        [this, anchor = shared_from_this(), ownershipModel](Status status) {
+void ServiceStateMachine::start() {
+    _executor()->schedule(
+        GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
+            // TODO(SERVER-49109) We can't use static ownership in general with
+            // a ServiceExecutorFixed and async commands. ThreadGuard needs to become smarter.
             ThreadGuard guard(shared_from_this().get());
-            if (ownershipModel == Ownership::kStatic)
-                guard.markStaticOwnership();
+            guard.markStaticOwnership();
 
             // If this is the first run of the SSM, then update its state to Source
             if (state() == State::Created) {
@@ -583,7 +575,7 @@ void ServiceStateMachine::_runOnce() {
             return;
         }
 
-        _serviceExecutor->schedule(GuaranteedExecutor::enforceRunOnce(
+        _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
            [this, anchor = shared_from_this()](Status status) { _runOnce(); }));
     });
 }
@@ -674,4 +666,5 @@ void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {
 
     Client::releaseCurrent();
 }
 
+}  // namespace transport
 }  // namespace mongo
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index b04a419a362..7794cdfbfce 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -35,6 +35,7 @@
 
 #include "mongo/base/status.h"
 #include "mongo/config.h"
+#include "mongo/db/client.h"
 #include "mongo/db/service_context.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
@@ -48,6 +49,7 @@
 #include "mongo/util/net/ssl_manager.h"
 
 namespace mongo {
+namespace transport {
 
 /*
  * The ServiceStateMachine holds the state of a single client connection and represents the
@@ -64,17 +66,9 @@ public:
     ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;
 
     /*
-     * Creates a new ServiceStateMachine for a given session/service context. If sync is true,
-     * then calls into the transport layer will block while they complete, otherwise they will
-     * be handled asynchronously.
+     * Construct a ServiceStateMachine for a given Client.
      */
-    static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
-                                                       transport::SessionHandle session,
-                                                       transport::Mode transportMode);
-
-    ServiceStateMachine(ServiceContext* svcContext,
-                        transport::SessionHandle session,
-                        transport::Mode transportMode);
+    ServiceStateMachine(ServiceContext::UniqueClient client);
 
     /*
      * Any state may transition to EndSession in case of an error, otherwise the valid state
@@ -109,13 +103,7 @@ public:
     /*
      * start() schedules a call to _runOnce() in the future.
      */
-    void start(Ownership ownershipModel);
-
-    /*
-     * Set the executor to be used for the next call to runNext(). This allows switching between
-     * thread models after the SSM has started.
-     */
-    void setServiceExecutor(transport::ServiceExecutor* executor);
+    void start();
 
     /*
      * Gets the current state of connection for testing/diagnostic purposes.
@@ -174,6 +162,11 @@ private:
     const transport::SessionHandle& _session() const;
 
     /*
+     * Gets the transport::ServiceExecutor associated with this connection.
+     */
+    ServiceExecutor* _executor();
+
+    /*
      * This function actually calls into the database and processes a request. It's broken out
     * into its own inline function for better readability.
      */
@@ -210,16 +203,12 @@ private:
 
     AtomicWord<State> _state{State::Created};
 
-    ServiceEntryPoint* _sep;
-    transport::Mode _transportMode;
-
     ServiceContext* const _serviceContext;
+    ServiceEntryPoint* const _sep;
 
     transport::SessionHandle _sessionHandle;
-    const std::string _threadName;
     ServiceContext::UniqueClient _dbClient;
-    const Client* _dbClientPtr;
-    transport::ServiceExecutor* _serviceExecutor;
+    Client* _dbClientPtr;
     std::function<void()> _cleanupHook;
 
     bool _inExhaust = false;
@@ -235,7 +224,6 @@ private:
 #if MONGO_CONFIG_DEBUG_BUILD
     AtomicWord<stdx::thread::id> _owningThread;
 #endif
-    std::string _oldThreadName;
 };
 
 template <typename T>
@@ -266,4 +254,5 @@ T& operator<<(T& stream, const ServiceStateMachine::State& state) {
     return stream;
 }
 
+}  // namespace transport
 }  // namespace mongo
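
A minimal usage sketch (not part of the commit) may help when reading the diff above: the ServiceEntryPoint now attaches a ServiceExecutorContext decoration to each connection's Client under the Client lock, and the ServiceStateMachine later asks that context for a ServiceExecutor instead of holding an executor pointer itself. The ServiceExecutorContext, ThreadingModel, setCanUseReserved(), and getServiceExecutor() names come from the diff; the helper function wireClientToExecutor and its surrounding setup are illustrative assumptions only.

// Illustrative sketch only, assuming the API shown in the diff above.
#include "mongo/db/service_context.h"
#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"

namespace mongo {

// Hypothetical helper mirroring what ServiceEntryPointImpl::startSession() now does.
void wireClientToExecutor(ServiceContext* svcCtx,
                          transport::SessionHandle session,
                          bool canUseReserved) {
    // One Client per connection, named after the session id (see startSession()).
    auto client = svcCtx->makeClient("conn" + std::to_string(session->id()), session);

    {
        // ServiceExecutorContext::set() must run under the Client lock and only once.
        stdx::lock_guard lk(*client);
        auto seCtx = transport::ServiceExecutorContext{}
                         .setThreadingModel(
                             transport::ServiceExecutorContext::ThreadingModel::kDedicated)
                         .setCanUseReserved(canUseReserved);
        transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
    }

    // For illustration, resolve the executor the way ServiceStateMachine::_executor() does;
    // in the server this happens lazily on the Client's own thread, and may yield the fixed,
    // reserved, or synchronous executor depending on the threading model and connection load.
    transport::ServiceExecutor* exec =
        transport::ServiceExecutorContext::get(client.get())->getServiceExecutor();
    (void)exec;
}

}  // namespace mongo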