-rw-r--r--  jstests/noPassthrough/max_conns_override.js        158
-rw-r--r--  src/mongo/db/commands/server_status_servers.cpp      2
-rw-r--r--  src/mongo/transport/service_entry_point_impl.cpp    24
-rw-r--r--  src/mongo/transport/service_executor.cpp           119
-rw-r--r--  src/mongo/transport/service_executor.h              51
-rw-r--r--  src/mongo/transport/service_state_machine.cpp      203
-rw-r--r--  src/mongo/transport/service_state_machine.h         27
7 files changed, 394 insertions, 190 deletions
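The bulk of this change is bookkeeping: ServiceExecutorContext::set() and reset() adjust shared per-ServiceContext counters under a lock, and ServiceEntryPointImpl::appendStats() reads a point-in-time copy for serverStatus. A minimal standalone sketch of that snapshot-read pattern follows, with a plain std::mutex standing in for mongo's synchronized_value; the SynchronizedStats/ExecutorStats names are illustrative, not from the tree.

// Sketch of the snapshot-read stats pattern: writers mutate the counters
// under a mutex, while reporting code copies the whole struct once and then
// reads the copy without holding any lock. Names here are illustrative only.
#include <cstddef>
#include <iostream>
#include <mutex>

struct ExecutorStats {
    std::size_t usesDedicated = 0;
    std::size_t usesBorrowed = 0;
    std::size_t limitExempt = 0;
};

class SynchronizedStats {
public:
    // Mutate under the lock (what ServiceExecutorContext::set/reset do).
    template <typename F>
    void update(F&& f) {
        std::lock_guard<std::mutex> lk(_mutex);
        f(_stats);
    }

    // Copy out a consistent snapshot (what appendStats() consumes).
    ExecutorStats get() const {
        std::lock_guard<std::mutex> lk(_mutex);
        return _stats;
    }

private:
    mutable std::mutex _mutex;
    ExecutorStats _stats;
};

int main() {
    SynchronizedStats stats;
    stats.update([](ExecutorStats& s) {
        ++s.usesDedicated;
        ++s.limitExempt;
    });

    // The snapshot stays stable even if other threads keep updating afterwards.
    const auto snap = stats.get();
    std::cout << "threaded: " << snap.usesDedicated
              << " limitExempt: " << snap.limitExempt << '\n';
    return 0;
}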
diff --git a/jstests/noPassthrough/max_conns_override.js b/jstests/noPassthrough/max_conns_override.js
index a0005f981e5..9da68bdb90b 100644
--- a/jstests/noPassthrough/max_conns_override.js
+++ b/jstests/noPassthrough/max_conns_override.js
@@ -1,43 +1,137 @@
 (function() {
 'use strict';
+
+load("jstests/libs/host_ipaddr.js");
+
 const configuredMaxConns = 5;
 const configuredReadyAdminThreads = 3;
 let conn = MongoRunner.runMongod({
     config: "jstests/noPassthrough/libs/max_conns_override_config.yaml",
 });
 
-// Use up all the maxConns with junk connections, all of these should succeed
-let maxConns = [];
-for (let i = 0; i < 5; i++) {
-    maxConns.push(new Mongo(`127.0.0.1:${conn.port}`));
-    let tmpDb = maxConns[maxConns.length - 1].getDB("admin");
-    assert.commandWorked(tmpDb.runCommand({isMaster: 1}));
+// Get serverStatus to check that we have the right number of threads in the right places
+function getStats() {
+    return assert.commandWorked(conn.getDB("admin").runCommand({serverStatus: 1}));
 }
 
-// Get serverStatus to check that we have the right number of threads in the right places
-let status = conn.getDB("admin").runCommand({serverStatus: 1});
-const connectionsStatus = status["connections"];
-const reservedExecutorStatus = connectionsStatus["adminConnections"];
-const normalExecutorStatus = status["network"]["serviceExecutorTaskStats"];
-
-// Log these serverStatus sections so we can debug this easily
-print("connections status section: ", tojson(connectionsStatus));
-print("normal executor status section: ", tojson(normalExecutorStatus));
-
-// The number of "available" connections should be less than zero, because we've used
-// all of maxConns. We're over the limit!
-assert.lt(connectionsStatus["available"], 0);
-// The number of "current" connections should be greater than maxConns
-assert.gt(connectionsStatus["current"], configuredMaxConns);
-// The number of ready threads should be the number of readyThreads we configured, since
-// every thread spawns a new thread on startup
-assert.eq(reservedExecutorStatus["readyThreads"] + reservedExecutorStatus["startingThreads"],
-          configuredReadyAdminThreads);
-// The number of running admin threads should be greater than the readyThreads, because
-// one is being used right now
-assert.gt(reservedExecutorStatus["threadsRunning"], reservedExecutorStatus["readyThreads"]);
-// The normal serviceExecutor should only be running maxConns number of threads
-assert.eq(normalExecutorStatus["threadsRunning"], configuredMaxConns);
-
-MongoRunner.stopMongod(conn);
+function verifyStats({exemptCount, normalCount}) {
+    const totalCount = exemptCount + normalCount;
+
+    // Verify that we have updated serverStatus.
+    assert.soon(() => {
+        const serverStatus = getStats();
+
+        const readyAdminThreads = serverStatus.connections.adminConnections.readyThreads;
+        if (readyAdminThreads < configuredReadyAdminThreads) {
+            print(`Not enough admin threads yet: ${readyAdminThreads} vs ${
+                configuredReadyAdminThreads}`);
+            return false;
+        }
+
+        const currentCount = serverStatus.connections.current;
+        if (currentCount != totalCount) {
+            print(`Not yet at the expected count of connections: ${currentCount} vs ${totalCount}`);
+            return false;
+        }
+
+        return true;
+    }, "Failed to verify initial conditions", 10000);
+
+    const serverStatus = getStats();
+    const connectionsStatus = serverStatus.connections;
+    const reservedExecutorStatus = connectionsStatus.adminConnections;
+    const executorStatus = serverStatus.network.serviceExecutorTaskStats;
+
+    // Log these serverStatus sections so we can debug this easily.
+    const filteredSections = {
+        connections: connectionsStatus,
+        network: {
+            serviceExecutorTaskStats: executorStatus,
+        }
+    };
+    print(`serverStatus: ${tojson(filteredSections)}`);
+
+    if (totalCount > configuredMaxConns) {
+        // If we're over maxConns, there are no available connections.
+        assert.lte(connectionsStatus["available"], -1);
+    } else {
+        assert.eq(connectionsStatus["available"], configuredMaxConns - totalCount);
+    }
+
+    // All connections on an exempt CIDR should be marked as limitExempt.
+    assert.eq(connectionsStatus["limitExempt"], exemptCount);
+
+    // Without a borrowing executor, all connections are threaded.
+    assert.eq(connectionsStatus["threaded"], totalCount);
+
+    if (totalCount > configuredMaxConns) {
+        // The normal serviceExecutor should only be running at most maxConns number of threads.
+        assert.eq(executorStatus["threadsRunning"], configuredMaxConns);
+    } else {
+        assert.eq(executorStatus["threadsRunning"], totalCount);
+    }
+
+    // We should have all excess connections on the reserved executor.
+    assert.gt(reservedExecutorStatus["threadsRunning"], totalCount - configuredMaxConns);
+}
+
+// Use the external ip to avoid our exempt CIDR.
+let ip = get_ipaddr();
+
+try {
+    let adminConns = [];
+    let normalConns = [];
+
+    // We start with one exempt control socket.
+    let exemptCount = 1;
+    let normalCount = 0;
+
+    // Do an initial verification.
+    verifyStats({exemptCount: exemptCount, normalCount: normalCount});
+
+    for (let i = 0; i < 2 * configuredMaxConns; i++) {
+        // Make some connections using the exempt CIDR and some using the normal CIDR.
+        let isExempt = (i % 2 == 0);
+        try {
+            if (isExempt) {
+                adminConns.push(new Mongo(`127.0.0.1:${conn.port}`));
+                ++exemptCount;
+            } else {
+                normalConns.push(new Mongo(`${ip}:${conn.port}`));
+                ++normalCount;
+            }
+        } catch (e) {
+            print(e);
+
+            // If we couldn't connect, that means we've exceeded maxConns and we're using the
+            // normal CIDR.
+            assert(!isExempt);
+            assert(i >= configuredMaxConns);
+        }
+
+        verifyStats({exemptCount: exemptCount, normalCount: normalCount});
+    }
+
+    // Some common sense assertions around what was admitted.
+    assert.eq(exemptCount, configuredMaxConns + 1);
+    assert.lte(normalCount, configuredMaxConns);
+
+    // Destroy all admin connections and verify assumptions.
+    while (adminConns.length) {
+        adminConns.pop().close();
+        --exemptCount;
+
+        verifyStats({exemptCount: exemptCount, normalCount: normalCount});
+    }
+
+    // Destroy all normal connections and verify assumptions.
+    while (normalConns.length) {
+        normalConns.pop().close();
+        --normalCount;
+
+        verifyStats({exemptCount: exemptCount, normalCount: normalCount});
+    }
+} finally {
+    MongoRunner.stopMongod(conn);
+}
 })();
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index cc39b6da7d8..7ee1dfcba52 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -78,7 +78,7 @@ public:
         return true;
     }
 
-    // TODO: need to track connections in server stats (see SERVER-49073)
+    // TODO: need to track connections in server stats (see SERVER-49109)
    BSONObj generateSection(OperationContext* opCtx,
                            const BSONElement& configElement) const override {
        BSONObjBuilder b;
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 0090f30a686..38093644e2c 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/service_context.h"
 #include "mongo/logv2/log.h"
 #include "mongo/transport/ismaster_metrics.h"
+#include "mongo/transport/service_executor.h"
 #include "mongo/transport/service_state_machine.h"
 #include "mongo/transport/session.h"
 #include "mongo/util/processinfo.h"
@@ -164,16 +165,6 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     auto clientName = "conn{}"_format(session->id());
     auto client = _svcCtx->makeClient(clientName, session);
 
-    {
-        stdx::lock_guard lk(*client);
-        auto seCtx =
-            transport::ServiceExecutorContext{}
-                .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated)
-                .setCanUseReserved(canOverrideMaxConns);
-
-        transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
-    }
-
     auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
 
     const bool quiet = serverGlobalParams.quiet.load();
@@ -197,6 +188,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
         if (!quiet) {
             LOGV2(22942,
                   "Connection refused because there are too many open connections",
+                  "remote"_attr = session->remote(),
                   "connectionCount"_attr = connectionCount);
         }
         return;
@@ -228,7 +220,10 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
         }
     });
 
-    ssm->start();
+    auto seCtx = transport::ServiceExecutorContext{};
+    seCtx.setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated);
+    seCtx.setCanUseReserved(canOverrideMaxConns);
+    ssm->start(std::move(seCtx));
 }
 
 void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
@@ -329,6 +324,13 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
 
     invariant(_svcCtx);
     bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
+
+    const auto seStats = transport::ServiceExecutorStats::get(_svcCtx);
+    bob->append("threaded", static_cast<int>(seStats.usesDedicated));
+    if (serverGlobalParams.maxConnsOverride.size()) {
+        bob->append("limitExempt", static_cast<int>(seStats.limitExempt));
+    }
+
     bob->append("exhaustIsMaster",
                 static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
     bob->append("awaitingTopologyChanges",
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
index 37c3d03e1b1..c5fd302d64c 100644
--- a/src/mongo/transport/service_executor.cpp
+++ b/src/mongo/transport/service_executor.cpp
@@ -40,12 +40,15 @@
 #include "mongo/transport/service_executor_fixed.h"
 #include "mongo/transport/service_executor_reserved.h"
 #include "mongo/transport/service_executor_synchronous.h"
+#include "mongo/util/synchronized_value.h"
 
 namespace mongo {
 namespace transport {
 namespace {
 static constexpr auto kDiagnosticLogLevel = 4;
 
+auto getServiceExecutorStats =
+    ServiceContext::declareDecoration<synchronized_value<ServiceExecutorStats>>();
 auto getServiceExecutorContext =
     Client::declareDecoration<boost::optional<ServiceExecutorContext>>();
 }  // namespace
@@ -61,6 +64,10 @@ StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) {
     }
 }
 
+ServiceExecutorStats ServiceExecutorStats::get(ServiceContext* ctx) noexcept {
+    return getServiceExecutorStats(ctx).get();
+}
+
 ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept {
     auto& serviceExecutorContext = getServiceExecutorContext(client);
 
@@ -79,6 +86,24 @@ void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) n
     seCtx._client = client;
     seCtx._sep = client->getServiceContext()->getServiceEntryPoint();
 
+    {
+        auto stats = getServiceExecutorStats(client->getServiceContext()).synchronize();
+        if (seCtx._canUseReserved) {
+            ++stats->limitExempt;
+        }
+
+        switch (seCtx._threadingModel) {
+            case ThreadingModel::kBorrowed: {
+                ++stats->usesBorrowed;
+            } break;
+            case ThreadingModel::kDedicated: {
+                ++stats->usesDedicated;
+            } break;
+            default:
+                MONGO_UNREACHABLE;
+        }
+    }
+
     LOGV2_DEBUG(4898000,
                 kDiagnosticLogLevel,
                 "Setting initial ServiceExecutor context for client",
@@ -88,18 +113,91 @@ void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) n
     serviceExecutorContext = std::move(seCtx);
 }
 
-ServiceExecutorContext& ServiceExecutorContext::setThreadingModel(
-    ThreadingModel threadingModel) noexcept {
-    _threadingModel = threadingModel;
-    return *this;
+void ServiceExecutorContext::reset(Client* client) noexcept {
+    if (client) {
+        auto& serviceExecutorContext = getServiceExecutorContext(client);
+
+        auto stats = getServiceExecutorStats(client->getServiceContext()).synchronize();
+
+        LOGV2_DEBUG(4898001,
+                    kDiagnosticLogLevel,
+                    "Resetting ServiceExecutor context for client",
+                    "client"_attr = client->desc(),
+                    "threadingModel"_attr = serviceExecutorContext->_threadingModel,
+                    "canUseReserved"_attr = serviceExecutorContext->_canUseReserved);
+
+        if (serviceExecutorContext->_canUseReserved) {
+            --stats->limitExempt;
+        }
+
+        switch (serviceExecutorContext->_threadingModel) {
+            case ThreadingModel::kBorrowed: {
+                --stats->usesBorrowed;
+            } break;
+            case ThreadingModel::kDedicated: {
+                --stats->usesDedicated;
+            } break;
+            default:
+                MONGO_UNREACHABLE;
+        }
+    }
+}
+
+void ServiceExecutorContext::setThreadingModel(ThreadingModel threadingModel) noexcept {
+    if (_threadingModel == threadingModel) {
+        // Nothing to do.
+        return;
+    }
+
+    auto lastThreadingModel = std::exchange(_threadingModel, threadingModel);
+
+    if (_client) {
+        auto stats = getServiceExecutorStats(_client->getServiceContext()).synchronize();
+
+        // Decrement the stats for the previous ThreadingModel.
+        switch (lastThreadingModel) {
+            case ThreadingModel::kBorrowed: {
+                --stats->usesBorrowed;
+            } break;
+            case ThreadingModel::kDedicated: {
+                --stats->usesDedicated;
+            } break;
+            default:
+                MONGO_UNREACHABLE;
+        }
+
+        // Increment the stats for the next ThreadingModel.
+        switch (_threadingModel) {
+            case ThreadingModel::kBorrowed: {
+                ++stats->usesBorrowed;
+            } break;
+            case ThreadingModel::kDedicated: {
+                ++stats->usesDedicated;
+            } break;
+            default:
+                MONGO_UNREACHABLE;
+        }
+    }
 }
 
-ServiceExecutorContext& ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
+void ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
+    if (_canUseReserved == canUseReserved) {
+        // Nothing to do.
+        return;
+    }
+
     _canUseReserved = canUseReserved;
-    return *this;
+
+    if (_client) {
+        auto stats = getServiceExecutorStats(_client->getServiceContext()).synchronize();
+        if (canUseReserved) {
+            ++stats->limitExempt;
+        } else {
+            --stats->limitExempt;
+        }
+    }
 }
 
-ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
+ServiceExecutor* ServiceExecutorContext::getServiceExecutor() noexcept {
     invariant(_client);
 
     switch (_threadingModel) {
@@ -121,13 +219,16 @@ ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
         return _sep->numOpenSessions() > _sep->maxOpenSessions();
     };
 
-    if (_canUseReserved && shouldUseReserved()) {
+    if (_canUseReserved && !_hasUsedSynchronous && shouldUseReserved()) {
         if (auto exec = transport::ServiceExecutorReserved::get(_client->getServiceContext())) {
-            // We are allowed to use the reserved executor, we should use it, and it exists.
+            // We are allowed to use the reserved, we have not used the synchronous, we should use
+            // the reserved, and the reserved exists.
             return exec;
         }
     }
 
+    // Once we use the ServiceExecutorSynchronous, we shouldn't use the ServiceExecutorReserved.
+    _hasUsedSynchronous = true;
     return transport::ServiceExecutorSynchronous::get(_client->getServiceContext());
 }
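Note the sticky _hasUsedSynchronous bit above: once a connection has run on the synchronous executor it never later migrates to the reserved executor, even if it is exempt and the server goes back over the limit. A standalone sketch of that selection rule, with stand-in executor types rather than the real mongo classes:

// Standalone sketch of the reserved-vs-synchronous fallback in
// getServiceExecutor(). The Executor types and session counts are stand-ins.
#include <cstddef>
#include <iostream>

struct Executor {
    const char* name;
};

Executor reservedExec{"reserved"};
Executor synchronousExec{"synchronous"};

struct ConnectionContext {
    bool canUseReserved = false;
    bool hasUsedSynchronous = false;  // sticky, mirrors _hasUsedSynchronous

    Executor* pick(std::size_t openSessions, std::size_t maxSessions) {
        const bool overLimit = openSessions > maxSessions;
        if (canUseReserved && !hasUsedSynchronous && overLimit) {
            // Exempt connection over the limit: use reserved resources.
            return &reservedExec;
        }
        // Once we choose the synchronous executor, never switch back.
        hasUsedSynchronous = true;
        return &synchronousExec;
    }
};

int main() {
    ConnectionContext exempt{/*canUseReserved=*/true};
    std::cout << exempt.pick(3, 5)->name << '\n';  // synchronous (under limit)
    std::cout << exempt.pick(7, 5)->name << '\n';  // still synchronous: sticky

    ConnectionContext fresh{/*canUseReserved=*/true};
    std::cout << fresh.pick(7, 5)->name << '\n';   // reserved (over limit)
    return 0;
}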
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 38116908272..ea27129ab94 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -146,21 +146,43 @@ public:
      */
     static void set(Client* client, ServiceExecutorContext seCtx) noexcept;
 
+    /**
+     * Reset the ServiceExecutorContext for a given client.
+     *
+     * This function may only be invoked once and only while under the Client lock.
+     */
+    static void reset(Client* client) noexcept;
+
     ServiceExecutorContext() = default;
+    ServiceExecutorContext(const ServiceExecutorContext&) = delete;
+    ServiceExecutorContext& operator=(const ServiceExecutorContext&) = delete;
+    ServiceExecutorContext(ServiceExecutorContext&& seCtx)
+        : _client{std::exchange(seCtx._client, nullptr)},
+          _sep{std::exchange(seCtx._sep, nullptr)},
+          _threadingModel{seCtx._threadingModel},
+          _canUseReserved{seCtx._canUseReserved} {}
+    ServiceExecutorContext& operator=(ServiceExecutorContext&& seCtx) {
+        _client = std::exchange(seCtx._client, nullptr);
+        _sep = std::exchange(seCtx._sep, nullptr);
+        _threadingModel = seCtx._threadingModel;
+        _canUseReserved = seCtx._canUseReserved;
+        return *this;
+    }
 
     /**
      * Set the ThreadingModel for the associated Client's service execution.
      *
      * This function is only valid to invoke with the Client lock or before the Client is set.
      */
-    ServiceExecutorContext& setThreadingModel(ThreadingModel threadingModel) noexcept;
+    void setThreadingModel(ThreadingModel threadingModel) noexcept;
 
     /**
      * Set if reserved resources are available for the associated Client's service execution.
      *
      * This function is only valid to invoke with the Client lock or before the Client is set.
      */
-    ServiceExecutorContext& setCanUseReserved(bool canUseReserved) noexcept;
+    void setCanUseReserved(bool canUseReserved) noexcept;
 
     /**
      * Get the ThreadingModel for the associated Client.
@@ -177,7 +199,7 @@ public:
      * This function is only valid to invoke from the associated Client thread. This function does
      * not require the Client lock since all writes must also happen from that thread.
      */
-    ServiceExecutor* getServiceExecutor() const noexcept;
+    ServiceExecutor* getServiceExecutor() noexcept;
 
 private:
     friend StringData toString(ThreadingModel threadingModel);
@@ -187,8 +209,31 @@ private:
 
     ThreadingModel _threadingModel = ThreadingModel::kDedicated;
     bool _canUseReserved = false;
+    bool _hasUsedSynchronous = false;
 };
 
+/**
+ * A small statlet for tracking which executors may be in use.
+ */
+class ServiceExecutorStats {
+public:
+    /**
+     * Get the current value of ServiceExecutorStats for the given ServiceContext.
+     *
+     * Note that this value is intended for statistics and logging. It is unsynchronized and
+     * unsuitable for informing decisions at runtime.
+     */
+    static ServiceExecutorStats get(ServiceContext* ctx) noexcept;
+
+    // The number of Clients who use the dedicated executors.
+    size_t usesDedicated = 0;
+
+    // The number of Clients who use the borrowed executors.
+    size_t usesBorrowed = 0;
+
+    // The number of Clients that are allowed to ignore maxConns and use reserved resources.
+    size_t limitExempt = 0;
+};
+
 }  // namespace transport
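ServiceExecutorContext is now move-only, and both move operations null the source's pointers via std::exchange so that only one instance ever runs the counter-decrementing side effects in reset(). A toy sketch of the idiom with a hypothetical ContextHandle type (not the mongo class):

// Sketch of the std::exchange move idiom used by ServiceExecutorContext:
// moving steals the pointer and leaves the source empty, so only one owner
// ever runs the side effects. Toy types; not the mongo classes.
#include <cassert>
#include <utility>

struct Client {};

class ContextHandle {
public:
    ContextHandle() = default;
    explicit ContextHandle(Client* c) : _client{c} {}

    ContextHandle(const ContextHandle&) = delete;
    ContextHandle& operator=(const ContextHandle&) = delete;

    ContextHandle(ContextHandle&& other) noexcept
        : _client{std::exchange(other._client, nullptr)} {}
    ContextHandle& operator=(ContextHandle&& other) noexcept {
        _client = std::exchange(other._client, nullptr);
        return *this;
    }

    bool attached() const {
        return _client != nullptr;
    }

private:
    Client* _client = nullptr;
};

int main() {
    Client c;
    ContextHandle a{&c};
    ContextHandle b = std::move(a);  // a is now empty, b owns the client
    assert(!a.attached());
    assert(b.attached());
    return 0;
}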
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 5124542291b..e9fa1ee95a6 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -163,9 +163,6 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
     }
 }  // namespace
 
-using transport::ServiceExecutor;
-using transport::TransportLayer;
-
 /*
  * This class wraps up the logic for swapping/unswapping the Client when transitioning
  * between states.
@@ -178,115 +175,83 @@ class ServiceStateMachine::ThreadGuard {
 public:
     explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} {
-        auto owned = Ownership::kUnowned;
-        _ssm->_owned.compareAndSwap(&owned, Ownership::kOwned);
-        if (owned == Ownership::kStatic) {
-            dassert(haveClient());
-            dassert(Client::getCurrent() == _ssm->_dbClientPtr);
-            _haveTakenOwnership = true;
+        invariant(_ssm);
+
+        if (_ssm->_clientPtr == Client::getCurrent()) {
+            // We're not the first on this thread, nothing more to do.
             return;
         }
 
-#ifdef MONGO_CONFIG_DEBUG_BUILD
-        invariant(owned == Ownership::kUnowned);
-        _ssm->_owningThread.store(stdx::this_thread::get_id());
-#endif
+        auto& client = _ssm->_client;
+        invariant(client);
 
         // Set up the thread name
         auto oldThreadName = getThreadName();
-        const auto& threadName = _ssm->_dbClient->desc();
+        const auto& threadName = client->desc();
         if (oldThreadName != threadName) {
             _oldThreadName = oldThreadName.toString();
             setThreadName(threadName);
         }
 
         // Swap the current Client so calls to cc() work as expected
-        Client::setCurrent(std::move(_ssm->_dbClient));
+        Client::setCurrent(std::move(client));
         _haveTakenOwnership = true;
     }
 
     // Constructing from a moved ThreadGuard invalidates the other thread guard.
     ThreadGuard(ThreadGuard&& other)
-        : _ssm(other._ssm), _haveTakenOwnership(other._haveTakenOwnership) {
-        other._haveTakenOwnership = false;
-    }
+        : _ssm{std::exchange(other._ssm, nullptr)},
+          _haveTakenOwnership{std::exchange(other._haveTakenOwnership, false)} {}
 
     ThreadGuard& operator=(ThreadGuard&& other) {
-        if (this != &other) {
-            _ssm = other._ssm;
-            _haveTakenOwnership = other._haveTakenOwnership;
-            other._haveTakenOwnership = false;
-        }
+        _ssm = std::exchange(other._ssm, nullptr);
+        _haveTakenOwnership = std::exchange(other._haveTakenOwnership, false);
         return *this;
     };
 
     ThreadGuard() = delete;
     ~ThreadGuard() {
-        if (_haveTakenOwnership)
-            release();
+        release();
     }
 
     explicit operator bool() const {
-#ifdef MONGO_CONFIG_DEBUG_BUILD
-        if (_haveTakenOwnership) {
-            invariant(_ssm->_owned.load() != Ownership::kUnowned);
-            invariant(_ssm->_owningThread.load() == stdx::this_thread::get_id());
-            return true;
-        } else {
-            return false;
-        }
-#else
-        return _haveTakenOwnership;
-#endif
-    }
-
-    void markStaticOwnership() {
-        dassert(static_cast<bool>(*this));
-        _ssm->_owned.store(Ownership::kStatic);
+        return _ssm;
     }
 
     void release() {
-        auto owned = _ssm->_owned.load();
-
-#ifdef MONGO_CONFIG_DEBUG_BUILD
-        dassert(_haveTakenOwnership);
-        dassert(owned != Ownership::kUnowned);
-        dassert(_ssm->_owningThread.load() == stdx::this_thread::get_id());
-#endif
-        if (owned != Ownership::kStatic) {
-            if (haveClient()) {
-                _ssm->_dbClient = Client::releaseCurrent();
-            }
-
-            if (!_oldThreadName.empty()) {
-                setThreadName(_oldThreadName);
-            }
+        if (!_ssm) {
+            // We've been released or moved from.
+            return;
         }
 
-        // If the session has ended, then it's unsafe to do anything but call the cleanup hook.
-        if (_ssm->state() == State::Ended) {
-            // The cleanup hook gets moved out of _ssm->_cleanupHook so that it can only be called
-            // once.
-            auto cleanupHook = std::move(_ssm->_cleanupHook);
-            if (cleanupHook)
-                cleanupHook();
+        // If we have a ServiceStateMachine pointer, then it should control the current Client.
+        invariant(_ssm->_clientPtr == Client::getCurrent());
+
+        if (auto haveTakenOwnership = std::exchange(_haveTakenOwnership, false);
+            !haveTakenOwnership) {
+            // Reset our pointer so that we cannot release again.
+            _ssm = nullptr;
 
-            // It's very important that the Guard returns here and that the SSM's state does not
-            // get modified in any way after the cleanup hook is called.
+            // We are not the original owner, nothing more to do.
             return;
         }
 
-        _haveTakenOwnership = false;
-        // If owned != Ownership::kOwned here then it can only equal Ownership::kStatic and we
-        // should just return
-        if (owned == Ownership::kOwned) {
-            _ssm->_owned.store(Ownership::kUnowned);
+        // Reclaim the client.
+        _ssm->_client = Client::releaseCurrent();
+
+        // Reset our pointer so that we cannot release again.
+        _ssm = nullptr;
+
+        if (!_oldThreadName.empty()) {
+            // Reset the old thread name.
+            setThreadName(_oldThreadName);
         }
     }
 
 private:
-    ServiceStateMachine* _ssm;
+    ServiceStateMachine* _ssm = nullptr;
+
     bool _haveTakenOwnership = false;
     std::string _oldThreadName;
 };
@@ -295,22 +260,23 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
     : _state{State::Created},
       _serviceContext{client->getServiceContext()},
       _sep{_serviceContext->getServiceEntryPoint()},
-      _dbClient{std::move(client)},
-      _dbClientPtr{_dbClient.get()} {}
+      _client{std::move(client)},
+      _clientPtr{_client.get()} {}
 
-const transport::SessionHandle& ServiceStateMachine::_session() const {
-    return _dbClientPtr->session();
+const transport::SessionHandle& ServiceStateMachine::_session() {
+    return _clientPtr->session();
 }
 
 ServiceExecutor* ServiceStateMachine::_executor() {
-    return ServiceExecutorContext::get(_dbClientPtr)->getServiceExecutor();
+    return ServiceExecutorContext::get(_clientPtr)->getServiceExecutor();
 }
 
-Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
+Future<void> ServiceStateMachine::_sourceMessage() {
+    auto guard = ThreadGuard(this);
+
     invariant(_inMessage.empty());
     invariant(_state.load() == State::Source);
     _state.store(State::SourceWait);
-    guard.release();
 
     auto sourceMsgImpl = [&] {
         const auto& transportMode = _executor()->transportMode();
@@ -333,11 +299,12 @@ Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
         });
 }
 
-Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
+Future<void> ServiceStateMachine::_sinkMessage() {
+    auto guard = ThreadGuard(this);
+
     // Sink our response to the client
     invariant(_state.load() == State::Process);
     _state.store(State::SinkWait);
-    guard.release();
 
     auto toSink = std::exchange(_outMessage, {});
     auto sinkMsgImpl = [&] {
@@ -360,12 +327,10 @@ Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
 }
 
 void ServiceStateMachine::_sourceCallback(Status status) {
-    // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
-    // thread.
-    ThreadGuard guard(this);
+    auto guard = ThreadGuard(this);
+
+    invariant(state() == State::SourceWait);
 
-    // Make sure we just called sourceMessage();
-    dassert(state() == State::SourceWait);
     auto remote = _session()->remote();
 
     if (status.isOK()) {
@@ -405,11 +370,9 @@ void ServiceStateMachine::_sourceCallback(Status status) {
 }
 
 void ServiceStateMachine::_sinkCallback(Status status) {
-    // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
-    // thread.
-    ThreadGuard guard(this);
+    auto guard = ThreadGuard(this);
 
-    dassert(state() == State::SinkWait);
+    invariant(state() == State::SinkWait);
 
     // If there was an error sinking the message to the client, then we should print an error and
     // end the session.
@@ -431,7 +394,9 @@ void ServiceStateMachine::_sinkCallback(Status status) {
     }
 }
 
-Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
+Future<void> ServiceStateMachine::_processMessage() {
+    auto guard = ThreadGuard(this);
+
     invariant(!_inMessage.empty());
 
     TrafficRecorder::get(_serviceContext)
@@ -459,10 +424,10 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
     // The handleRequest is implemented in a subclass for mongod/mongos and actually all the
     // database work for this request.
     return _sep->handleRequest(opCtx.get(), _inMessage)
-        .then([this,
-               &compressorMgr = compressorMgr,
-               opCtx = std::move(opCtx),
-               guard = std::move(guard)](DbResponse dbresponse) mutable -> void {
+        .then([this, &compressorMgr = compressorMgr, opCtx = std::move(opCtx)](
+                  DbResponse dbresponse) mutable -> void {
+            auto guard = ThreadGuard(this);
+
             // opCtx must be killed and delisted here so that the operation cannot show up in
             // currentOp results after the response reaches the client. The destruction is postponed
             // for later to mitigate its performance impact on the critical path of execution.
@@ -519,14 +484,15 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
         });
 }
 
-void ServiceStateMachine::start() {
+void ServiceStateMachine::start(ServiceExecutorContext seCtx) {
+    {
+        stdx::lock_guard lk(*_clientPtr);
+
+        ServiceExecutorContext::set(_clientPtr, std::move(seCtx));
+    }
+
     _executor()->schedule(
         GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
-            // TODO(SERVER-49109) We can't use static ownership in general with
-            // a ServiceExecutorFixed and async commands. ThreadGuard needs to become smarter.
-            ThreadGuard guard(shared_from_this().get());
-            guard.markStaticOwnership();
-
             // If this is the first run of the SSM, then update its state to Source
             if (state() == State::Created) {
                 _state.store(State::Source);
@@ -541,16 +507,16 @@ void ServiceStateMachine::_runOnce() {
             if (_inExhaust) {
                 return Status::OK();
             } else {
-                return _sourceMessage(ThreadGuard(this));
+                return _sourceMessage();
             }
         })
-        .then([this]() { return _processMessage(ThreadGuard(this)); })
+        .then([this]() { return _processMessage(); })
         .then([this]() -> Future<void> {
             if (_outMessage.empty()) {
                 return Status::OK();
             }
 
-            return _sinkMessage(ThreadGuard(this));
+            return _sinkMessage();
         })
        .getAsync([this, anchor = shared_from_this()](Status status) {
            // Destroy the opCtx (already killed) here, to potentially use the delay between
@@ -560,9 +526,9 @@ void ServiceStateMachine::_runOnce() {
             }
             if (!status.isOK()) {
                 _state.store(State::EndSession);
-                // The service executor failed to schedule the task. This could for example be that
-                // we failed to start a worker thread. Terminate this connection to leave the system
-                // in a valid state.
+                // The service executor failed to schedule the task. This could for example be
+                // that we failed to start a worker thread. Terminate this connection to leave
+                // the system in a valid state.
                 LOGV2_WARNING_OPTIONS(4910400,
                                       {logv2::LogComponent::kExecutor},
                                       "Terminating session due to error: {error}",
@@ -570,8 +536,8 @@ void ServiceStateMachine::_runOnce() {
                                       "error"_attr = status);
                 terminate();
 
-                ThreadGuard terminateGuard(this);
-                _cleanupSession(std::move(terminateGuard));
+                _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
+                    [this, anchor = shared_from_this()](Status status) { _cleanupSession(); }));
                 return;
             }
 
@@ -593,8 +559,8 @@ void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask t
 
     auto sessionTags = _session()->getTags();
 
-    // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have been
-    // set, then skip the termination check.
+    // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have
+    // been set, then skip the termination check.
     if ((sessionTags & tags) || (sessionTags & transport::Session::kPending)) {
         LOGV2(22991,
               "Skip closing connection for connection",
@@ -646,7 +612,9 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
                   "error"_attr = e.toStatus());
 }
 
-void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {
+void ServiceStateMachine::_cleanupSession() {
+    auto guard = ThreadGuard(this);
+
     // Ensure the delayed destruction of opCtx always happens before doing the cleanup.
     if (MONGO_likely(_killedOpCtx)) {
         _killedOpCtx.reset();
@@ -655,15 +623,20 @@ void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {
 
     _cleanupExhaustResources();
 
+    {
+        stdx::lock_guard lk(*_clientPtr);
+        transport::ServiceExecutorContext::reset(_clientPtr);
+    }
+
+    if (auto cleanupHook = std::exchange(_cleanupHook, {})) {
+        cleanupHook();
+    }
+
     _state.store(State::Ended);
 
     _inMessage.reset();
 
     _outMessage.reset();
-
-    // By ignoring the return value of Client::releaseCurrent() we destroy the session.
-    // _dbClient is now nullptr and _dbClientPtr is invalid and should never be accessed.
-    Client::releaseCurrent();
 }
 
 }  // namespace transport
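The rewritten ThreadGuard above is a plainer RAII type: the constructor adopts the Client onto the current thread only when needed, release() is idempotent because it nulls _ssm, and the destructor simply calls release(). A compilable sketch of that shape, using a thread_local pointer as a stand-in for Client::getCurrent():

// Sketch of the idempotent-guard shape ThreadGuard now has: the destructor
// always calls release(), and release() no-ops once _sm is null. The
// thread_local slot stands in for mongo's Client::getCurrent().
#include <cassert>
#include <utility>

struct StateMachine {
    int* boundSlot = nullptr;  // stands in for the SSM's owned Client
};

thread_local int* currentSlot = nullptr;  // stands in for Client::getCurrent()

class Guard {
public:
    explicit Guard(StateMachine* sm) : _sm{sm} {
        if (currentSlot == _sm->boundSlot) {
            return;  // already adopted on this thread; nothing to take
        }
        currentSlot = std::exchange(_sm->boundSlot, nullptr);
        _taken = true;
    }

    Guard(Guard&& other) noexcept
        : _sm{std::exchange(other._sm, nullptr)},
          _taken{std::exchange(other._taken, false)} {}

    ~Guard() {
        release();
    }

    void release() {
        if (!_sm) {
            return;  // already released or moved-from
        }
        if (std::exchange(_taken, false)) {
            _sm->boundSlot = std::exchange(currentSlot, nullptr);
        }
        _sm = nullptr;  // make further release() calls no-ops
    }

private:
    StateMachine* _sm = nullptr;
    bool _taken = false;
};

int main() {
    int client = 42;
    StateMachine sm{&client};
    {
        Guard g(&sm);
        assert(currentSlot == &client);  // guard adopted the client
        g.release();
        g.release();  // second release is a harmless no-op
    }
    assert(sm.boundSlot == &client);  // ownership returned to the machine
    return 0;
}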
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 7794cdfbfce..2c649442fee 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -103,7 +103,7 @@ public:
     /*
      * start() schedules a call to _runOnce() in the future.
      */
-    void start();
+    void start(ServiceExecutorContext seCtx);
 
     /*
      * Gets the current state of connection for testing/diagnostic purposes.
@@ -137,7 +136,6 @@ private:
      * each step in _runOnce();
      */
     class ThreadGuard;
-    friend class ThreadGuard;
 
     /*
      * Terminates the associated transport Session if the status indicates an error.
@@ -147,19 +146,9 @@ private:
     void _terminateAndLogIfError(Status status);
 
     /*
-     * This is a helper function to schedule tasks on the serviceExecutor maintaining a shared_ptr
-     * copy to anchor the lifetime of the SSM while waiting for callbacks to run.
-     *
-     * If scheduling the function fails, the SSM will be terminated and cleaned up immediately
-     */
-    void _scheduleNextWithGuard(ThreadGuard guard,
-                                transport::ServiceExecutor::ScheduleFlags flags,
-                                Ownership ownershipModel = Ownership::kOwned);
-
-    /*
      * Gets the transport::Session associated with this connection
      */
-    const transport::SessionHandle& _session() const;
+    const transport::SessionHandle& _session();
 
     /*
      * Gets the transport::ServiceExecutor associated with this connection.
@@ -170,7 +159,7 @@ private:
     * This function actually calls into the database and processes a request. It's broken out
     * into its own inline function for better readability.
     */
-    Future<void> _processMessage(ThreadGuard guard);
+    Future<void> _processMessage();
 
     /*
      * These get called by the TransportLayer when requested network I/O has completed.
@@ -182,13 +171,13 @@ private:
     * Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
     * before waiting on the TL.
     */
-    Future<void> _sourceMessage(ThreadGuard guard);
-    Future<void> _sinkMessage(ThreadGuard guard);
+    Future<void> _sourceMessage();
+    Future<void> _sinkMessage();
 
     /*
      * Releases all the resources associated with the session and calls the cleanupHook.
      */
-    void _cleanupSession(ThreadGuard guard);
+    void _cleanupSession();
 
     /*
      * This is the initial function called at the beginning of a thread's lifecycle in the
@@ -207,8 +196,8 @@ private:
     ServiceEntryPoint* const _sep;
 
     transport::SessionHandle _sessionHandle;
-    ServiceContext::UniqueClient _dbClient;
-    Client* _dbClientPtr;
+    ServiceContext::UniqueClient _client;
+    Client* _clientPtr;
     std::function<void()> _cleanupHook;
 
     bool _inExhaust = false;
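For context on the jstest at the top of this change: whether a connection may exceed maxConns is decided by matching its source address against the maxConnsOverride CIDR list, which is why the test dials 127.0.0.1 for exempt connections and the machine's external IP for normal ones. A rough IPv4-only sketch of such a check (assumed helper names; this is not mongo's CIDR implementation):

// Rough IPv4-only sketch of a maxConnsOverride-style exemption check: a
// connection is limit-exempt when its source address falls inside any
// configured CIDR range. Not mongo's CIDR implementation.
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

std::uint32_t parseIPv4(const std::string& dotted) {
    std::istringstream in(dotted);
    std::uint32_t out = 0;
    std::string octet;
    while (std::getline(in, octet, '.')) {
        out = (out << 8) | static_cast<std::uint32_t>(std::stoul(octet));
    }
    return out;
}

struct Cidr {
    std::uint32_t network;
    int prefixLen;

    bool contains(std::uint32_t addr) const {
        if (prefixLen == 0) {
            return true;  // 0.0.0.0/0 matches everything
        }
        const std::uint32_t mask = ~std::uint32_t{0} << (32 - prefixLen);
        return (addr & mask) == (network & mask);
    }
};

bool isLimitExempt(const std::string& remoteIp, const std::vector<Cidr>& exemptRanges) {
    const std::uint32_t addr = parseIPv4(remoteIp);
    for (const auto& range : exemptRanges) {
        if (range.contains(addr)) {
            return true;
        }
    }
    return false;
}

int main() {
    // Treat loopback as exempt, as the test above does for 127.0.0.1.
    const std::vector<Cidr> exempt{{parseIPv4("127.0.0.0"), 8}};
    std::cout << isLimitExempt("127.0.0.1", exempt) << '\n';  // 1: exempt
    std::cout << isLimitExempt("10.1.2.3", exempt) << '\n';   // 0: counted
    return 0;
}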