summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/curop.cpp7
-rw-r--r--src/mongo/db/service_context.h6
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/transport/service_entry_point.h9
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp80
-rw-r--r--src/mongo/transport/service_entry_point_impl.h8
-rw-r--r--src/mongo/transport/service_executor.cpp135
-rw-r--r--src/mongo/transport/service_executor.h77
-rw-r--r--src/mongo/transport/service_state_machine.cpp77
-rw-r--r--src/mongo/transport/service_state_machine.h37
11 files changed, 324 insertions, 114 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 6374268664b..85907307fe7 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -141,6 +141,7 @@ env.Library(
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/stats/timer_stats',
'$BUILD_DIR/mongo/rpc/client_metadata',
+ '$BUILD_DIR/mongo/transport/service_executor',
'$BUILD_DIR/mongo/util/diagnostic_info' if get_option('use-diagnostic-latches') == 'on' else [],
'$BUILD_DIR/mongo/util/fail_point',
'$BUILD_DIR/mongo/util/net/network',
diff --git a/src/mongo/db/curop.cpp b/src/mongo/db/curop.cpp
index 91aba3c28a7..6905e8ce469 100644
--- a/src/mongo/db/curop.cpp
+++ b/src/mongo/db/curop.cpp
@@ -53,6 +53,7 @@
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/rpc/metadata/impersonated_user_metadata.h"
+#include "mongo/transport/service_executor.h"
#include "mongo/util/hex.h"
#include "mongo/util/log_with_sampling.h"
#include "mongo/util/net/socket_utils.h"
@@ -303,6 +304,12 @@ void CurOp::reportCurrentOpForClient(OperationContext* opCtx,
serializeAuthenticatedUsers("effectiveUsers"_sd);
}
+ if (const auto seCtx = transport::ServiceExecutorContext::get(client)) {
+ bool isDedicated = (seCtx->getThreadingModel() ==
+ transport::ServiceExecutorContext::ThreadingModel::kDedicated);
+ infoBuilder->append("threaded"_sd, isDedicated);
+ }
+
if (clientOpCtx) {
infoBuilder->append("opid", static_cast<int>(clientOpCtx->getOpID()));
diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h
index 8d95c13b161..08e93df9cfb 100644
--- a/src/mongo/db/service_context.h
+++ b/src/mongo/db/service_context.h
@@ -42,7 +42,6 @@
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/unordered_set.h"
-#include "mongo/transport/service_executor.h"
#include "mongo/transport/session.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -634,11 +633,6 @@ private:
std::unique_ptr<ServiceEntryPoint> _serviceEntryPoint;
/**
- * The ServiceExecutor
- */
- std::unique_ptr<transport::ServiceExecutor> _serviceExecutor;
-
- /**
* The storage engine, if any.
*/
std::unique_ptr<StorageEngine> _storageEngine;
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 9ac3bf2c329..7932abc662d 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -86,6 +86,7 @@ env.Library(
tlEnv.Library(
target='service_executor',
source=[
+ 'service_executor.cpp',
'service_executor_fixed.cpp',
'service_executor_reserved.cpp',
'service_executor_synchronous.cpp',
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index aa970f1ef67..fb1ad895d78 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,6 +29,8 @@
#pragma once
+#include <limits>
+
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
@@ -81,6 +83,13 @@ public:
virtual size_t numOpenSessions() const = 0;
/**
+ * Returns the maximum number of sessions that can be open.
+ */
+ virtual size_t maxOpenSessions() const {
+ return std::numeric_limits<size_t>::max();
+ }
+
+ /**
* Processes a request and fills out a DbResponse.
*/
virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index ec35471ede6..0090f30a686 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -33,6 +33,7 @@
#include "mongo/transport/service_entry_point_impl.h"
+#include <fmt/format.h>
#include <vector>
#include "mongo/db/auth/restriction_environment.h"
@@ -54,10 +55,17 @@
namespace mongo {
+using namespace fmt::literals;
+
bool shouldOverrideMaxConns(const transport::SessionHandle& session,
const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
+ if (exemptions.empty()) {
+ return false;
+ }
+
const auto& remoteAddr = session->remoteAddr();
const auto& localAddr = session->localAddr();
+
boost::optional<CIDR> remoteCIDR;
if (remoteAddr.isValid() && remoteAddr.isIP()) {
@@ -85,8 +93,7 @@ bool shouldOverrideMaxConns(const transport::SessionHandle& session,
return false;
}
-ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
-
+size_t getSupportedMax() {
const auto supportedMax = [] {
#ifdef _WIN32
return serverGlobalParams.maxConns;
@@ -117,9 +124,12 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
"limit"_attr = supportedMax);
}
- _maxNumConnections = supportedMax;
+ return supportedMax;
}
+ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx)
+ : _svcCtx(svcCtx), _maxNumConnections(getSupportedMax()) {}
+
Status ServiceEntryPointImpl::start() {
if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
!status.isOK()) {
@@ -149,44 +159,48 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto restrictionEnvironment = std::make_unique<RestrictionEnvironment>(remoteAddr, localAddr);
RestrictionEnvironment::set(session, std::move(restrictionEnvironment));
- SSMListIterator ssmIt;
+ bool canOverrideMaxConns = shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
- const bool quiet = serverGlobalParams.quiet.load();
- size_t connectionCount;
- auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode();
+ auto clientName = "conn{}"_format(session->id());
+ auto client = _svcCtx->makeClient(clientName, session);
- auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
- auto usingMaxConnOverride = false;
{
- stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
- connectionCount = _sessions.size() + 1;
- if (connectionCount > _maxNumConnections) {
- usingMaxConnOverride =
- shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
- }
+ stdx::lock_guard lk(*client);
+ auto seCtx =
+ transport::ServiceExecutorContext{}
+ .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated)
+ .setCanUseReserved(canOverrideMaxConns);
- if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
- ssmIt = _sessions.emplace(_sessions.begin(), ssm);
- _currentConnections.store(connectionCount);
- _createdConnections.addAndFetch(1);
- }
+ transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
}
- // Checking if we successfully added a connection above. Separated from the lock so we don't log
- // while holding it.
- if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
+ auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
+
+ const bool quiet = serverGlobalParams.quiet.load();
+
+ size_t connectionCount;
+ auto ssmIt = [&]() -> boost::optional<SSMListIterator> {
+ stdx::lock_guard lk(_sessionsMutex);
+ connectionCount = _currentConnections.load();
+ if (connectionCount > _maxNumConnections && !canOverrideMaxConns) {
+ return boost::none;
+ }
+
+ auto it = _sessions.emplace(_sessions.begin(), ssm);
+ connectionCount = _sessions.size();
+ _currentConnections.store(connectionCount);
+ _createdConnections.addAndFetch(1);
+ return it;
+ }();
+
+ if (!ssmIt) {
if (!quiet) {
LOGV2(22942,
"Connection refused because there are too many open connections",
"connectionCount"_attr = connectionCount);
}
return;
- } else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx);
- usingMaxConnOverride && exec) {
- ssm->setServiceExecutor(exec);
- }
-
- if (!quiet) {
+ } else if (!quiet) {
LOGV2(22943,
"Connection accepted",
"remote"_attr = session->remote(),
@@ -199,7 +213,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto remote = session->remote();
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
- _sessions.erase(ssmIt);
+ _sessions.erase(*ssmIt);
connectionCount = _sessions.size();
_currentConnections.store(connectionCount);
}
@@ -214,11 +228,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
}
});
- auto ownership = ServiceStateMachine::Ownership::kOwned;
- if (transportMode == transport::Mode::kSynchronous) {
- ownership = ServiceStateMachine::Ownership::kStatic;
- }
- ssm->start(ownership);
+ ssm->start();
}
void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 0528a19fbdb..1693469fcda 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -77,8 +77,12 @@ public:
return _currentConnections.load();
}
+ size_t maxOpenSessions() const final {
+ return _maxNumConnections;
+ }
+
private:
- using SSMList = std::list<std::shared_ptr<ServiceStateMachine>>;
+ using SSMList = std::list<std::shared_ptr<transport::ServiceStateMachine>>;
using SSMListIterator = SSMList::iterator;
ServiceContext* const _svcCtx;
@@ -89,7 +93,7 @@ private:
stdx::condition_variable _shutdownCondition;
SSMList _sessions;
- size_t _maxNumConnections{DEFAULT_MAX_CONN};
+ const size_t _maxNumConnections{DEFAULT_MAX_CONN};
AtomicWord<size_t> _currentConnections{0};
AtomicWord<size_t> _createdConnections{0};
};
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
new file mode 100644
index 00000000000..37c3d03e1b1
--- /dev/null
+++ b/src/mongo/transport/service_executor.cpp
@@ -0,0 +1,135 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_executor.h"
+
+#include <boost/optional.hpp>
+
+#include "mongo/logv2/log.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_fixed.h"
+#include "mongo/transport/service_executor_reserved.h"
+#include "mongo/transport/service_executor_synchronous.h"
+
+namespace mongo {
+namespace transport {
+namespace {
+static constexpr auto kDiagnosticLogLevel = 4;
+
+auto getServiceExecutorContext =
+ Client::declareDecoration<boost::optional<ServiceExecutorContext>>();
+} // namespace
+
+StringData toString(ServiceExecutorContext::ThreadingModel threadingModel) {
+ switch (threadingModel) {
+ case ServiceExecutorContext::ThreadingModel::kDedicated:
+ return "Dedicated"_sd;
+ case ServiceExecutorContext::ThreadingModel::kBorrowed:
+ return "Borrowed"_sd;
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+ServiceExecutorContext* ServiceExecutorContext::get(Client* client) noexcept {
+ auto& serviceExecutorContext = getServiceExecutorContext(client);
+
+ if (!serviceExecutorContext) {
+ // Service worker Clients will never have a ServiceExecutorContext.
+ return nullptr;
+ }
+
+ return &serviceExecutorContext.get();
+}
+
+void ServiceExecutorContext::set(Client* client, ServiceExecutorContext seCtx) noexcept {
+ auto& serviceExecutorContext = getServiceExecutorContext(client);
+ invariant(!serviceExecutorContext);
+
+ seCtx._client = client;
+ seCtx._sep = client->getServiceContext()->getServiceEntryPoint();
+
+ LOGV2_DEBUG(4898000,
+ kDiagnosticLogLevel,
+ "Setting initial ServiceExecutor context for client",
+ "client"_attr = client->desc(),
+ "threadingModel"_attr = seCtx._threadingModel,
+ "canUseReserved"_attr = seCtx._canUseReserved);
+ serviceExecutorContext = std::move(seCtx);
+}
+
+ServiceExecutorContext& ServiceExecutorContext::setThreadingModel(
+ ThreadingModel threadingModel) noexcept {
+ _threadingModel = threadingModel;
+ return *this;
+}
+
+ServiceExecutorContext& ServiceExecutorContext::setCanUseReserved(bool canUseReserved) noexcept {
+ _canUseReserved = canUseReserved;
+ return *this;
+}
+
+ServiceExecutor* ServiceExecutorContext::getServiceExecutor() const noexcept {
+ invariant(_client);
+
+ switch (_threadingModel) {
+ case ThreadingModel::kBorrowed:
+ return ServiceExecutorFixed::get(_client->getServiceContext());
+ case ThreadingModel::kDedicated: {
+ // Continue on.
+ } break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+
+ auto shouldUseReserved = [&] {
+ // This is at best a naive solution. There could be a world where numOpenSessions() changes
+ // very quickly. We are not taking locks on the ServiceEntryPoint, so we may chose to
+ // schedule onto the ServiceExecutorReserved when it is no longer necessary. The upside is
+ // that we will automatically shift to the ServiceExecutorSynchronous after the first
+ // command loop.
+ return _sep->numOpenSessions() > _sep->maxOpenSessions();
+ };
+
+ if (_canUseReserved && shouldUseReserved()) {
+ if (auto exec = transport::ServiceExecutorReserved::get(_client->getServiceContext())) {
+ // We are allowed to use the reserved executor, we should use it, and it exists.
+ return exec;
+ }
+ }
+
+ return transport::ServiceExecutorSynchronous::get(_client->getServiceContext());
+}
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index b702198e6b5..38116908272 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -33,20 +33,19 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/client.h"
+#include "mongo/db/service_context.h"
#include "mongo/platform/bitwise_enum_operators.h"
+#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/session.h"
#include "mongo/transport/transport_mode.h"
#include "mongo/util/duration.h"
#include "mongo/util/functional.h"
#include "mongo/util/out_of_line_executor.h"
namespace mongo {
-// This needs to be forward declared here because the service_context.h is a circular dependency.
-class ServiceContext;
-
namespace transport {
-class Session;
-
/*
* This is the interface for all ServiceExecutors.
*/
@@ -123,6 +122,74 @@ public:
virtual void appendStats(BSONObjBuilder* bob) const = 0;
};
+/**
+ * ServiceExecutorContext determines which ServiceExecutor is used for each Client.
+ */
+class ServiceExecutorContext {
+public:
+ enum ThreadingModel {
+ kBorrowed,
+ kDedicated,
+ };
+
+ /**
+ * Get a pointer to the ServiceExecutorContext for a given client.
+ *
+ * This function is valid to invoke either on the Client thread or with the Client lock.
+ */
+ static ServiceExecutorContext* get(Client* client) noexcept;
+
+ /**
+ * Set the ServiceExecutorContext for a given client.
+ *
+ * This function may only be invoked once and only while under the Client lock.
+ */
+ static void set(Client* client, ServiceExecutorContext seCtx) noexcept;
+
+ ServiceExecutorContext() = default;
+
+ /**
+ * Set the ThreadingModel for the associated Client's service execution.
+ *
+ * This function is only valid to invoke with the Client lock or before the Client is set.
+ */
+ ServiceExecutorContext& setThreadingModel(ThreadingModel threadingModel) noexcept;
+
+ /**
+ * Set if reserved resources are available for the associated Client's service execution.
+ *
+ * This function is only valid to invoke with the Client lock or before the Client is set.
+ */
+ ServiceExecutorContext& setCanUseReserved(bool canUseReserved) noexcept;
+
+ /**
+ * Get the ThreadingModel for the associated Client.
+ *
+ * This function is valid to invoke either on the Client thread or with the Client lock.
+ */
+ auto getThreadingModel() const noexcept {
+ return _threadingModel;
+ }
+
+ /**
+ * Get an appropriate ServiceExecutor given the current parameters.
+ *
+ * This function is only valid to invoke from the associated Client thread. This function does
+ * not require the Client lock since all writes must also happen from that thread.
+ */
+ ServiceExecutor* getServiceExecutor() const noexcept;
+
+private:
+ friend StringData toString(ThreadingModel threadingModel);
+
+ Client* _client = nullptr;
+ ServiceEntryPoint* _sep = nullptr;
+
+ ThreadingModel _threadingModel = ThreadingModel::kDedicated;
+ bool _canUseReserved = false;
+};
+
+
} // namespace transport
ENABLE_BITMASK_OPERATORS(transport::ServiceExecutor::ScheduleFlags)
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index b71e6c94abd..5124542291b 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -36,7 +36,6 @@
#include <memory>
#include "mongo/config.h"
-#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/traffic_recorder.h"
@@ -45,6 +44,7 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_fixed.h"
#include "mongo/transport/service_executor_synchronous.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
@@ -59,6 +59,7 @@
#include "mongo/util/quick_exit.h"
namespace mongo {
+namespace transport {
namespace {
MONGO_FAIL_POINT_DEFINE(doNotSetMoreToCome);
/**
@@ -160,7 +161,6 @@ Message makeExhaustMessage(Message requestMsg, DbResponse* dbresponse) {
return exhaustMessage;
}
-
} // namespace
using transport::ServiceExecutor;
@@ -194,9 +194,10 @@ public:
// Set up the thread name
auto oldThreadName = getThreadName();
- if (oldThreadName != _ssm->_threadName) {
- _ssm->_oldThreadName = getThreadName().toString();
- setThreadName(_ssm->_threadName);
+ const auto& threadName = _ssm->_dbClient->desc();
+ if (oldThreadName != threadName) {
+ _oldThreadName = oldThreadName.toString();
+ setThreadName(threadName);
}
// Swap the current Client so calls to cc() work as expected
@@ -258,8 +259,8 @@ public:
_ssm->_dbClient = Client::releaseCurrent();
}
- if (!_ssm->_oldThreadName.empty()) {
- setThreadName(_ssm->_oldThreadName);
+ if (!_oldThreadName.empty()) {
+ setThreadName(_oldThreadName);
}
}
@@ -287,29 +288,22 @@ public:
private:
ServiceStateMachine* _ssm;
bool _haveTakenOwnership = false;
+ std::string _oldThreadName;
};
-std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext,
- transport::SessionHandle session,
- transport::Mode transportMode) {
- return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), transportMode);
-}
-
-ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
- transport::SessionHandle session,
- transport::Mode transportMode)
+ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
: _state{State::Created},
- _sep{svcContext->getServiceEntryPoint()},
- _transportMode(transportMode),
- _serviceContext(svcContext),
- _sessionHandle(session),
- _threadName{str::stream() << "conn" << _session()->id()},
- _dbClient{svcContext->makeClient(_threadName, std::move(session))},
- _dbClientPtr{_dbClient.get()},
- _serviceExecutor(transport::ServiceExecutorSynchronous::get(_serviceContext)) {}
+ _serviceContext{client->getServiceContext()},
+ _sep{_serviceContext->getServiceEntryPoint()},
+ _dbClient{std::move(client)},
+ _dbClientPtr{_dbClient.get()} {}
const transport::SessionHandle& ServiceStateMachine::_session() const {
- return _sessionHandle;
+ return _dbClientPtr->session();
+}
+
+ServiceExecutor* ServiceStateMachine::_executor() {
+ return ServiceExecutorContext::get(_dbClientPtr)->getServiceExecutor();
}
Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
@@ -319,11 +313,12 @@ Future<void> ServiceStateMachine::_sourceMessage(ThreadGuard guard) {
guard.release();
auto sourceMsgImpl = [&] {
- if (_transportMode == transport::Mode::kSynchronous) {
+ const auto& transportMode = _executor()->transportMode();
+ if (transportMode == transport::Mode::kSynchronous) {
MONGO_IDLE_THREAD_BLOCK;
return Future<Message>::makeReady(_session()->sourceMessage());
} else {
- invariant(_transportMode == transport::Mode::kAsynchronous);
+ invariant(transportMode == transport::Mode::kAsynchronous);
return _session()->asyncSourceMessage();
}
};
@@ -346,13 +341,14 @@ Future<void> ServiceStateMachine::_sinkMessage(ThreadGuard guard) {
auto toSink = std::exchange(_outMessage, {});
auto sinkMsgImpl = [&] {
- if (_transportMode == transport::Mode::kSynchronous) {
+ const auto& transportMode = _executor()->transportMode();
+ if (transportMode == transport::Mode::kSynchronous) {
// We don't consider ourselves idle while sending the reply since we are still doing
// work on behalf of the client. Contrast that with sourceMessage() where we are waiting
// for the client to send us more work to do.
return Future<void>::makeReady(_session()->sinkMessage(std::move(toSink)));
} else {
- invariant(_transportMode == transport::Mode::kAsynchronous);
+ invariant(transportMode == transport::Mode::kAsynchronous);
return _session()->asyncSinkMessage(std::move(toSink));
}
};
@@ -439,7 +435,7 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
invariant(!_inMessage.empty());
TrafficRecorder::get(_serviceContext)
- .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage);
+ .observe(_session(), _serviceContext->getPreciseClockSource()->now(), _inMessage);
auto& compressorMgr = MessageCompressorManager::forSession(_session());
@@ -511,8 +507,7 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
}
TrafficRecorder::get(_serviceContext)
- .observe(
- _sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);
+ .observe(_session(), _serviceContext->getPreciseClockSource()->now(), toSink);
_outMessage = std::move(toSink);
} else {
@@ -524,16 +519,13 @@ Future<void> ServiceStateMachine::_processMessage(ThreadGuard guard) {
});
}
-void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) {
- _serviceExecutor = executor;
-}
-
-void ServiceStateMachine::start(Ownership ownershipModel) {
- _serviceExecutor->schedule(GuaranteedExecutor::enforceRunOnce(
- [this, anchor = shared_from_this(), ownershipModel](Status status) {
+void ServiceStateMachine::start() {
+ _executor()->schedule(
+ GuaranteedExecutor::enforceRunOnce([this, anchor = shared_from_this()](Status status) {
+ // TODO(SERVER-49109) We can't use static ownership in general with
+ // a ServiceExecutorFixed and async commands. ThreadGuard needs to become smarter.
ThreadGuard guard(shared_from_this().get());
- if (ownershipModel == Ownership::kStatic)
- guard.markStaticOwnership();
+ guard.markStaticOwnership();
// If this is the first run of the SSM, then update its state to Source
if (state() == State::Created) {
@@ -583,7 +575,7 @@ void ServiceStateMachine::_runOnce() {
return;
}
- _serviceExecutor->schedule(GuaranteedExecutor::enforceRunOnce(
+ _executor()->schedule(GuaranteedExecutor::enforceRunOnce(
[this, anchor = shared_from_this()](Status status) { _runOnce(); }));
});
}
@@ -674,4 +666,5 @@ void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {
Client::releaseCurrent();
}
+} // namespace transport
} // namespace mongo
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index b04a419a362..7794cdfbfce 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -35,6 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/config.h"
+#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
@@ -48,6 +49,7 @@
#include "mongo/util/net/ssl_manager.h"
namespace mongo {
+namespace transport {
/*
* The ServiceStateMachine holds the state of a single client connection and represents the
@@ -64,17 +66,9 @@ public:
ServiceStateMachine& operator=(ServiceStateMachine&&) = delete;
/*
- * Creates a new ServiceStateMachine for a given session/service context. If sync is true,
- * then calls into the transport layer will block while they complete, otherwise they will
- * be handled asynchronously.
+ * Construct a ServiceStateMachine for a given Client.
*/
- static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext,
- transport::SessionHandle session,
- transport::Mode transportMode);
-
- ServiceStateMachine(ServiceContext* svcContext,
- transport::SessionHandle session,
- transport::Mode transportMode);
+ ServiceStateMachine(ServiceContext::UniqueClient client);
/*
* Any state may transition to EndSession in case of an error, otherwise the valid state
@@ -109,13 +103,7 @@ public:
/*
* start() schedules a call to _runOnce() in the future.
*/
- void start(Ownership ownershipModel);
-
- /*
- * Set the executor to be used for the next call to runNext(). This allows switching between
- * thread models after the SSM has started.
- */
- void setServiceExecutor(transport::ServiceExecutor* executor);
+ void start();
/*
* Gets the current state of connection for testing/diagnostic purposes.
@@ -174,6 +162,11 @@ private:
const transport::SessionHandle& _session() const;
/*
+ * Gets the transport::ServiceExecutor associated with this connection.
+ */
+ ServiceExecutor* _executor();
+
+ /*
* This function actually calls into the database and processes a request. It's broken out
* into its own inline function for better readability.
*/
@@ -210,16 +203,12 @@ private:
AtomicWord<State> _state{State::Created};
- ServiceEntryPoint* _sep;
- transport::Mode _transportMode;
-
ServiceContext* const _serviceContext;
+ ServiceEntryPoint* const _sep;
transport::SessionHandle _sessionHandle;
- const std::string _threadName;
ServiceContext::UniqueClient _dbClient;
- const Client* _dbClientPtr;
- transport::ServiceExecutor* _serviceExecutor;
+ Client* _dbClientPtr;
std::function<void()> _cleanupHook;
bool _inExhaust = false;
@@ -235,7 +224,6 @@ private:
#if MONGO_CONFIG_DEBUG_BUILD
AtomicWord<stdx::thread::id> _owningThread;
#endif
- std::string _oldThreadName;
};
template <typename T>
@@ -266,4 +254,5 @@ T& operator<<(T& stream, const ServiceStateMachine::State& state) {
return stream;
}
+} // namespace transport
} // namespace mongo