author     Ben Caimano <ben.caimano@10gen.com>              2020-09-11 20:09:59 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-09-14 04:57:01 +0000
commit     827a25eb01bc5ddf766b3a543ef0ba5112953e1b (patch)
tree       504935db5b1292818dcd9b862bf8b5c3b4352fb5 /src/mongo/transport
parent     bd320bc2d10cff75756a2c95986cc81ec8a5e7c7 (diff)
download   mongo-827a25eb01bc5ddf766b3a543ef0ba5112953e1b.tar.gz
SERVER-50867 Roll back ServiceStateMachine changes temporarily
This reverts these commits:
- b039b24746e1d1fb10a32e1ca4831423c01d4cd7: SERVER-48980
- 97e16187ff3065d242a61a52e7b6edd4d439fb30: SERVER-49072
- 0607a6c291bf4cf4580a4444d826ed3c3ac3df47: SERVER-49104
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--  src/mongo/transport/SConscript                        |   4
-rw-r--r--  src/mongo/transport/service_entry_point.h             |   9
-rw-r--r--  src/mongo/transport/service_entry_point_impl.cpp      | 163
-rw-r--r--  src/mongo/transport/service_entry_point_impl.h        |  12
-rw-r--r--  src/mongo/transport/service_executor.cpp              | 135
-rw-r--r--  src/mongo/transport/service_executor.h                |  77
-rw-r--r--  src/mongo/transport/service_executor_fixed.cpp        |  16
-rw-r--r--  src/mongo/transport/service_executor_fixed.h          |   3
-rw-r--r--  src/mongo/transport/service_executor_reserved.cpp     |  25
-rw-r--r--  src/mongo/transport/service_executor_reserved.h       |   3
-rw-r--r--  src/mongo/transport/service_executor_synchronous.cpp  |  15
-rw-r--r--  src/mongo/transport/service_executor_synchronous.h    |   3
-rw-r--r--  src/mongo/transport/service_state_machine.cpp         | 251
-rw-r--r--  src/mongo/transport/service_state_machine.h           |  74
-rw-r--r--  src/mongo/transport/transport_layer_manager.cpp       |   2
15 files changed, 259 insertions(+), 533 deletions(-)
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 7932abc662d..5326afe5db2 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -86,7 +86,6 @@ env.Library(
tlEnv.Library(
target='service_executor',
source=[
- 'service_executor.cpp',
'service_executor_fixed.cpp',
'service_executor_reserved.cpp',
'service_executor_synchronous.cpp',
@@ -181,8 +180,7 @@ tlEnv.CppUnitTest(
'transport_layer_asio_test.cpp',
'service_executor_test.cpp',
'max_conns_override_test.cpp',
- # TODO: service_state_machine test to be re-written in SERVER-50141
- # 'service_state_machine_test.cpp',
+ 'service_state_machine_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index fb1ad895d78..aa970f1ef67 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,8 +29,6 @@
#pragma once
-#include <limits>
-
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
@@ -83,13 +81,6 @@ public:
virtual size_t numOpenSessions() const = 0;
/**
- * Returns the maximum number of sessions that can be open.
- */
- virtual size_t maxOpenSessions() const {
- return std::numeric_limits<size_t>::max();
- }
-
- /**
* Processes a request and fills out a DbResponse.
*/
virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
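The maxOpenSessions() hook removed above follows a common "unlimited sentinel" default: the base class answers std::numeric_limits<size_t>::max(), so callers can compare numOpenSessions() against a limit without first checking whether one was configured. A minimal sketch of the pattern, using a hypothetical EntryPoint stand-in rather than the real ServiceEntryPoint:

```cpp
#include <cstddef>
#include <limits>

class EntryPoint {
public:
    virtual ~EntryPoint() = default;
    virtual std::size_t numOpenSessions() const = 0;

    // Default means "no limit": overLimit() below is well-defined whether or
    // not a subclass installs a real maximum.
    virtual std::size_t maxOpenSessions() const {
        return std::numeric_limits<std::size_t>::max();
    }

    bool overLimit() const {
        return numOpenSessions() > maxOpenSessions();
    }
};
```

This is exactly the comparison the deleted ServiceExecutorContext::getServiceExecutor() (later in this diff) relied on when deciding whether to fall back to the reserved executor.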
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 0090f30a686..1824ccf1c94 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -33,7 +33,6 @@
#include "mongo/transport/service_entry_point_impl.h"
-#include <fmt/format.h>
#include <vector>
#include "mongo/db/auth/restriction_environment.h"
@@ -49,23 +48,12 @@
#include <sys/resource.h>
#endif
-#if !defined(__has_feature)
-#define __has_feature(x) 0
-#endif
-
namespace mongo {
-using namespace fmt::literals;
-
bool shouldOverrideMaxConns(const transport::SessionHandle& session,
const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
- if (exemptions.empty()) {
- return false;
- }
-
const auto& remoteAddr = session->remoteAddr();
const auto& localAddr = session->localAddr();
-
boost::optional<CIDR> remoteCIDR;
if (remoteAddr.isValid() && remoteAddr.isIP()) {
@@ -93,7 +81,8 @@ bool shouldOverrideMaxConns(const transport::SessionHandle& session,
return false;
}
-size_t getSupportedMax() {
+ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
+
const auto supportedMax = [] {
#ifdef _WIN32
return serverGlobalParams.maxConns;
@@ -124,33 +113,21 @@ size_t getSupportedMax() {
"limit"_attr = supportedMax);
}
- return supportedMax;
-}
-
-ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx)
- : _svcCtx(svcCtx), _maxNumConnections(getSupportedMax()) {}
+ _maxNumConnections = supportedMax;
-Status ServiceEntryPointImpl::start() {
- if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
- !status.isOK()) {
- return status;
+ if (serverGlobalParams.reservedAdminThreads) {
+ _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
+ _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
}
+}
- if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
- if (auto status = exec->start(); !status.isOK()) {
- return status;
- }
- }
-
- // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
- // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
- // return status;
- // }
-
- return Status::OK();
+Status ServiceEntryPointImpl::start() {
+ if (_adminInternalPool)
+ return _adminInternalPool->start();
+ else
+ return Status::OK();
}
-// TODO: explicitly start on the fixed executor
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
const auto& remoteAddr = session->remoteAddr();
@@ -159,48 +136,43 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto restrictionEnvironment = std::make_unique<RestrictionEnvironment>(remoteAddr, localAddr);
RestrictionEnvironment::set(session, std::move(restrictionEnvironment));
- bool canOverrideMaxConns = shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
-
- auto clientName = "conn{}"_format(session->id());
- auto client = _svcCtx->makeClient(clientName, session);
-
- {
- stdx::lock_guard lk(*client);
- auto seCtx =
- transport::ServiceExecutorContext{}
- .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated)
- .setCanUseReserved(canOverrideMaxConns);
-
- transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
- }
-
- auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
+ SSMListIterator ssmIt;
const bool quiet = serverGlobalParams.quiet.load();
-
size_t connectionCount;
- auto ssmIt = [&]() -> boost::optional<SSMListIterator> {
- stdx::lock_guard lk(_sessionsMutex);
- connectionCount = _currentConnections.load();
- if (connectionCount > _maxNumConnections && !canOverrideMaxConns) {
- return boost::none;
+ auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
+
+ auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
+ auto usingMaxConnOverride = false;
+ {
+ stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
+ connectionCount = _sessions.size() + 1;
+ if (connectionCount > _maxNumConnections) {
+ usingMaxConnOverride =
+ shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
}
- auto it = _sessions.emplace(_sessions.begin(), ssm);
- connectionCount = _sessions.size();
- _currentConnections.store(connectionCount);
- _createdConnections.addAndFetch(1);
- return it;
- }();
+ if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
+ ssmIt = _sessions.emplace(_sessions.begin(), ssm);
+ _currentConnections.store(connectionCount);
+ _createdConnections.addAndFetch(1);
+ }
+ }
- if (!ssmIt) {
+ // Checking if we successfully added a connection above. Separated from the lock so we don't log
+ // while holding it.
+ if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
if (!quiet) {
LOGV2(22942,
"Connection refused because there are too many open connections",
"connectionCount"_attr = connectionCount);
}
return;
- } else if (!quiet) {
+ } else if (usingMaxConnOverride && _adminInternalPool) {
+ ssm->setServiceExecutor(_adminInternalPool.get());
+ }
+
+ if (!quiet) {
LOGV2(22943,
"Connection accepted",
"remote"_attr = session->remote(),
@@ -213,7 +185,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto remote = session->remote();
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
- _sessions.erase(*ssmIt);
+ _sessions.erase(ssmIt);
connectionCount = _sessions.size();
_currentConnections.store(connectionCount);
}
@@ -228,7 +200,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
}
});
- ssm->start();
+ auto ownership = ServiceStateMachine::Ownership::kOwned;
+ if (transportMode == transport::Mode::kSynchronous) {
+ ownership = ServiceStateMachine::Ownership::kStatic;
+ }
+ ssm->start(ownership);
}
void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
@@ -243,13 +219,8 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
}
bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
-#if __has_feature(address_sanitizer)
- // When running under address sanitizer, we get false positive leaks due to disorder around
- // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
- // harder to dry up the server from active connections before going on to really shut down.
using logv2::LogComponent;
- auto start = _svcCtx->getPreciseClockSource()->now();
stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
// Request that all sessions end, while holding the _sessionsMutex, loop over all the current
@@ -286,37 +257,7 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
"shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
"workers"_attr = numOpenSessions());
}
-
- lk.unlock();
-
- // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
- // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
- // !status.isOK()) {
- // LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
- // }
-
- timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
- if (auto status = exec->shutdown(timeout); !status.isOK()) {
- LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
- }
- }
-
- timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- if (auto status =
- transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
- !status.isOK()) {
- LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
- }
-
return result;
-#else
- return true;
-#endif
}
void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
@@ -326,17 +267,17 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
bob->append("current", static_cast<int>(sessionCount));
bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
+ if (auto sc = getGlobalServiceContext()) {
+ bob->append("active", static_cast<int>(sc->getActiveClientOperations()));
+ bob->append("exhaustIsMaster",
+ static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster()));
+ bob->append("awaitingTopologyChanges",
+ static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges()));
+ }
- invariant(_svcCtx);
- bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
- bob->append("exhaustIsMaster",
- static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
- bob->append("awaitingTopologyChanges",
- static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges()));
-
- if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+ if (_adminInternalPool) {
BSONObjBuilder section(bob->subobjStart("adminConnections"));
- adminExec->appendStats(&section);
+ _adminInternalPool->appendStats(&section);
}
}
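The startSession() logic restored in this file admits a connection only after counting it under the sessions mutex, with an exemption path for maxConnsOverride, and defers all logging until the lock is dropped. A minimal sketch of that admission control, with local stand-in types (StateMachine, EntryPoint) rather than the real MongoDB classes:

```cpp
#include <cstddef>
#include <list>
#include <memory>
#include <mutex>

struct StateMachine {};

class EntryPoint {
public:
    bool startSession(bool exemptFromMaxConns) {
        std::size_t connectionCount = 0;
        bool admitted = false;
        {
            std::lock_guard<std::mutex> lk(_sessionsMutex);
            connectionCount = _sessions.size() + 1;  // count the candidate itself
            if (connectionCount <= _maxNumConnections || exemptFromMaxConns) {
                _sessions.emplace_front(std::make_shared<StateMachine>());
                admitted = true;
            }
        }
        // Refusals/acceptances are logged here, outside the critical section,
        // so slow logging I/O never serializes new connections.
        return admitted;
    }

private:
    std::mutex _sessionsMutex;
    std::list<std::shared_ptr<StateMachine>> _sessions;
    std::size_t _maxNumConnections = 100;  // assumed; the real value is derived from ulimits
};
```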
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 1693469fcda..38c1319657a 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -36,9 +36,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/variant.h"
#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_fixed.h"
#include "mongo/transport/service_executor_reserved.h"
-#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/util/hierarchical_acquisition.h"
#include "mongo/util/net/cidr.h"
@@ -77,12 +75,8 @@ public:
return _currentConnections.load();
}
- size_t maxOpenSessions() const final {
- return _maxNumConnections;
- }
-
private:
- using SSMList = std::list<std::shared_ptr<transport::ServiceStateMachine>>;
+ using SSMList = std::list<std::shared_ptr<ServiceStateMachine>>;
using SSMListIterator = SSMList::iterator;
ServiceContext* const _svcCtx;
@@ -93,9 +87,11 @@ private:
stdx::condition_variable _shutdownCondition;
SSMList _sessions;
- const size_t _maxNumConnections{DEFAULT_MAX_CONN};
+ size_t _maxNumConnections{DEFAULT_MAX_CONN};
AtomicWord<size_t> _currentConnections{0};
AtomicWord<size_t> _createdConnections{0};
+
+ std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool;
};
/*
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
deleted file mode 100644
index 37c3d03e1b1..00000000000
--- a/src/mongo/transport/service_executor.cpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/transport/service_executor.h"
-
-#include <boost/optional.hpp>
-
-#include "mongo/logv2/log.h"
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_fixed.h"
-#include "mongo/transport/service_executor_reserved.h"
-#include "mongo/transport/service_executor_synchronous.h"
-
-namespace mongo {
-namespace transport {
-namespace {
-static constexpr auto kDiagnosticLogLevel = 4;
-
-auto getServiceExecutorContext =
- Client::declareDecoration<boost::optional<ServiceExecutorContext>>();
-} // namespace
-
-StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) {
- switch (threadingModel) {
- case ServiceExecutorContext::ThreadingModel::kDedicated:
- return "Dedicated"_sd;
- case ServiceExecutorContext::ThreadingModel::kBorrowed:
- return "Borrowed"_sd;
- default:
- MONGO_UNREACHABLE;
- }
-}
-
-ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept {
- auto& serviceExecutorContext = getServiceExecutorContext(client);
-
- if (!serviceExecutorContext) {
- // Service worker Clients will never have a ServiceExecutorContext.
- return nullptr;
- }
-
- return &serviceExecutorContext.get();
-}
-
-void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) noexcept {
- auto& serviceExecutorContext = getServiceExecutorContext(client);
- invariant(!serviceExecutorContext);
-
- seCtx._client = client;
- seCtx._sep = client->getServiceContext()->getServiceEntryPoint();
-
- LOGV2_DEBUG(4898000,
- kDiagnosticLogLevel,
- "Setting initial ServiceExecutor context for client",
- "client"_attr = client->desc(),
- "threadingModel"_attr = seCtx._threadingModel,
- "canUseReserved"_attr = seCtx._canUseReserved);
- serviceExecutorContext = std::move(seCtx);
-}
-
-ServiceExecutorContext& ServiceExecutorContext::setThreadingModel(
- ThreadingModel threadingModel) noexcept {
- _threadingModel = threadingModel;
- return *this;
-}
-
-ServiceExecutorContext& ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
- _canUseReserved = canUseReserved;
- return *this;
-}
-
-ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
- invariant(_client);
-
- switch (_threadingModel) {
- case ThreadingModel::kBorrowed:
- return ServiceExecutorFixed::get(_client->getServiceContext());
- case ThreadingModel::kDedicated: {
- // Continue on.
- } break;
- default:
- MONGO_UNREACHABLE;
- }
-
- auto shouldUseReserved = [&] {
- // This is at best a naive solution. There could be a world where numOpenSessions() changes
- very quickly. We are not taking locks on the ServiceEntryPoint, so we may choose to
- // schedule onto the ServiceExecutorReserved when it is no longer necessary. The upside is
- // that we will automatically shift to the ServiceExecutorSynchronous after the first
- // command loop.
- return _sep->numOpenSessions() > _sep->maxOpenSessions();
- };
-
- if (_canUseReserved && shouldUseReserved()) {
- if (auto exec = transport::ServiceExecutorReserved::get(_client->getServiceContext())) {
- // We are allowed to use the reserved executor, we should use it, and it exists.
- return exec;
- }
- }
-
- return transport::ServiceExecutorSynchronous::get(_client->getServiceContext());
-}
-
-} // namespace transport
-} // namespace mongo
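The deleted file above hangs a ServiceExecutorContext off each Client via Client::declareDecoration. Roughly, a decoration is per-object attached state reached through a declared accessor; the sketch below models it with a plain optional member on a hypothetical Client, which is enough to show the get/set-once contract the deleted code enforced:

```cpp
#include <cassert>
#include <optional>

struct ExecutorContext {
    bool canUseReserved = false;
};

struct Client {
    std::optional<ExecutorContext> executorContext;  // stands in for the decoration
};

// Mirrors ServiceExecutorContext::get(): absence is a legal answer, since
// service worker Clients never carry a context.
ExecutorContext* getContext(Client* client) {
    return client->executorContext ? &*client->executorContext : nullptr;
}

// Mirrors ServiceExecutorContext::set(): may only be invoked once per Client.
void setContext(Client* client, ExecutorContext ctx) {
    assert(!client->executorContext);
    client->executorContext = ctx;
}

int main() {
    Client c;
    assert(getContext(&c) == nullptr);
    setContext(&c, ExecutorContext{true});
    assert(getContext(&c)->canUseReserved);
}
```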
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 38116908272..b702198e6b5 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -33,19 +33,20 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/client.h"
-#include "mongo/db/service_context.h"
#include "mongo/platform/bitwise_enum_operators.h"
-#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/session.h"
#include "mongo/transport/transport_mode.h"
#include "mongo/util/duration.h"
#include "mongo/util/functional.h"
#include "mongo/util/out_of_line_executor.h"
namespace mongo {
+// ServiceContext is forward declared here because including service_context.h would create a
+// circular dependency.
+class ServiceContext;
+
namespace transport {
+class Session;
+
/*
* This is the interface for all ServiceExecutors.
*/
@@ -122,74 +123,6 @@ public:
virtual void appendStats(BSONObjBuilder* bob) const = 0;
};
-/**
- * ServiceExecutorContext determines which ServiceExecutor is used for each Client.
- */
-class ServiceExecutorContext {
-public:
- enum ThreadingModel {
- kBorrowed,
- kDedicated,
- };
-
- /**
- * Get a pointer to the ServiceExecutorContext for a given client.
- *
- * This function is valid to invoke either on the Client thread or with the Client lock.
- */
- static ServiceExecutorContext* get(Client* client) noexcept;
-
- /**
- * Set the ServiceExecutorContext for a given client.
- *
- * This function may only be invoked once and only while under the Client lock.
- */
- static void set(Client* client, ServiceExecutorContext seCtx) noexcept;
-
- ServiceExecutorContext() = default;
-
- /**
- * Set the ThreadingModel for the associated Client's service execution.
- *
- * This function is only valid to invoke with the Client lock or before the Client is set.
- */
- ServiceExecutorContext& setThreadingModel(ThreadingModel threadingModel) noexcept;
-
- /**
- * Set if reserved resources are available for the associated Client's service execution.
- *
- * This function is only valid to invoke with the Client lock or before the Client is set.
- */
- ServiceExecutorContext& setCanUseReserved(bool canUseReserved) noexcept;
-
- /**
- * Get the ThreadingModel for the associated Client.
- *
- * This function is valid to invoke either on the Client thread or with the Client lock.
- */
- auto getThreadingModel() const noexcept {
- return _threadingModel;
- }
-
- /**
- * Get an appropriate ServiceExecutor given the current parameters.
- *
- * This function is only valid to invoke from the associated Client thread. This function does
- * not require the Client lock since all writes must also happen from that thread.
- */
- ServiceExecutor* getServiceExecutor() const noexcept;
-
-private:
- friend StringData toString(ThreadingModel threadingModel);
-
- Client* _client = nullptr;
- ServiceEntryPoint* _sep = nullptr;
-
- ThreadingModel _threadingModel = ThreadingModel::kDedicated;
- bool _canUseReserved = false;
-};
-
-
} // namespace transport
ENABLE_BITMASK_OPERATORS(transport::ServiceExecutor::ScheduleFlags)
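ScheduleFlags stays a scoped enum, so the ENABLE_BITMASK_OPERATORS macro at the bottom of the header supplies the bitwise operators that plain enum class values lack. A self-contained approximation of what such a helper provides (flag names modeled on the ones used in this diff; the numeric values are assumptions):

```cpp
#include <cstdint>
#include <type_traits>

enum class ScheduleFlags : std::uint32_t {
    kEmptyFlags = 0,
    kMayRecurse = 1 << 0,
    kDeferredTask = 1 << 1,
    kMayYieldBeforeSchedule = 1 << 2,
};

// Type-safe combination: callers can write kDeferredTask | kMayYieldBeforeSchedule.
constexpr ScheduleFlags operator|(ScheduleFlags a, ScheduleFlags b) {
    using U = std::underlying_type_t<ScheduleFlags>;
    return static_cast<ScheduleFlags>(static_cast<U>(a) | static_cast<U>(b));
}

constexpr ScheduleFlags operator&(ScheduleFlags a, ScheduleFlags b) {
    using U = std::underlying_type_t<ScheduleFlags>;
    return static_cast<ScheduleFlags>(static_cast<U>(a) & static_cast<U>(b));
}

// Test for a flag the way scheduleTask() does with (flags & kMayRecurse).
constexpr bool hasFlag(ScheduleFlags value, ScheduleFlags flag) {
    return (value & flag) != ScheduleFlags::kEmptyFlags;
}
```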
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index b068e487126..2a1fc737cc1 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -37,7 +37,6 @@
#include "mongo/transport/session.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
-#include "mongo/util/thread_safety_context.h"
namespace mongo {
@@ -50,15 +49,6 @@ namespace {
constexpr auto kThreadsRunning = "threadsRunning"_sd;
constexpr auto kExecutorLabel = "executor"_sd;
constexpr auto kExecutorName = "fixed"_sd;
-
-const auto getServiceExecutorFixed =
- ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorFixed>>();
-
-const auto serviceExecutorFixedRegisterer = ServiceContext::ConstructorActionRegisterer{
- "ServiceExecutorFixed", [](ServiceContext* ctx) {
- getServiceExecutorFixed(ctx) =
- std::make_unique<ServiceExecutorFixed>(ThreadPool::Options{});
- }};
} // namespace
ServiceExecutorFixed::ServiceExecutorFixed(ThreadPool::Options options)
@@ -96,12 +86,6 @@ Status ServiceExecutorFixed::start() {
return Status::OK();
}
-ServiceExecutorFixed* ServiceExecutorFixed::get(ServiceContext* ctx) {
- auto& ref = getServiceExecutorFixed(ctx);
- invariant(ref);
- return ref.get();
-}
-
Status ServiceExecutorFixed::shutdown(Milliseconds timeout) {
auto waitForShutdown = [&]() mutable -> Status {
stdx::unique_lock<Latch> lk(_mutex);
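The registerer removed above is an instance of MongoDB's constructor-action pattern: a namespace-scope object queues a callback at static-initialization time, and the callback runs when each ServiceContext is constructed, installing the decorated singleton. A compact sketch under local stand-in names (the `int` executor is just a placeholder payload):

```cpp
#include <functional>
#include <memory>
#include <vector>

struct ServiceContext {
    std::unique_ptr<int> executor;  // stands in for the decorated ServiceExecutorFixed
};

std::vector<std::function<void(ServiceContext*)>>& constructorActions() {
    static std::vector<std::function<void(ServiceContext*)>> actions;
    return actions;
}

struct ConstructorActionRegisterer {
    explicit ConstructorActionRegisterer(std::function<void(ServiceContext*)> fn) {
        constructorActions().push_back(std::move(fn));
    }
};

// At namespace scope: queues the action once at program start-up, before any
// ServiceContext exists.
const ConstructorActionRegisterer executorRegisterer{[](ServiceContext* ctx) {
    ctx->executor = std::make_unique<int>(42);
}};

// Run by the ServiceContext factory so every new context gets the singleton.
ServiceContext makeServiceContext() {
    ServiceContext ctx;
    for (auto& action : constructorActions())
        action(&ctx);
    return ctx;
}
```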
diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h
index 6c540576a78..82be057e1f5 100644
--- a/src/mongo/transport/service_executor_fixed.h
+++ b/src/mongo/transport/service_executor_fixed.h
@@ -33,7 +33,6 @@
#include <memory>
#include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -56,8 +55,6 @@ public:
explicit ServiceExecutorFixed(ThreadPool::Options options);
virtual ~ServiceExecutorFixed();
- static ServiceExecutorFixed* get(ServiceContext* ctx);
-
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
index d81ad5be200..cae9653e60f 100644
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -33,13 +33,11 @@
#include "mongo/transport/service_executor_reserved.h"
-#include "mongo/db/server_options.h"
#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/service_executor_utils.h"
#include "mongo/util/processinfo.h"
-#include "mongo/util/thread_safety_context.h"
namespace mongo {
namespace transport {
@@ -50,19 +48,6 @@ constexpr auto kExecutorLabel = "executor"_sd;
constexpr auto kExecutorName = "reserved"_sd;
constexpr auto kReadyThreads = "readyThreads"_sd;
constexpr auto kStartingThreads = "startingThreads"_sd;
-
-const auto getServiceExecutorReserved =
- ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorReserved>>();
-
-const auto serviceExecutorReservedRegisterer = ServiceContext::ConstructorActionRegisterer{
- "ServiceExecutorReserved", [](ServiceContext* ctx) {
- if (!serverGlobalParams.reservedAdminThreads) {
- return;
- }
-
- getServiceExecutorReserved(ctx) = std::make_unique<transport::ServiceExecutorReserved>(
- ctx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
- }};
} // namespace
thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
@@ -162,12 +147,6 @@ Status ServiceExecutorReserved::_startWorker() {
});
}
-ServiceExecutorReserved* ServiceExecutorReserved::get(ServiceContext* ctx) {
- auto& ref = getServiceExecutorReserved(ctx);
-
- // The ServiceExecutorReserved could be absent, so nullptr is okay.
- return ref.get();
-}
Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
LOGV2_DEBUG(22980, 3, "Shutting down reserved executor");
@@ -194,8 +173,8 @@ Status ServiceExecutorReserved::scheduleTask(Task task, ScheduleFlags flags) {
if (!_localWorkQueue.empty()) {
// Execute task directly (recurse) if allowed by the caller as it produced better
// performance in testing. Try to limit the amount of recursion so we don't blow up the
- // stack, even though this shouldn't happen with this executor that uses blocking
- // network I/O.
+ // stack, even though this shouldn't happen with this executor that uses blocking network
+ // I/O.
if ((flags & ScheduleFlags::kMayRecurse) &&
(_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
++_localRecursionDepth;
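The comment in this hunk describes recursion-limited inline execution: a worker thread that already has a local queue may run a newly scheduled task directly (which recurses), but only to a bounded depth; past the limit the task is deferred to the worker's run loop. A rough sketch of that dispatch decision, with an assumed constant in place of the reservedServiceExecutorRecursionLimit server parameter:

```cpp
#include <deque>
#include <functional>

using Task = std::function<void()>;

thread_local std::deque<Task> localWorkQueue;
thread_local int localRecursionDepth = 0;

constexpr int kRecursionLimit = 8;  // assumed value; the real limit is a server parameter

void scheduleTask(Task task, bool mayRecurse) {
    // A non-empty local queue means we are already on a worker thread.
    if (!localWorkQueue.empty() && mayRecurse && localRecursionDepth < kRecursionLimit) {
        ++localRecursionDepth;
        task();  // run inline: faster in testing, but bounded so the stack cannot blow up
        --localRecursionDepth;
        return;
    }
    localWorkQueue.push_back(std::move(task));  // defer to the worker's run loop
}
```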
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
index 60de4bc2993..e3acf6febd9 100644
--- a/src/mongo/transport/service_executor_reserved.h
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -32,7 +32,6 @@
#include <deque>
#include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -55,8 +54,6 @@ class ServiceExecutorReserved final : public ServiceExecutor {
public:
explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
- static ServiceExecutorReserved* get(ServiceContext* ctx);
-
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index c75bdbb0952..d034e208954 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -38,7 +38,6 @@
#include "mongo/transport/service_executor_gen.h"
#include "mongo/transport/service_executor_utils.h"
#include "mongo/util/processinfo.h"
-#include "mongo/util/thread_safety_context.h"
namespace mongo {
namespace transport {
@@ -46,14 +45,6 @@ namespace {
constexpr auto kThreadsRunning = "threadsRunning"_sd;
constexpr auto kExecutorLabel = "executor"_sd;
constexpr auto kExecutorName = "passthrough"_sd;
-
-const auto getServiceExecutorSynchronous =
- ServiceContext::declareDecoration<std::unique_ptr<ServiceExecutorSynchronous>>();
-
-const auto serviceExecutorSynchronousRegisterer = ServiceContext::ConstructorActionRegisterer{
- "ServiceExecutorSynchronous", [](ServiceContext* ctx) {
- getServiceExecutorSynchronous(ctx) = std::make_unique<ServiceExecutorSynchronous>(ctx);
- }};
} // namespace
thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {};
@@ -87,12 +78,6 @@ Status ServiceExecutorSynchronous::shutdown(Milliseconds timeout) {
"passthrough executor couldn't shutdown all worker threads within time limit.");
}
-ServiceExecutorSynchronous* ServiceExecutorSynchronous::get(ServiceContext* ctx) {
- auto& ref = getServiceExecutorSynchronous(ctx);
- invariant(ref);
- return ref.get();
-}
-
Status ServiceExecutorSynchronous::scheduleTask(Task task, ScheduleFlags flags) {
if (!_stillRunning.load()) {
return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h
index 840dc702d4c..940382b53e8 100644
--- a/src/mongo/transport/service_executor_synchronous.h
+++ b/src/mongo/transport/service_executor_synchronous.h
@@ -32,7 +32,6 @@
#include <deque>
#include "mongo/base/status.h"
-#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -50,8 +49,6 @@ class ServiceExecutorSynchronous final : public ServiceExecutor {
public:
explicit ServiceExecutorSynchronous(ServiceContext* ctx);
- static ServiceExecutorSynchronous* get(ServiceContext* ctx);
-
Status start() override;
Status shutdown(Milliseconds timeout) override;
Status scheduleTask(Task task, ScheduleFlags flags) override;
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 5124542291b..373a362d0df 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -36,6 +36,7 @@
#include <memory>
#include "mongo/config.h"
+#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/traffic_recorder.h"
@@ -44,8 +45,6 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_fixed.h"
-#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/assert_util.h"
@@ -59,7 +58,6 @@
#include "mongo/util/quick_exit.h"
namespace mongo {
-namespace transport {
namespace {
MONGO_FAIL_POINT_DEFINE(doNotSetMoreToCome);
/**
@@ -161,14 +159,14 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
return exhaustMessage;
}
+
} // namespace
using transport::ServiceExecutor;
using transport::TransportLayer;
/*
- * This class wraps up the logic for swapping/unswapping the Client when transitioning
- * between states.
+ * This class wraps up the logic for swapping/unswapping the Client during runNext().
*
* In debug builds this also ensures that only one thread is working on the SSM at once.
*/
@@ -194,10 +192,9 @@ public:
// Set up the thread name
auto oldThreadName = getThreadName();
- const auto& threadName = _ssm->_dbClient->desc();
- if (oldThreadName != threadName) {
- _oldThreadName = oldThreadName.toString();
- setThreadName(threadName);
+ if (oldThreadName != _ssm->_threadName) {
+ _ssm->_oldThreadName = getThreadName().toString();
+ setThreadName(_ssm->_threadName);
}
// Swap the current Client so calls to cc() work as expected
@@ -259,8 +256,8 @@ public:
_ssm->_dbClient = Client::releaseCurrent();
}
- if (!_oldThreadName.empty()) {
- setThreadName(_oldThreadName);
+ if (!_ssm->_oldThreadName.empty()) {
+ setThreadName(_ssm->_oldThreadName);
}
}
@@ -288,75 +285,75 @@ public:
private:
ServiceStateMachine* _ssm;
bool _haveTakenOwnership = false;
- std::string _oldThreadName;
};
-ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
+std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ transport::Mode transportMode) {
+ return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), transportMode);
+}
+
+ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ transport::Mode transportMode)
: _state{State::Created},
- _serviceContext{client->getServiceContext()},
- _sep{_serviceContext->getServiceEntryPoint()},
- _dbClient{std::move(client)},
+ _sep{svcContext->getServiceEntryPoint()},
+ _transportMode(transportMode),
+ _serviceContext(svcContext),
+ _serviceExecutor(_serviceContext->getServiceExecutor()),
+ _sessionHandle(session),
+ _threadName{str::stream() << "conn" << _session()->id()},
+ _dbClient{svcContext->makeClient(_threadName, std::move(session))},
_dbClientPtr{_dbClient.get()} {}
const transport::SessionHandle& ServiceStateMachine::_session() const {
- return _dbClientPtr->session();
+ return _sessionHandle;
}
-ServiceExecutor* ServiceStateMachine::_executor() {
- return ServiceExecutorContext::get(_dbClientPtr)->getServiceExecutor();
-}
-
-Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
+void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
invariant(_inMessage.empty());
invariant(_state.load() == State::Source);
_state.store(State::SourceWait);
guard.release();
auto sourceMsgImpl = [&] {
- const auto& transportMode = _executor()->transportMode();
- if (transportMode == transport::Mode::kSynchronous) {
+ if (_transportMode == transport::Mode::kSynchronous) {
MONGO_IDLE_THREAD_BLOCK;
return Future<Message>::makeReady(_session()->sourceMessage());
} else {
- invariant(transportMode == transport::Mode::kAsynchronous);
+ invariant(_transportMode == transport::Mode::kAsynchronous);
return _session()->asyncSourceMessage();
}
};
- return sourceMsgImpl().onCompletion([this](StatusWith<Message> msg) -> Future<void> {
+ sourceMsgImpl().getAsync([this](StatusWith<Message> msg) {
if (msg.isOK()) {
_inMessage = std::move(msg.getValue());
invariant(!_inMessage.empty());
}
_sourceCallback(msg.getStatus());
- return Status::OK();
});
}
-Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
+void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {
// Sink our response to the client
invariant(_state.load() == State::Process);
_state.store(State::SinkWait);
guard.release();
- auto toSink = std::exchange(_outMessage, {});
auto sinkMsgImpl = [&] {
- const auto& transportMode = _executor()->transportMode();
- if (transportMode == transport::Mode::kSynchronous) {
+ if (_transportMode == transport::Mode::kSynchronous) {
// We don't consider ourselves idle while sending the reply since we are still doing
// work on behalf of the client. Contrast that with sourceMessage() where we are waiting
// for the client to send us more work to do.
return Future<void>::makeReady(_session()->sinkMessage(std::move(toSink)));
} else {
- invariant(transportMode == transport::Mode::kAsynchronous);
+ invariant(_transportMode == transport::Mode::kAsynchronous);
return _session()->asyncSinkMessage(std::move(toSink));
}
};
- return sinkMsgImpl().onCompletion([this](Status status) {
- _sinkCallback(std::move(status));
- return Status::OK();
- });
+ sinkMsgImpl().getAsync([this](Status status) { _sinkCallback(std::move(status)); });
}
void ServiceStateMachine::_sourceCallback(Status status) {
@@ -371,11 +368,14 @@ void ServiceStateMachine::_sourceCallback(Status status) {
if (status.isOK()) {
_state.store(State::Process);
- // If the sourceMessage succeeded then we can move to on to process the message. We
- // simply return from here and the future chain in _runOnce() will continue to the
- // next state normally.
+ // Since we know that we're going to process a message, call scheduleNext() immediately
+ // to schedule the call to processMessage() on the serviceExecutor (or just unwind the
+ // stack)
- // If any other issues arise, close the session.
+ // If this callback doesn't own the ThreadGuard, then we're being called recursively,
+ // and the executor shouldn't start a new thread to process the message - it can use this
+ // one just after this returns.
+ return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse);
} else if (ErrorCodes::isInterruption(status.code()) ||
ErrorCodes::isNetworkError(status.code())) {
LOGV2_DEBUG(
@@ -401,7 +401,10 @@ void ServiceStateMachine::_sourceCallback(Status status) {
"connectionId"_attr = _session()->id());
_state.store(State::EndSession);
}
- uassertStatusOK(status);
+
+ // There was an error receiving a message from the client and we've already printed the error
+ // so call runNextInGuard() to clean up the session without waiting.
+ _runNextInGuard(std::move(guard));
}
void ServiceStateMachine::_sinkCallback(Status status) {
@@ -412,10 +415,10 @@ void ServiceStateMachine::_sinkCallback(Status status) {
dassert(state() == State::SinkWait);
// If there was an error sinking the message to the client, then we should print an error and
- // end the session.
+ // end the session. No need to unwind the stack, so this will runNextInGuard() and return.
//
- // Otherwise, update the current state depending on whether we're in exhaust or not and return
- // from this function to let _runOnce continue the future chaining of state transitions.
+ // Otherwise, update the current state depending on whether we're in exhaust or not, and call
+ // scheduleNext() to unwind the stack and do the next step.
if (!status.isOK()) {
LOGV2(22989,
"Error sending response to client. Ending connection from remote",
@@ -423,19 +426,25 @@ void ServiceStateMachine::_sinkCallback(Status status) {
"remote"_attr = _session()->remote(),
"connectionId"_attr = _session()->id());
_state.store(State::EndSession);
- uassertStatusOK(status);
+ return _runNextInGuard(std::move(guard));
} else if (_inExhaust) {
_state.store(State::Process);
+ return _scheduleNextWithGuard(std::move(guard),
+ ServiceExecutor::kDeferredTask |
+ ServiceExecutor::kMayYieldBeforeSchedule);
} else {
_state.store(State::Source);
+ return _scheduleNextWithGuard(std::move(guard),
+ ServiceExecutor::kDeferredTask |
+ ServiceExecutor::kMayYieldBeforeSchedule);
}
}
-Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
+void ServiceStateMachine::_processMessage(ThreadGuard guard) {
invariant(!_inMessage.empty());
TrafficRecorder::get(_serviceContext)
- .observe(_session(), _serviceContext->getPreciseClockSource()->now(), _inMessage);
+ .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage);
auto& compressorMgr = MessageCompressorManager::forSession(_session());
@@ -458,7 +467,7 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
// The handleRequest is implemented in a subclass for mongod/mongos and actually does all the
// database work for this request.
- return _sep->handleRequest(opCtx.get(), _inMessage)
+ _sep->handleRequest(opCtx.get(), _inMessage)
.then([this,
&compressorMgr = compressorMgr,
opCtx = std::move(opCtx),
@@ -507,77 +516,106 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
}
TrafficRecorder::get(_serviceContext)
- .observe(_session(), _serviceContext->getPreciseClockSource()->now(), toSink);
+ .observe(
+ _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
+
+ _sinkMessage(std::move(guard), std::move(toSink));
- _outMessage = std::move(toSink);
} else {
_state.store(State::Source);
_inMessage.reset();
- _outMessage.reset();
_inExhaust = false;
+ return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kDeferredTask);
}
- });
+ })
+ .get();
}
-void ServiceStateMachine::start() {
- _executor()->schedule(
- GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
- // TODO(SERVER-49109) We can't use static ownership in general with
- // a ServiceExecutorFixed and async commands. ThreadGuard needs to become smarter.
- ThreadGuard guard(shared_from_this().get());
- guard.markStaticOwnership();
+void ServiceStateMachine::runNext() {
+ return _runNextInGuard(ThreadGuard(this));
+}
- // If this is the first run of the SSM, then update its state to Source
- if (state() == State::Created) {
- _state.store(State::Source);
- }
+void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {
+ auto curState = state();
+ dassert(curState != State::Ended);
- _runOnce();
- }));
-}
+ // If this is the first run of the SSM, then update its state to Source
+ if (curState == State::Created) {
+ curState = State::Source;
+ _state.store(curState);
+ }
-void ServiceStateMachine::_runOnce() {
- makeReadyFutureWith([&]() -> Future<void> {
- if (_inExhaust) {
- return Status::OK();
- } else {
- return _sourceMessage(ThreadGuard(this));
+ // Destroy the opCtx (already killed) here, to potentially use the delay between clients'
+ // requests to hide the destruction cost.
+ if (MONGO_likely(_killedOpCtx)) {
+ _killedOpCtx.reset();
+ }
+
+ // Make sure the current Client got set correctly
+ dassert(Client::getCurrent() == _dbClientPtr);
+ try {
+ switch (curState) {
+ case State::Source:
+ _sourceMessage(std::move(guard));
+ break;
+ case State::Process:
+ _processMessage(std::move(guard));
+ break;
+ case State::EndSession:
+ _cleanupSession(std::move(guard));
+ break;
+ default:
+ MONGO_UNREACHABLE;
}
- })
- .then([this]() { return _processMessage(ThreadGuard(this)); })
- .then([this]() -> Future<void> {
- if (_outMessage.empty()) {
- return Status::OK();
- }
- return _sinkMessage(ThreadGuard(this));
- })
- .getAsync([this, anchor = shared_from_this()](Status status) {
- // Destroy the opCtx (already killed) here, to potentially use the delay between
- // clients' requests to hide the destruction cost.
- if (MONGO_likely(_killedOpCtx)) {
- _killedOpCtx.reset();
- }
- if (!status.isOK()) {
- _state.store(State::EndSession);
- // The service executor failed to schedule the task. This could for example be that
- // we failed to start a worker thread. Terminate this connection to leave the system
- // in a valid state.
- LOGV2_WARNING_OPTIONS(4910400,
- {logv2::LogComponent::kExecutor},
- "Terminating session due to error: {error}",
- "Terminating session due to error",
- "error"_attr = status);
- terminate();
-
- ThreadGuard terminateGuard(this);
- _cleanupSession(std::move(terminateGuard));
- return;
- }
+ return;
+ } catch (const DBException& e) {
+ LOGV2(22990,
+ "DBException handling request, closing client connection: {error}",
+ "DBException handling request, closing client connection",
+ "error"_attr = redact(e));
+ }
+ // No need to catch std::exception, as std::terminate will be called when the exception bubbles
+ // to the top of the stack
- _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
- [this, anchor = shared_from_this()](Status status) { _runOnce(); }));
- });
+ if (!guard) {
+ guard = ThreadGuard(this);
+ }
+ _state.store(State::EndSession);
+ _cleanupSession(std::move(guard));
+}
+
+void ServiceStateMachine::start(Ownership ownershipModel) {
+ _scheduleNextWithGuard(
+ ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel);
+}
+
+void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) {
+ _serviceExecutor = executor;
+}
+
+void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
+ transport::ServiceExecutor::ScheduleFlags flags,
+ Ownership ownershipModel) {
+ auto func = [ssm = shared_from_this(), ownershipModel] {
+ ThreadGuard guard(ssm.get());
+ if (ownershipModel == Ownership::kStatic)
+ guard.markStaticOwnership();
+ ssm->_runNextInGuard(std::move(guard));
+ };
+ guard.release();
+ Status status = _serviceExecutor->scheduleTask(std::move(func), flags);
+ if (status.isOK()) {
+ return;
+ }
+
+ // We've had an error, reacquire the ThreadGuard and destroy the SSM
+ ThreadGuard terminateGuard(this);
+
+ // The service executor failed to schedule the task. This could for example be that we failed
+ // to start a worker thread. Terminate this connection to leave the system in a valid state.
+ _terminateAndLogIfError(status);
+ _cleanupSession(std::move(terminateGuard));
}
void ServiceStateMachine::terminate() {
@@ -659,12 +697,9 @@ void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {
_inMessage.reset();
- _outMessage.reset();
-
// By ignoring the return value of Client::releaseCurrent() we destroy the session.
// _dbClient is now nullptr and _dbClientPtr is invalid and should never be accessed.
Client::releaseCurrent();
}
-} // namespace transport
} // namespace mongo
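Taken together, the restored _runNextInGuard() dispatches on a small state machine: Source reads a request, Process handles it and sinks the reply, and EndSession tears the connection down. A condensed, compilable sketch of just that dispatch shape (all I/O and error paths stubbed out; not the real class):

```cpp
enum class State { Created, Source, Process, EndSession, Ended };

struct StateMachine {
    State state = State::Created;

    // Mirrors runNext(): a fresh machine first moves to Source, then each
    // call advances the connection through its lifecycle.
    void runNext() {
        if (state == State::Created)
            state = State::Source;
        switch (state) {
            case State::Source:
                // sourceMessage(): receive a request, then process it
                state = State::Process;
                break;
            case State::Process:
                // handleRequest() + sinkMessage(): reply, then await the next request
                state = State::Source;
                break;
            case State::EndSession:
                // cleanupSession(): release the Client and session resources
                state = State::Ended;
                break;
            default:
                break;
        }
    }
};
```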
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index 7794cdfbfce..0572c4f6ac8 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -35,7 +35,6 @@
#include "mongo/base/status.h"
#include "mongo/config.h"
-#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
@@ -45,11 +44,9 @@
#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_mode.h"
-#include "mongo/util/future.h"
#include "mongo/util/net/ssl_manager.h"
namespace mongo {
-namespace transport {
/*
* The ServiceStateMachine holds the state of a single client connection and represents the
@@ -66,9 +63,17 @@ public:
ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;
/*
- * Construct a ServiceStateMachine for a given Client.
+ * Creates a new ServiceStateMachine for a given session/service context. If transportMode
+ * is synchronous, then calls into the transport layer will block while they complete,
+ * otherwise they will be handled asynchronously.
*/
- ServiceStateMachine(ServiceContext::UniqueClient client);
+ static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ transport::Mode transportMode);
+
+ ServiceStateMachine(ServiceContext* svcContext,
+ transport::SessionHandle session,
+ transport::Mode transportMode);
/*
* Any state may transition to EndSession in case of an error, otherwise the valid state
@@ -101,9 +106,31 @@ public:
enum class Ownership { kUnowned, kOwned, kStatic };
/*
- * start() schedules a call to _runOnce() in the future.
+ * runNext() will run the current state of the state machine. It also handles all the error
+ * handling and state management for requests.
+ *
+ * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack
+ * if they have just completed a database operation to make sure that this doesn't infinitely
+ * recurse.
+ *
+ * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take
+ * ownership of the SSM, it will call scheduleNext() and return immediately.
+ */
+ void runNext();
+
+ /*
+ * start() schedules a call to runNext() in the future.
+ *
+ * It is guaranteed to unwind the stack, and not call runNext() recursively, but is not
+ * guaranteed that runNext() will run after this returns.
+ */
+ void start(Ownership ownershipModel);
+
+ /*
+ * Set the executor to be used for the next call to runNext(). This allows switching between
+ * thread models after the SSM has started.
*/
- void start();
+ void setServiceExecutor(transport::ServiceExecutor* executor);
/*
* Gets the current state of connection for testing/diagnostic purposes.
@@ -133,8 +160,7 @@ public:
private:
/*
- * A class that wraps up lifetime management of the _dbClient and _threadName for
- * each step in _runOnce();
+ * A class that wraps up lifetime management of the _dbClient and _threadName for runNext();
*/
class ThreadGuard;
friend class ThreadGuard;
@@ -162,15 +188,18 @@ private:
const transport::SessionHandle& _session() const;
/*
- * Gets the transport::ServiceExecutor associated with this connection.
+ * This is the actual implementation of runNext() that gets called after the ThreadGuard
+ * has been successfully created. If any callbacks (like sourceCallback()) need to call
+ * runNext() and already own a ThreadGuard, they should call this with that guard as the
+ * argument.
*/
- ServiceExecutor* _executor();
+ void _runNextInGuard(ThreadGuard guard);
/*
* This function actually calls into the database and processes a request. It's broken out
* into its own inline function for better readability.
*/
- Future<void> _processMessage(ThreadGuard guard);
+ inline void _processMessage(ThreadGuard guard);
/*
* These get called by the TransportLayer when requested network I/O has completed.
@@ -182,8 +211,8 @@ private:
* Source/Sink message from the TransportLayer. These will invalidate the ThreadGuard just
* before waiting on the TL.
*/
- Future<void> _sourceMessage(ThreadGuard guard);
- Future<void> _sinkMessage(ThreadGuard guard);
+ void _sourceMessage(ThreadGuard guard);
+ void _sinkMessage(ThreadGuard guard, Message toSink);
/*
* Releases all the resources associated with the session and call the cleanupHook.
@@ -191,30 +220,27 @@ private:
void _cleanupSession(ThreadGuard guard);
/*
- * This is the initial function called at the beginning of a thread's lifecycle in the
- * TransportLayer.
- */
- void _runOnce();
-
- /*
* Releases all the resources associated with the exhaust request.
*/
void _cleanupExhaustResources() noexcept;
AtomicWord<State> _state{State::Created};
+ ServiceEntryPoint* _sep;
+ transport::Mode _transportMode;
+
ServiceContext* const _serviceContext;
- ServiceEntryPoint* const _sep;
+ transport::ServiceExecutor* _serviceExecutor;
transport::SessionHandle _sessionHandle;
+ const std::string _threadName;
ServiceContext::UniqueClient _dbClient;
- Client* _dbClientPtr;
+ const Client* _dbClientPtr;
std::function<void()> _cleanupHook;
bool _inExhaust = false;
boost::optional<MessageCompressorId> _compressorId;
Message _inMessage;
- Message _outMessage;
// Allows delegating destruction of opCtx to another function to potentially remove its cost
// from the critical path. This is currently only used in `_processMessage()`.
@@ -224,6 +250,7 @@ private:
#if MONGO_CONFIG_DEBUG_BUILD
AtomicWord<stdx::thread::id> _owningThread;
#endif
+ std::string _oldThreadName;
};
template <typename T>
@@ -254,5 +281,4 @@ T& operator<<(T& stream, const ServiceStateMachine::State& state) {
return stream;
}
-} // namespace transport
} // namespace mongo
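The header keeps ThreadGuard as the mechanism ensuring only one thread drives an SSM at a time (checked via the _owningThread atomic in debug builds). A simplified sketch of that single-owner guard, using a bare atomic thread id rather than the real class:

```cpp
#include <atomic>
#include <cassert>
#include <thread>

class ThreadGuard {
public:
    explicit ThreadGuard(std::atomic<std::thread::id>* owner) : _owner(owner) {
        auto unowned = std::thread::id{};
        // Claim ownership, or confirm this thread already owns the machine.
        bool acquired = _owner->compare_exchange_strong(unowned, std::this_thread::get_id());
        assert(acquired || unowned == std::this_thread::get_id());
        (void)acquired;
    }

    ~ThreadGuard() {
        release();
    }

    // Hand the machine back before blocking, as the _sourceMessage() and
    // _sinkMessage() paths in this diff do just before waiting on the network.
    void release() {
        if (_owner) {
            _owner->store(std::thread::id{});
            _owner = nullptr;
        }
    }

private:
    std::atomic<std::thread::id>* _owner;
};
```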
diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp
index 8b89b64516e..e0ff202bf6c 100644
--- a/src/mongo/transport/transport_layer_manager.cpp
+++ b/src/mongo/transport/transport_layer_manager.cpp
@@ -136,6 +136,8 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig(
transport::TransportLayerASIO::Options opts(config);
opts.transportMode = transport::Mode::kSynchronous;
+ ctx->setServiceExecutor(std::make_unique<ServiceExecutorSynchronous>(ctx));
+
std::vector<std::unique_ptr<TransportLayer>> retVector;
retVector.emplace_back(std::make_unique<transport::TransportLayerASIO>(opts, sep));
return std::make_unique<TransportLayerManager>(std::move(retVector));