author     Ben Caimano <ben.caimano@10gen.com>    2020-08-31 23:54:07 +0000
committer  Ben Caimano <ben.caimano@10gen.com>    2020-09-14 04:04:35 +0000
commit     92e7ecf4ba43a483c91b17e5adfeedc2dc0c6fcf (patch)
tree       b638b1c5e024095e239118c49916b39ece224479
parent     46b220b83efc24d85ff90fc1727d5a22ffcb2960 (diff)
SERVER-49073 Track connections that are maxConn exempt or threaded
-rw-r--r--  jstests/noPassthrough/max_conns_override.js       158
-rw-r--r--  src/mongo/db/commands/server_status_servers.cpp     2
-rw-r--r--  src/mongo/transport/service_entry_point_impl.cpp   24
-rw-r--r--  src/mongo/transport/service_executor.cpp          119
-rw-r--r--  src/mongo/transport/service_executor.h             51
-rw-r--r--  src/mongo/transport/service_state_machine.cpp     203
-rw-r--r--  src/mongo/transport/service_state_machine.h        27
7 files changed, 394 insertions, 190 deletions
diff --git a/jstests/noPassthrough/max_conns_override.js b/jstests/noPassthrough/max_conns_override.js
index a0005f981e5..9da68bdb90b 100644
--- a/jstests/noPassthrough/max_conns_override.js
+++ b/jstests/noPassthrough/max_conns_override.js
@@ -1,43 +1,137 @@
(function() {
'use strict';
+
+load("jstests/libs/host_ipaddr.js");
+
const configuredMaxConns = 5;
const configuredReadyAdminThreads = 3;
let conn = MongoRunner.runMongod({
config: "jstests/noPassthrough/libs/max_conns_override_config.yaml",
});
-// Use up all the maxConns with junk connections, all of these should succeed
-let maxConns = [];
-for (let i = 0; i < 5; i++) {
- maxConns.push(new Mongo(`127.0.0.1:${conn.port}`));
- let tmpDb = maxConns[maxConns.length - 1].getDB("admin");
- assert.commandWorked(tmpDb.runCommand({isMaster: 1}));
+// Get serverStatus to check that we have the right number of threads in the right places
+function getStats() {
+ return assert.commandWorked(conn.getDB("admin").runCommand({serverStatus: 1}));
}
-// Get serverStatus to check that we have the right number of threads in the right places
-let status = conn.getDB("admin").runCommand({serverStatus: 1});
-const connectionsStatus = status["connections"];
-const reservedExecutorStatus = connectionsStatus["adminConnections"];
-const normalExecutorStatus = status["network"]["serviceExecutorTaskStats"];
-
-// Log these serverStatus sections so we can debug this easily
-print("connections status section: ", tojson(connectionsStatus));
-print("normal executor status section: ", tojson(normalExecutorStatus));
-
-// The number of "available" connections should be less than zero, because we've used
-// all of maxConns. We're over the limit!
-assert.lt(connectionsStatus["available"], 0);
-// The number of "current" connections should be greater than maxConns
-assert.gt(connectionsStatus["current"], configuredMaxConns);
-// The number of ready threads should be the number of readyThreads we configured, since
-// every thread spawns a new thread on startup
-assert.eq(reservedExecutorStatus["readyThreads"] + reservedExecutorStatus["startingThreads"],
- configuredReadyAdminThreads);
-// The number of running admin threads should be greater than the readyThreads, because
-// one is being used right now
-assert.gt(reservedExecutorStatus["threadsRunning"], reservedExecutorStatus["readyThreads"]);
-// The normal serviceExecutor should only be running maxConns number of threads
-assert.eq(normalExecutorStatus["threadsRunning"], configuredMaxConns);
-
-MongoRunner.stopMongod(conn);
+function verifyStats({exemptCount, normalCount}) {
+ const totalCount = exemptCount + normalCount;
+
+ // Verify that we have updated serverStatus.
+ assert.soon(() => {
+ const serverStatus = getStats();
+
+ const readyAdminThreads = serverStatus.connections.adminConnections.readyThreads;
+ if (readyAdminThreads < configuredReadyAdminThreads) {
+ print(`Not enough admin threads yet: ${readyAdminThreads} vs ${
+ configuredReadyAdminThreads}`);
+ return false;
+ }
+
+ const currentCount = serverStatus.connections.current;
+ if (currentCount != totalCount) {
+ print(`Not yet at the expected count of connections: ${currentCount} vs ${totalCount}`);
+ return false;
+ }
+
+ return true;
+ }, "Failed to verify initial conditions", 10000);
+
+ const serverStatus = getStats();
+ const connectionsStatus = serverStatus.connections;
+ const reservedExecutorStatus = connectionsStatus.adminConnections;
+ const executorStatus = serverStatus.network.serviceExecutorTaskStats;
+
+ // Log these serverStatus sections so we can debug this easily.
+ const filteredSections = {
+ connections: connectionsStatus,
+ network: {
+ serviceExecutorTaskStats: executorStatus,
+ }
+ };
+ print(`serverStatus: ${tojson(filteredSections)}`);
+
+ if (totalCount > configuredMaxConns) {
+ // If we're over maxConns, there are no available connections.
+ assert.lte(connectionsStatus["available"], -1);
+ } else {
+ assert.eq(connectionsStatus["available"], configuredMaxConns - totalCount);
+ }
+
+ // All connections on an exempt CIDR should be marked as limitExempt.
+ assert.eq(connectionsStatus["limitExempt"], exemptCount);
+
+ // Without a borrowing executor, all connections are threaded.
+ assert.eq(connectionsStatus["threaded"], totalCount);
+
+ if (totalCount > configuredMaxConns) {
+ // The normal serviceExecutor should only be running at most maxConns number of threads.
+ assert.eq(executorStatus["threadsRunning"], configuredMaxConns);
+ } else {
+ assert.eq(executorStatus["threadsRunning"], totalCount);
+ }
+
+ // We should have all excess connections on the reserved executor.
+ assert.gt(reservedExecutorStatus["threadsRunning"], totalCount - configuredMaxConns);
+}
+
+// Use the external ip to avoid our exempt CIDR.
+let ip = get_ipaddr();
+
+try {
+ let adminConns = [];
+ let normalConns = [];
+
+ // We start with one exempt control socket.
+ let exemptCount = 1;
+ let normalCount = 0;
+
+ // Do an initial verification.
+ verifyStats({exemptCount: exemptCount, normalCount: normalCount});
+
+ for (let i = 0; i < 2 * configuredMaxConns; i++) {
+ // Make some connections using the exempt CIDR and some using the normal CIDR.
+ let isExempt = (i % 2 == 0);
+ try {
+ if (isExempt) {
+ adminConns.push(new Mongo(`127.0.0.1:${conn.port}`));
+ ++exemptCount;
+ } else {
+ normalConns.push(new Mongo(`${ip}:${conn.port}`));
+ ++normalCount;
+ }
+ } catch (e) {
+ print(e);
+
+ // If we couldn't connect, that means we've exceeded maxConns and we're using the normal
+ // CIDR.
+ assert(!isExempt);
+ assert(i >= configuredMaxConns);
+ }
+
+ verifyStats({exemptCount: exemptCount, normalCount: normalCount});
+ }
+
+ // Some common sense assertions around what was admitted.
+ assert.eq(exemptCount, configuredMaxConns + 1);
+ assert.lte(normalCount, configuredMaxConns);
+
+ // Destroy all admin connections and verify assumptions.
+ while (adminConns.length) {
+ adminConns.pop().close();
+ --exemptCount;
+
+ verifyStats({exemptCount: exemptCount, normalCount: normalCount});
+ }
+
+ // Destroy all normal connections and verify assumptions.
+ while (normalConns.length) {
+ normalConns.pop().close();
+ --normalCount;
+
+ verifyStats({exemptCount: exemptCount, normalCount: normalCount});
+ }
+} finally {
+ MongoRunner.stopMongod(conn);
+}
})();
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index cc39b6da7d8..7ee1dfcba52 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -78,7 +78,7 @@ public:
return true;
}
- // TODO: need to track connections in server stats (see SERVER-49073)
+ // TODO: need to track connections in server stats (see SERVER-49109)
BSONObj generateSection(OperationContext* opCtx,
const BSONElement& configElement) const override {
BSONObjBuilder b;
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 0090f30a686..38093644e2c 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
#include "mongo/transport/ismaster_metrics.h"
+#include "mongo/transport/service_executor.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/transport/session.h"
#include "mongo/util/processinfo.h"
@@ -164,16 +165,6 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto clientName = "conn{}"_format(session->id());
auto client = _svcCtx->makeClient(clientName, session);
- {
- stdx::lock_guard lk(*client);
- auto seCtx =
- transport::ServiceExecutorContext{}
- .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated)
- .setCanUseReserved(canOverrideMaxConns);
-
- transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
- }
-
auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
const bool quiet = serverGlobalParams.quiet.load();
@@ -197,6 +188,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
if (!quiet) {
LOGV2(22942,
"Connection refused because there are too many open connections",
+ "remote"_attr = session->remote(),
"connectionCount"_attr = connectionCount);
}
return;
@@ -228,7 +220,10 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
}
});
- ssm->start();
+ auto seCtx = transport::ServiceExecutorContext{};
+ seCtx.setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated);
+ seCtx.setCanUseReserved(canOverrideMaxConns);
+ ssm->start(std::move(seCtx));
}
void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
@@ -329,6 +324,13 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
invariant(_svcCtx);
bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
+
+ const auto seStats = transport::ServiceExecutorStats::get(_svcCtx);
+ bob->append("threaded", static_cast<int>(seStats.usesDedicated));
+ if (serverGlobalParams.maxConnsOverride.size()) {
+ bob->append("limitExempt", static_cast<int>(seStats.limitExempt));
+ }
+
bob->append("exhaustIsMaster",
static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
bob->append("awaitingTopologyChanges",
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
index 37c3d03e1b1..c5fd302d64c 100644
--- a/src/mongo/transport/service_executor.cpp
+++ b/src/mongo/transport/service_executor.cpp
@@ -40,12 +40,15 @@
#include "mongo/transport/service_executor_fixed.h"
#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_executor_synchronous.h"
+#include "mongo/util/synchronized_value.h"
namespace mongo {
namespace transport {
namespace {
static constexpr auto kDiagnosticLogLevel = 4;
+auto getServiceExecutorStats =
+ ServiceContext::declareDecoration<synchronized_value<ServiceExecutorStats>>();
auto getServiceExecutorContext =
Client::declareDecoration<boost::optional<ServiceExecutorContext>>();
} // namespace
@@ -61,6 +64,10 @@ StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) {
}
}
+ServiceExecutorStats ServiceExecutorStats::get(ServiceContext* ctx) noexcept {
+ return getServiceExecutorStats(ctx).get();
+}
+
ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept {
auto& serviceExecutorContext = getServiceExecutorContext(client);
@@ -79,6 +86,24 @@ void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) n
seCtx._client = client;
seCtx._sep = client->getServiceContext()->getServiceEntryPoint();
+ {
+ auto stats = getServiceExecutorStats(client->getServiceContext()).synchronize();
+ if (seCtx._canUseReserved) {
+ ++stats->limitExempt;
+ }
+
+ switch (seCtx._threadingModel) {
+ case ThreadingModel::kBorrowed: {
+ ++stats->usesBorrowed;
+ } break;
+ case ThreadingModel::kDedicated: {
+ ++stats->usesDedicated;
+ } break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
+
LOGV2_DEBUG(4898000,
kDiagnosticLogLevel,
"Setting initial ServiceExecutor context for client",
@@ -88,18 +113,91 @@ void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) n
serviceExecutorContext = std::move(seCtx);
}
-ServiceExecutorContext& ServiceExecutorContext::setThreadingModel(
- ThreadingModel threadingModel) noexcept {
- _threadingModel = threadingModel;
- return *this;
+void ServiceExecutorContext::reset(Client* client) noexcept {
+ if (client) {
+ auto& serviceExecutorContext = getServiceExecutorContext(client);
+
+ auto stats = getServiceExecutorStats(client->getServiceContext()).synchronize();
+
+ LOGV2_DEBUG(4898001,
+ kDiagnosticLogLevel,
+ "Resetting ServiceExecutor context for client",
+ "client"_attr = client->desc(),
+ "threadingModel"_attr = serviceExecutorContext->_threadingModel,
+ "canUseReserved"_attr = serviceExecutorContext->_canUseReserved);
+
+ if (serviceExecutorContext->_canUseReserved) {
+ --stats->limitExempt;
+ }
+
+ switch (serviceExecutorContext->_threadingModel) {
+ case ThreadingModel::kBorrowed: {
+ --stats->usesBorrowed;
+ } break;
+ case ThreadingModel::kDedicated: {
+ --stats->usesDedicated;
+ } break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
+}
+
+void ServiceExecutorContext::setThreadingModel(ThreadingModel threadingModel) noexcept {
+
+ if (_threadingModel == threadingModel) {
+ // Nothing to do.
+ return;
+ }
+
+ auto lastThreadingModel = std::exchange(_threadingModel, threadingModel);
+
+ if (_client) {
+ auto stats = getServiceExecutorStats(_client->getServiceContext()).synchronize();
+
+ // Decrement the stats for the previous ThreadingModel.
+ switch (lastThreadingModel) {
+ case ThreadingModel::kBorrowed: {
+ --stats->usesBorrowed;
+ } break;
+ case ThreadingModel::kDedicated: {
+ --stats->usesDedicated;
+ } break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ // Increment the stats for the next ThreadingModel.
+ switch (_threadingModel) {
+ case ThreadingModel::kBorrowed: {
+ ++stats->usesBorrowed;
+ } break;
+ case ThreadingModel::kDedicated: {
+ ++stats->usesDedicated;
+ } break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
}
-ServiceExecutorContext& ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
+void ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
+ if (_canUseReserved == canUseReserved) {
+ // Nothing to do.
+ return;
+ }
+
_canUseReserved = canUseReserved;
- return *this;
+ if (_client) {
+ auto stats = getServiceExecutorStats(_client->getServiceContext()).synchronize();
+ if (canUseReserved) {
+ ++stats->limitExempt;
+ } else {
+ --stats->limitExempt;
+ }
+ }
}
-ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
+ServiceExecutor* ServiceExecutorContext::getServiceExecutor() noexcept {
invariant(_client);
switch (_threadingModel) {
@@ -121,13 +219,16 @@ ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
return _sep->numOpenSessions() > _sep->maxOpenSessions();
};
- if (_canUseReserved && shouldUseReserved()) {
+ if (_canUseReserved && !_hasUsedSynchronous && shouldUseReserved()) {
if (auto exec = transport::ServiceExecutorReserved::get(_client->getServiceContext())) {
- // We are allowed to use the reserved executor, we should use it, and it exists.
+ // We may use the reserved executor: we are allowed to, we have not yet used the
+ // synchronous executor, we are over maxConns so we should, and the reserved exists.
return exec;
}
}
+ // Once we use the ServiceExecutorSynchronous, we shouldn't use the ServiceExecutorReserved.
+ _hasUsedSynchronous = true;
return transport::ServiceExecutorSynchronous::get(_client->getServiceContext());
}
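
All of the increments and decrements above funnel through a ServiceContext decoration wrapped in synchronized_value, so each update happens under a mutex and readers get a coherent copy. Here is a standalone sketch of that pattern, assuming only the C++17 standard library; MongoDB's real synchronized_value lives in mongo/util/synchronized_value.h and is richer than this.

#include <cstddef>
#include <mutex>

// Mirrors the ServiceExecutorStats declared in service_executor.h below.
struct ServiceExecutorStats {
    std::size_t usesDedicated = 0;
    std::size_t usesBorrowed = 0;
    std::size_t limitExempt = 0;
};

template <typename T>
class SynchronizedValue {
public:
    // Proxy that holds the mutex for the duration of an update, analogous to
    // getServiceExecutorStats(ctx).synchronize() in the hunk above.
    class UpdateGuard {
    public:
        explicit UpdateGuard(SynchronizedValue& sv) : _lk(sv._mutex), _value(&sv._value) {}
        T* operator->() const {
            return _value;
        }

    private:
        std::lock_guard<std::mutex> _lk;
        T* _value;
    };

    UpdateGuard synchronize() {
        return UpdateGuard(*this);  // relies on C++17 guaranteed copy elision
    }

    // Returns a copy taken under the mutex. The snapshot is internally consistent
    // but may be stale by the time the caller inspects it, which is why the header
    // below documents these stats as fit for logging rather than runtime decisions.
    T get() {
        std::lock_guard<std::mutex> lk(_mutex);
        return _value;
    }

private:
    std::mutex _mutex;
    T _value{};
};

int main() {
    SynchronizedValue<ServiceExecutorStats> stats;
    {
        // What ServiceExecutorContext::set() does for a dedicated, limit-exempt client.
        auto s = stats.synchronize();
        ++s->usesDedicated;
        ++s->limitExempt;
    }
    auto snapshot = stats.get();  // copy for serverStatus-style reporting
    return static_cast<int>(snapshot.usesDedicated - 1);
}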
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 38116908272..ea27129ab94 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -146,21 +146,43 @@ public:
*/
static void set(Client* client, ServiceExecutorContext seCtx) noexcept;
+
+ /**
+ * Reset the ServiceExecutorContext for a given client.
+ *
+ * This function may only be invoked once and only while under the Client lock.
+ */
+ static void reset(Client* client) noexcept;
+
ServiceExecutorContext() = default;
+ ServiceExecutorContext(const ServiceExecutorContext&) = delete;
+ ServiceExecutorContext& operator=(const ServiceExecutorContext&) = delete;
+ ServiceExecutorContext(ServiceExecutorContext&& seCtx)
+ : _client{std::exchange(seCtx._client, nullptr)},
+ _sep{std::exchange(seCtx._sep, nullptr)},
+ _threadingModel{seCtx._threadingModel},
+ _canUseReserved{seCtx._canUseReserved} {}
+ ServiceExecutorContext& operator=(ServiceExecutorContext&& seCtx) {
+ _client = std::exchange(seCtx._client, nullptr);
+ _sep = std::exchange(seCtx._sep, nullptr);
+ _threadingModel = seCtx._threadingModel;
+ _canUseReserved = seCtx._canUseReserved;
+ return *this;
+ }
/**
* Set the ThreadingModel for the associated Client's service execution.
*
* This function is only valid to invoke with the Client lock or before the Client is set.
*/
- ServiceExecutorContext& setThreadingModel(ThreadingModel threadingModel) noexcept;
+ void setThreadingModel(ThreadingModel threadingModel) noexcept;
/**
* Set if reserved resources are available for the associated Client's service execution.
*
* This function is only valid to invoke with the Client lock or before the Client is set.
*/
- ServiceExecutorContext& setCanUseReserved(bool canUseReserved) noexcept;
+ void setCanUseReserved(bool canUseReserved) noexcept;
/**
* Get the ThreadingModel for the associated Client.
@@ -177,7 +199,7 @@ public:
* This function is only valid to invoke from the associated Client thread. This function does
* not require the Client lock since all writes must also happen from that thread.
*/
- ServiceExecutor* getServiceExecutor() const noexcept;
+ ServiceExecutor* getServiceExecutor() noexcept;
private:
friend StringData toString(ThreadingModel threadingModel);
@@ -187,8 +209,31 @@ private:
ThreadingModel _threadingModel = ThreadingModel::kDedicated;
bool _canUseReserved = false;
+ bool _hasUsedSynchronous = false;
};
+/**
+ * A small statlet for tracking which executors may be in use.
+ */
+class ServiceExecutorStats {
+public:
+ /**
+ * Get the current value of ServiceExecutorStats for the given ServiceContext.
+ *
+ * Note that this value is intended for statistics and logging. It is unsynchronized and
+ * unsuitable for informing decisions at runtime.
+ */
+ static ServiceExecutorStats get(ServiceContext* ctx) noexcept;
+
+ // The number of Clients who use the dedicated executors.
+ size_t usesDedicated = 0;
+
+ // The number of Clients who use the borrowed executors.
+ size_t usesBorrowed = 0;
+
+ // The number of Clients that are allowed to ignore maxConns and use reserved resources.
+ size_t limitExempt = 0;
+};
} // namespace transport
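
ServiceExecutorContext is now move-only, with both move operations emptying their source via std::exchange. That guarantees at most one live context ever refers to a given Client, so the increments performed by set() are undone exactly once by reset(). Below is a minimal sketch of the idiom; Registry is a hypothetical stand-in for the per-ServiceContext stats, not a MongoDB type.

#include <utility>

// Hypothetical stand-in for the per-ServiceContext stats decoration.
struct Registry {
    int liveContexts = 0;
};

class ContextHandle {
public:
    ContextHandle() = default;
    explicit ContextHandle(Registry* reg) : _reg{reg} {
        ++_reg->liveContexts;  // like the ++stats->... bookkeeping in set()
    }

    // Copying is deleted: a copy would double-count against the registry.
    ContextHandle(const ContextHandle&) = delete;
    ContextHandle& operator=(const ContextHandle&) = delete;

    // Moves empty the source via std::exchange, exactly as the diff does with
    // _client and _sep, so only one live handle ever owns the registration.
    ContextHandle(ContextHandle&& other) noexcept : _reg{std::exchange(other._reg, nullptr)} {}
    ContextHandle& operator=(ContextHandle&& other) noexcept {
        if (this != &other) {
            reset();
            _reg = std::exchange(other._reg, nullptr);
        }
        return *this;
    }

    ~ContextHandle() {
        reset();
    }

    // Like ServiceExecutorContext::reset(): undo the registration exactly once.
    void reset() {
        if (auto* reg = std::exchange(_reg, nullptr)) {
            --reg->liveContexts;
        }
    }

private:
    Registry* _reg = nullptr;
};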
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 5124542291b..e9fa1ee95a6 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -163,9 +163,6 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
}
} // namespace
-using transport::ServiceExecutor;
-using transport::TransportLayer;
-
/*
* This class wraps up the logic for swapping/unswapping the Client when transitioning
* between states.
@@ -178,115 +175,83 @@ class ServiceStateMachine::ThreadGuard {
public:
explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} {
- auto owned = Ownership::kUnowned;
- _ssm->_owned.compareAndSwap(&owned, Ownership::kOwned);
- if (owned == Ownership::kStatic) {
- dassert(haveClient());
- dassert(Client::getCurrent() == _ssm->_dbClientPtr);
- _haveTakenOwnership = true;
+ invariant(_ssm);
+
+ if (_ssm->_clientPtr == Client::getCurrent()) {
+ // We're not the first on this thread, nothing more to do.
return;
}
-#ifdef MONGO_CONFIG_DEBUG_BUILD
- invariant(owned == Ownership::kUnowned);
- _ssm->_owningThread.store(stdx::this_thread::get_id());
-#endif
+ auto& client = _ssm->_client;
+ invariant(client);
// Set up the thread name
auto oldThreadName = getThreadName();
- const auto& threadName = _ssm->_dbClient->desc();
+ const auto& threadName = client->desc();
if (oldThreadName != threadName) {
_oldThreadName = oldThreadName.toString();
setThreadName(threadName);
}
// Swap the current Client so calls to cc() work as expected
- Client::setCurrent(std::move(_ssm->_dbClient));
+ Client::setCurrent(std::move(client));
_haveTakenOwnership = true;
}
// Constructing from a moved ThreadGuard invalidates the other thread guard.
ThreadGuard(ThreadGuard&& other)
- : _ssm(other._ssm), _haveTakenOwnership(other._haveTakenOwnership) {
- other._haveTakenOwnership = false;
- }
+ : _ssm{std::exchange(other._ssm, nullptr)},
+ _haveTakenOwnership{std::exchange(other._haveTakenOwnership, false)} {}
ThreadGuard& operator=(ThreadGuard&& other) {
- if (this != &other) {
- _ssm = other._ssm;
- _haveTakenOwnership = other._haveTakenOwnership;
- other._haveTakenOwnership = false;
- }
+ _ssm = std::exchange(other._ssm, nullptr);
+ _haveTakenOwnership = std::exchange(other._haveTakenOwnership, false);
return *this;
};
ThreadGuard() = delete;
~ThreadGuard() {
- if (_haveTakenOwnership)
- release();
+ release();
}
explicit operator bool() const {
-#ifdef MONGO_CONFIG_DEBUG_BUILD
- if (_haveTakenOwnership) {
- invariant(_ssm->_owned.load() != Ownership::kUnowned);
- invariant(_ssm->_owningThread.load() == stdx::this_thread::get_id());
- return true;
- } else {
- return false;
- }
-#else
- return _haveTakenOwnership;
-#endif
- }
-
- void markStaticOwnership() {
- dassert(static_cast<bool>(*this));
- _ssm->_owned.store(Ownership::kStatic);
+ return _ssm;
}
void release() {
- auto owned = _ssm->_owned.load();
-
-#ifdef MONGO_CONFIG_DEBUG_BUILD
- dassert(_haveTakenOwnership);
- dassert(owned != Ownership::kUnowned);
- dassert(_ssm->_owningThread.load() == stdx::this_thread::get_id());
-#endif
- if (owned != Ownership::kStatic) {
- if (haveClient()) {
- _ssm->_dbClient = Client::releaseCurrent();
- }
-
- if (!_oldThreadName.empty()) {
- setThreadName(_oldThreadName);
- }
+ if (!_ssm) {
+ // We've been released or moved from.
+ return;
}
- // If the session has ended, then it's unsafe to do anything but call the cleanup hook.
- if (_ssm->state() == State::Ended) {
- // The cleanup hook gets moved out of _ssm->_cleanupHook so that it can only be called
- // once.
- auto cleanupHook = std::move(_ssm->_cleanupHook);
- if (cleanupHook)
- cleanupHook();
+ // If we have a ServiceStateMachine pointer, then it should control the current Client.
+ invariant(_ssm->_clientPtr == Client::getCurrent());
+
+ if (auto haveTakenOwnership = std::exchange(_haveTakenOwnership, false);
+ !haveTakenOwnership) {
+ // Reset our pointer so that we cannot release again.
+ _ssm = nullptr;
- // It's very important that the Guard returns here and that the SSM's state does not
- // get modified in any way after the cleanup hook is called.
+ // We are not the original owner, nothing more to do.
return;
}
- _haveTakenOwnership = false;
- // If owned != Ownership::kOwned here then it can only equal Ownership::kStatic and we
- // should just return
- if (owned == Ownership::kOwned) {
- _ssm->_owned.store(Ownership::kUnowned);
+ // Reclaim the client.
+ _ssm->_client = Client::releaseCurrent();
+
+ // Reset our pointer so that we cannot release again.
+ _ssm = nullptr;
+
+ if (!_oldThreadName.empty()) {
+ // Reset the old thread name.
+ setThreadName(_oldThreadName);
}
}
private:
- ServiceStateMachine* _ssm;
+ ServiceStateMachine* _ssm = nullptr;
+
bool _haveTakenOwnership = false;
std::string _oldThreadName;
};
@@ -295,22 +260,23 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
: _state{State::Created},
_serviceContext{client->getServiceContext()},
_sep{_serviceContext->getServiceEntryPoint()},
- _dbClient{std::move(client)},
- _dbClientPtr{_dbClient.get()} {}
+ _client{std::move(client)},
+ _clientPtr{_client.get()} {}
-const transport::SessionHandle& ServiceStateMachine::_session() const {
- return _dbClientPtr->session();
+const transport::SessionHandle& ServiceStateMachine::_session() {
+ return _clientPtr->session();
}
ServiceExecutor* ServiceStateMachine::_executor() {
- return ServiceExecutorContext::get(_dbClientPtr)->getServiceExecutor();
+ return ServiceExecutorContext::get(_clientPtr)->getServiceExecutor();
}
-Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
+Future<void> ServiceStateMachine::_sourceMessage() {
+ auto guard = ThreadGuard(this);
+
invariant(_inMessage.empty());
invariant(_state.load() == State::Source);
_state.store(State::SourceWait);
- guard.release();
auto sourceMsgImpl = [&] {
const auto& transportMode = _executor()->transportMode();
@@ -333,11 +299,12 @@ Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
});
}
-Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
+Future<void> ServiceStateMachine::_sinkMessage() {
+ auto guard = ThreadGuard(this);
+
// Sink our response to the client
invariant(_state.load() == State::Process);
_state.store(State::SinkWait);
- guard.release();
auto toSink = std::exchange(_outMessage, {});
auto sinkMsgImpl = [&] {
@@ -360,12 +327,10 @@ Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
}
void ServiceStateMachine::_sourceCallback(Status status) {
- // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
- // thread.
- ThreadGuard guard(this);
+ auto guard = ThreadGuard(this);
+
+ invariant(state() == State::SourceWait);
- // Make sure we just called sourceMessage();
- dassert(state() == State::SourceWait);
auto remote = _session()->remote();
if (status.isOK()) {
@@ -405,11 +370,9 @@ void ServiceStateMachine::_sourceCallback(Status status) {
}
void ServiceStateMachine::_sinkCallback(Status status) {
- // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this
- // thread.
- ThreadGuard guard(this);
+ auto guard = ThreadGuard(this);
- dassert(state() == State::SinkWait);
+ invariant(state() == State::SinkWait);
// If there was an error sinking the message to the client, then we should print an error and
// end the session.
@@ -431,7 +394,9 @@ void ServiceStateMachine::_sinkCallback(Status status) {
}
}
-Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
+Future<void> ServiceStateMachine::_processMessage() {
+ auto guard = ThreadGuard(this);
+
invariant(!_inMessage.empty());
TrafficRecorder::get(_serviceContext)
@@ -459,10 +424,10 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
// handleRequest() is implemented in a subclass for mongod/mongos and actually performs
// all the database work for this request.
return _sep->handleRequest(opCtx.get(), _inMessage)
- .then([this,
- &compressorMgr = compressorMgr,
- opCtx = std::move(opCtx),
- guard = std::move(guard)](DbResponse dbresponse) mutable -> void {
+ .then([this, &compressorMgr = compressorMgr, opCtx = std::move(opCtx)](
+ DbResponse dbresponse) mutable -> void {
+ auto guard = ThreadGuard(this);
+
// opCtx must be killed and delisted here so that the operation cannot show up in
// currentOp results after the response reaches the client. The destruction is postponed
// for later to mitigate its performance impact on the critical path of execution.
@@ -519,14 +484,15 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
});
}
-void ServiceStateMachine::start() {
+void ServiceStateMachine::start(ServiceExecutorContext seCtx) {
+ {
+ stdx::lock_guard lk(*_clientPtr);
+
+ ServiceExecutorContext::set(_clientPtr, std::move(seCtx));
+ }
+
_executor()->schedule(
GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
- // TODO(SERVER-49109) We can't use static ownership in general with
- // a ServiceExecutorFixed and async commands. ThreadGuard needs to become smarter.
- ThreadGuard guard(shared_from_this().get());
- guard.markStaticOwnership();
-
// If this is the first run of the SSM, then update its state to Source
if (state() == State::Created) {
_state.store(State::Source);
@@ -541,16 +507,16 @@ void ServiceStateMachine::_runOnce() {
if (_inExhaust) {
return Status::OK();
} else {
- return _sourceMessage(ThreadGuard(this));
+ return _sourceMessage();
}
})
- .then([this]() { return _processMessage(ThreadGuard(this)); })
+ .then([this]() { return _processMessage(); })
.then([this]() -> Future<void> {
if (_outMessage.empty()) {
return Status::OK();
}
- return _sinkMessage(ThreadGuard(this));
+ return _sinkMessage();
})
.getAsync([this, anchor = shared_from_this()](Status status) {
// Destroy the opCtx (already killed) here, to potentially use the delay between
@@ -560,9 +526,9 @@ void ServiceStateMachine::_runOnce() {
}
if (!status.isOK()) {
_state.store(State::EndSession);
- // The service executor failed to schedule the task. This could for example be that
- // we failed to start a worker thread. Terminate this connection to leave the system
- // in a valid state.
+ // The service executor failed to schedule the task. This could for example be
+ // that we failed to start a worker thread. Terminate this connection to leave
+ // the system in a valid state.
LOGV2_WARNING_OPTIONS(4910400,
{logv2::LogComponent::kExecutor},
"Terminating session due to error: {error}",
@@ -570,8 +536,8 @@ void ServiceStateMachine::_runOnce() {
"error"_attr = status);
terminate();
- ThreadGuard terminateGuard(this);
- _cleanupSession(std::move(terminateGuard));
+ _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
+ [this, anchor = shared_from_this()](Status status) { _cleanupSession(); }));
return;
}
@@ -593,8 +559,8 @@ void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask t
auto sessionTags = _session()->getTags();
- // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have been
- // set, then skip the termination check.
+ // If terminateIfTagsDontMatch gets called when we still are 'pending' where no tags have
+ // been set, then skip the termination check.
if ((sessionTags & tags) || (sessionTags & transport::Session::kPending)) {
LOGV2(22991,
"Skip closing connection for connection",
@@ -646,7 +612,9 @@ void ServiceStateMachine::_cleanupExhaustResources() noexcept try {
"error"_attr = e.toStatus());
}
-void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {
+void ServiceStateMachine::_cleanupSession() {
+ auto guard = ThreadGuard(this);
+
// Ensure the delayed destruction of opCtx always happens before doing the cleanup.
if (MONGO_likely(_killedOpCtx)) {
_killedOpCtx.reset();
@@ -655,15 +623,20 @@ void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {
_cleanupExhaustResources();
+ {
+ stdx::lock_guard lk(*_clientPtr);
+ transport::ServiceExecutorContext::reset(_clientPtr);
+ }
+
+ if (auto cleanupHook = std::exchange(_cleanupHook, {})) {
+ cleanupHook();
+ }
+
_state.store(State::Ended);
_inMessage.reset();
_outMessage.reset();
-
- // By ignoring the return value of Client::releaseCurrent() we destroy the session.
- // _dbClient is now nullptr and _dbClientPtr is invalid and should never be accessed.
- Client::releaseCurrent();
}
} // namespace transport
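
The rework above replaces the old Ownership enum (kUnowned/kOwned/kStatic) with a simpler contract: a guard that finds the Client already current on its thread takes no ownership, the destructor always calls release(), and std::exchange on the guard's members makes a duplicate or moved-from release a harmless no-op. A condensed sketch of that contract follows, with a bare pointer standing in for the Client that the real guard swaps onto the thread via Client::setCurrent()/releaseCurrent().

#include <utility>

struct StateMachine {
    int* client = nullptr;  // stands in for ServiceContext::UniqueClient
};

class ThreadGuard {
public:
    explicit ThreadGuard(StateMachine* ssm) : _ssm{ssm} {
        if (!_ssm->client) {
            return;  // another guard on this thread already owns the client
        }
        _borrowed = std::exchange(_ssm->client, nullptr);  // like Client::setCurrent()
        _tookOwnership = true;
    }

    ThreadGuard(ThreadGuard&& other) noexcept
        : _ssm{std::exchange(other._ssm, nullptr)},
          _borrowed{std::exchange(other._borrowed, nullptr)},
          _tookOwnership{std::exchange(other._tookOwnership, false)} {}

    ~ThreadGuard() {
        release();  // unconditional, unlike the pre-patch guard
    }

    void release() {
        if (!_ssm) {
            return;  // already released, or moved-from
        }
        if (!std::exchange(_tookOwnership, false)) {
            _ssm = nullptr;  // not the original owner; nothing to hand back
            return;
        }
        _ssm->client = std::exchange(_borrowed, nullptr);  // like Client::releaseCurrent()
        _ssm = nullptr;
    }

private:
    StateMachine* _ssm = nullptr;
    int* _borrowed = nullptr;
    bool _tookOwnership = false;
};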
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 7794cdfbfce..2c649442fee 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -103,7 +103,7 @@ public:
/*
* start() schedules a call to _runOnce() in the future.
*/
- void start();
+ void start(ServiceExecutorContext seCtx);
/*
* Gets the current state of connection for testing/diagnostic purposes.
@@ -137,7 +137,6 @@ private:
* each step in _runOnce();
*/
class ThreadGuard;
- friend class ThreadGuard;
/*
* Terminates the associated transport Session if status indicate error.
@@ -147,19 +146,9 @@ private:
void _terminateAndLogIfError(Status status);
/*
- * This is a helper function to schedule tasks on the serviceExecutor maintaining a shared_ptr
- * copy to anchor the lifetime of the SSM while waiting for callbacks to run.
- *
- * If scheduling the function fails, the SSM will be terminated and cleaned up immediately
- */
- void _scheduleNextWithGuard(ThreadGuard guard,
- transport::ServiceExecutor::ScheduleFlags flags,
- Ownership ownershipModel = Ownership::kOwned);
-
- /*
* Gets the transport::Session associated with this connection
*/
- const transport::SessionHandle& _session() const;
+ const transport::SessionHandle& _session();
/*
* Gets the transport::ServiceExecutor associated with this connection.
@@ -170,7 +159,7 @@ private:
* This function actually calls into the database and processes a request. It's broken out
* into its own inline function for better readability.
*/
- Future<void> _processMessage(ThreadGuard guard);
+ Future<void> _processMessage();
/*
* These get called by the TransportLayer when requested network I/O has completed.
@@ -182,13 +171,13 @@ private:
* Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
* before waiting on the TL.
*/
- Future<void> _sourceMessage(ThreadGuard guard);
- Future<void> _sinkMessage(ThreadGuard guard);
+ Future<void> _sourceMessage();
+ Future<void> _sinkMessage();
/*
* Releases all the resources associated with the session and call the cleanupHook.
*/
- void _cleanupSession(ThreadGuard guard);
+ void _cleanupSession();
/*
* This is the initial function called at the beginning of a thread's lifecycle in the
@@ -207,8 +196,8 @@ private:
ServiceEntryPoint* const _sep;
transport::SessionHandle _sessionHandle;
- ServiceContext::UniqueClient _dbClient;
- Client* _dbClientPtr;
+ ServiceContext::UniqueClient _client;
+ Client* _clientPtr;
std::function<void()> _cleanupHook;
bool _inExhaust = false;