diff options
Diffstat (limited to 'src/mongo/transport/service_entry_point_impl.cpp')
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.cpp | 80 |
1 files changed, 45 insertions, 35 deletions
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index ec35471ede6..0090f30a686 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -33,6 +33,7 @@ #include "mongo/transport/service_entry_point_impl.h" +#include <fmt/format.h> #include <vector> #include "mongo/db/auth/restriction_environment.h" @@ -54,10 +55,17 @@ namespace mongo { +using namespace fmt::literals; + bool shouldOverrideMaxConns(const transport::SessionHandle& session, const std::vector<stdx::variant<CIDR, std::string>>& exemptions) { + if (exemptions.empty()) { + return false; + } + const auto& remoteAddr = session->remoteAddr(); const auto& localAddr = session->localAddr(); + boost::optional<CIDR> remoteCIDR; if (remoteAddr.isValid() && remoteAddr.isIP()) { @@ -85,8 +93,7 @@ bool shouldOverrideMaxConns(const transport::SessionHandle& session, return false; } -ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) { - +size_t getSupportedMax() { const auto supportedMax = [] { #ifdef _WIN32 return serverGlobalParams.maxConns; @@ -117,9 +124,12 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s "limit"_attr = supportedMax); } - _maxNumConnections = supportedMax; + return supportedMax; } +ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) + : _svcCtx(svcCtx), _maxNumConnections(getSupportedMax()) {} + Status ServiceEntryPointImpl::start() { if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start(); !status.isOK()) { @@ -149,44 +159,48 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { auto restrictionEnvironment = std::make_unique<RestrictionEnvironment>(remoteAddr, localAddr); RestrictionEnvironment::set(session, std::move(restrictionEnvironment)); - SSMListIterator ssmIt; + bool canOverrideMaxConns = shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride); - const bool quiet = serverGlobalParams.quiet.load(); - size_t connectionCount; - auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode(); + auto clientName = "conn{}"_format(session->id()); + auto client = _svcCtx->makeClient(clientName, session); - auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode); - auto usingMaxConnOverride = false; { - stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); - connectionCount = _sessions.size() + 1; - if (connectionCount > _maxNumConnections) { - usingMaxConnOverride = - shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride); - } + stdx::lock_guard lk(*client); + auto seCtx = + transport::ServiceExecutorContext{} + .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated) + .setCanUseReserved(canOverrideMaxConns); - if (connectionCount <= _maxNumConnections || usingMaxConnOverride) { - ssmIt = _sessions.emplace(_sessions.begin(), ssm); - _currentConnections.store(connectionCount); - _createdConnections.addAndFetch(1); - } + transport::ServiceExecutorContext::set(client.get(), std::move(seCtx)); } - // Checking if we successfully added a connection above. Separated from the lock so we don't log - // while holding it. - if (connectionCount > _maxNumConnections && !usingMaxConnOverride) { + auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client)); + + const bool quiet = serverGlobalParams.quiet.load(); + + size_t connectionCount; + auto ssmIt = [&]() -> boost::optional<SSMListIterator> { + stdx::lock_guard lk(_sessionsMutex); + connectionCount = _currentConnections.load(); + if (connectionCount > _maxNumConnections && !canOverrideMaxConns) { + return boost::none; + } + + auto it = _sessions.emplace(_sessions.begin(), ssm); + connectionCount = _sessions.size(); + _currentConnections.store(connectionCount); + _createdConnections.addAndFetch(1); + return it; + }(); + + if (!ssmIt) { if (!quiet) { LOGV2(22942, "Connection refused because there are too many open connections", "connectionCount"_attr = connectionCount); } return; - } else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx); - usingMaxConnOverride && exec) { - ssm->setServiceExecutor(exec); - } - - if (!quiet) { + } else if (!quiet) { LOGV2(22943, "Connection accepted", "remote"_attr = session->remote(), @@ -199,7 +213,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { auto remote = session->remote(); { stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); - _sessions.erase(ssmIt); + _sessions.erase(*ssmIt); connectionCount = _sessions.size(); _currentConnections.store(connectionCount); } @@ -214,11 +228,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { } }); - auto ownership = ServiceStateMachine::Ownership::kOwned; - if (transportMode == transport::Mode::kSynchronous) { - ownership = ServiceStateMachine::Ownership::kStatic; - } - ssm->start(ownership); + ssm->start(); } void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) { |