diff options
Diffstat (limited to 'src/mongo/transport/service_entry_point_impl.cpp')
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.cpp | 163 |
1 files changed, 52 insertions, 111 deletions
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index 0090f30a686..1824ccf1c94 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -33,7 +33,6 @@ #include "mongo/transport/service_entry_point_impl.h" -#include <fmt/format.h> #include <vector> #include "mongo/db/auth/restriction_environment.h" @@ -49,23 +48,12 @@ #include <sys/resource.h> #endif -#if !defined(__has_feature) -#define __has_feature(x) 0 -#endif - namespace mongo { -using namespace fmt::literals; - bool shouldOverrideMaxConns(const transport::SessionHandle& session, const std::vector<stdx::variant<CIDR, std::string>>& exemptions) { - if (exemptions.empty()) { - return false; - } - const auto& remoteAddr = session->remoteAddr(); const auto& localAddr = session->localAddr(); - boost::optional<CIDR> remoteCIDR; if (remoteAddr.isValid() && remoteAddr.isIP()) { @@ -93,7 +81,8 @@ bool shouldOverrideMaxConns(const transport::SessionHandle& session, return false; } -size_t getSupportedMax() { +ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) { + const auto supportedMax = [] { #ifdef _WIN32 return serverGlobalParams.maxConns; @@ -124,33 +113,21 @@ size_t getSupportedMax() { "limit"_attr = supportedMax); } - return supportedMax; -} - -ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) - : _svcCtx(svcCtx), _maxNumConnections(getSupportedMax()) {} + _maxNumConnections = supportedMax; -Status ServiceEntryPointImpl::start() { - if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start(); - !status.isOK()) { - return status; + if (serverGlobalParams.reservedAdminThreads) { + _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>( + _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads); } +} - if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) { - if (auto status = exec->start(); !status.isOK()) { - return status; - } - } - - // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109 - // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) { - // return status; - // } - - return Status::OK(); +Status ServiceEntryPointImpl::start() { + if (_adminInternalPool) + return _adminInternalPool->start(); + else + return Status::OK(); } -// TODO: explicitly start on the fixed executor void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { // Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs const auto& remoteAddr = session->remoteAddr(); @@ -159,48 +136,43 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { auto restrictionEnvironment = std::make_unique<RestrictionEnvironment>(remoteAddr, localAddr); RestrictionEnvironment::set(session, std::move(restrictionEnvironment)); - bool canOverrideMaxConns = shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride); - - auto clientName = "conn{}"_format(session->id()); - auto client = _svcCtx->makeClient(clientName, session); - - { - stdx::lock_guard lk(*client); - auto seCtx = - transport::ServiceExecutorContext{} - .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated) - .setCanUseReserved(canOverrideMaxConns); - - transport::ServiceExecutorContext::set(client.get(), std::move(seCtx)); - } - - auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client)); + SSMListIterator ssmIt; const bool quiet = serverGlobalParams.quiet.load(); - size_t connectionCount; - auto ssmIt = [&]() -> boost::optional<SSMListIterator> { - stdx::lock_guard lk(_sessionsMutex); - connectionCount = _currentConnections.load(); - if (connectionCount > _maxNumConnections && !canOverrideMaxConns) { - return boost::none; + auto transportMode = _svcCtx->getServiceExecutor()->transportMode(); + + auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode); + auto usingMaxConnOverride = false; + { + stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); + connectionCount = _sessions.size() + 1; + if (connectionCount > _maxNumConnections) { + usingMaxConnOverride = + shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride); } - auto it = _sessions.emplace(_sessions.begin(), ssm); - connectionCount = _sessions.size(); - _currentConnections.store(connectionCount); - _createdConnections.addAndFetch(1); - return it; - }(); + if (connectionCount <= _maxNumConnections || usingMaxConnOverride) { + ssmIt = _sessions.emplace(_sessions.begin(), ssm); + _currentConnections.store(connectionCount); + _createdConnections.addAndFetch(1); + } + } - if (!ssmIt) { + // Checking if we successfully added a connection above. Separated from the lock so we don't log + // while holding it. + if (connectionCount > _maxNumConnections && !usingMaxConnOverride) { if (!quiet) { LOGV2(22942, "Connection refused because there are too many open connections", "connectionCount"_attr = connectionCount); } return; - } else if (!quiet) { + } else if (usingMaxConnOverride && _adminInternalPool) { + ssm->setServiceExecutor(_adminInternalPool.get()); + } + + if (!quiet) { LOGV2(22943, "Connection accepted", "remote"_attr = session->remote(), @@ -213,7 +185,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { auto remote = session->remote(); { stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); - _sessions.erase(*ssmIt); + _sessions.erase(ssmIt); connectionCount = _sessions.size(); _currentConnections.store(connectionCount); } @@ -228,7 +200,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { } }); - ssm->start(); + auto ownership = ServiceStateMachine::Ownership::kOwned; + if (transportMode == transport::Mode::kSynchronous) { + ownership = ServiceStateMachine::Ownership::kStatic; + } + ssm->start(ownership); } void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) { @@ -243,13 +219,8 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) { } bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) { -#if __has_feature(address_sanitizer) - // When running under address sanitizer, we get false positive leaks due to disorder around - // the lifecycle of a connection and request. When we are running under ASAN, we try a lot - // harder to dry up the server from active connections before going on to really shut down. using logv2::LogComponent; - auto start = _svcCtx->getPreciseClockSource()->now(); stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex); // Request that all sessions end, while holding the _sesionsMutex, loop over all the current @@ -286,37 +257,7 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) { "shutdown: exhausted grace period active workers to drain; continuing with shutdown...", "workers"_attr = numOpenSessions()); } - - lk.unlock(); - - // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109 - // timeSpent = _svcCtx->getPreciseClockSource()->now() - start; - // timeout = std::max(Milliseconds{0}, timeout - timeSpent); - // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout); - // !status.isOK()) { - // LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status); - // } - - timeSpent = _svcCtx->getPreciseClockSource()->now() - start; - timeout = std::max(Milliseconds{0}, timeout - timeSpent); - if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) { - if (auto status = exec->shutdown(timeout); !status.isOK()) { - LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status); - } - } - - timeSpent = _svcCtx->getPreciseClockSource()->now() - start; - timeout = std::max(Milliseconds{0}, timeout - timeSpent); - if (auto status = - transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent); - !status.isOK()) { - LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status); - } - return result; -#else - return true; -#endif } void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const { @@ -326,17 +267,17 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const { bob->append("current", static_cast<int>(sessionCount)); bob->append("available", static_cast<int>(_maxNumConnections - sessionCount)); bob->append("totalCreated", static_cast<int>(_createdConnections.load())); + if (auto sc = getGlobalServiceContext()) { + bob->append("active", static_cast<int>(sc->getActiveClientOperations())); + bob->append("exhaustIsMaster", + static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster())); + bob->append("awaitingTopologyChanges", + static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges())); + } - invariant(_svcCtx); - bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations())); - bob->append("exhaustIsMaster", - static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster())); - bob->append("awaitingTopologyChanges", - static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges())); - - if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) { + if (_adminInternalPool) { BSONObjBuilder section(bob->subobjStart("adminConnections")); - adminExec->appendStats(§ion); + _adminInternalPool->appendStats(§ion); } } |