summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_entry_point_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/transport/service_entry_point_impl.cpp')
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp163
1 files changed, 52 insertions, 111 deletions
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 0090f30a686..1824ccf1c94 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -33,7 +33,6 @@
#include "mongo/transport/service_entry_point_impl.h"
-#include <fmt/format.h>
#include <vector>
#include "mongo/db/auth/restriction_environment.h"
@@ -49,23 +48,12 @@
#include <sys/resource.h>
#endif
-#if !defined(__has_feature)
-#define __has_feature(x) 0
-#endif
-
namespace mongo {
-using namespace fmt::literals;
-
bool shouldOverrideMaxConns(const transport::SessionHandle& session,
const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
- if (exemptions.empty()) {
- return false;
- }
-
const auto& remoteAddr = session->remoteAddr();
const auto& localAddr = session->localAddr();
-
boost::optional<CIDR> remoteCIDR;
if (remoteAddr.isValid() && remoteAddr.isIP()) {
@@ -93,7 +81,8 @@ bool shouldOverrideMaxConns(const transport::SessionHandle& session,
return false;
}
-size_t getSupportedMax() {
+ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
+
const auto supportedMax = [] {
#ifdef _WIN32
return serverGlobalParams.maxConns;
@@ -124,33 +113,21 @@ size_t getSupportedMax() {
"limit"_attr = supportedMax);
}
- return supportedMax;
-}
-
-ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx)
- : _svcCtx(svcCtx), _maxNumConnections(getSupportedMax()) {}
+ _maxNumConnections = supportedMax;
-Status ServiceEntryPointImpl::start() {
- if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
- !status.isOK()) {
- return status;
+ if (serverGlobalParams.reservedAdminThreads) {
+ _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
+ _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
}
+}
- if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
- if (auto status = exec->start(); !status.isOK()) {
- return status;
- }
- }
-
- // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
- // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
- // return status;
- // }
-
- return Status::OK();
+Status ServiceEntryPointImpl::start() {
+ if (_adminInternalPool)
+ return _adminInternalPool->start();
+ else
+ return Status::OK();
}
-// TODO: explicitly start on the fixed executor
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
const auto& remoteAddr = session->remoteAddr();
@@ -159,48 +136,43 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto restrictionEnvironment = std::make_unique<RestrictionEnvironment>(remoteAddr, localAddr);
RestrictionEnvironment::set(session, std::move(restrictionEnvironment));
- bool canOverrideMaxConns = shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
-
- auto clientName = "conn{}"_format(session->id());
- auto client = _svcCtx->makeClient(clientName, session);
-
- {
- stdx::lock_guard lk(*client);
- auto seCtx =
- transport::ServiceExecutorContext{}
- .setThreadingModel(transport::ServiceExecutorContext::ThreadingModel::kDedicated)
- .setCanUseReserved(canOverrideMaxConns);
-
- transport::ServiceExecutorContext::set(client.get(), std::move(seCtx));
- }
-
- auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
+ SSMListIterator ssmIt;
const bool quiet = serverGlobalParams.quiet.load();
-
size_t connectionCount;
- auto ssmIt = [&]() -> boost::optional<SSMListIterator> {
- stdx::lock_guard lk(_sessionsMutex);
- connectionCount = _currentConnections.load();
- if (connectionCount > _maxNumConnections && !canOverrideMaxConns) {
- return boost::none;
+ auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
+
+ auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
+ auto usingMaxConnOverride = false;
+ {
+ stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
+ connectionCount = _sessions.size() + 1;
+ if (connectionCount > _maxNumConnections) {
+ usingMaxConnOverride =
+ shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
}
- auto it = _sessions.emplace(_sessions.begin(), ssm);
- connectionCount = _sessions.size();
- _currentConnections.store(connectionCount);
- _createdConnections.addAndFetch(1);
- return it;
- }();
+ if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
+ ssmIt = _sessions.emplace(_sessions.begin(), ssm);
+ _currentConnections.store(connectionCount);
+ _createdConnections.addAndFetch(1);
+ }
+ }
- if (!ssmIt) {
+ // Checking if we successfully added a connection above. Separated from the lock so we don't log
+ // while holding it.
+ if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
if (!quiet) {
LOGV2(22942,
"Connection refused because there are too many open connections",
"connectionCount"_attr = connectionCount);
}
return;
- } else if (!quiet) {
+ } else if (usingMaxConnOverride && _adminInternalPool) {
+ ssm->setServiceExecutor(_adminInternalPool.get());
+ }
+
+ if (!quiet) {
LOGV2(22943,
"Connection accepted",
"remote"_attr = session->remote(),
@@ -213,7 +185,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto remote = session->remote();
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
- _sessions.erase(*ssmIt);
+ _sessions.erase(ssmIt);
connectionCount = _sessions.size();
_currentConnections.store(connectionCount);
}
@@ -228,7 +200,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
}
});
- ssm->start();
+ auto ownership = ServiceStateMachine::Ownership::kOwned;
+ if (transportMode == transport::Mode::kSynchronous) {
+ ownership = ServiceStateMachine::Ownership::kStatic;
+ }
+ ssm->start(ownership);
}
void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
@@ -243,13 +219,8 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
}
bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
-#if __has_feature(address_sanitizer)
- // When running under address sanitizer, we get false positive leaks due to disorder around
- // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
- // harder to dry up the server from active connections before going on to really shut down.
using logv2::LogComponent;
- auto start = _svcCtx->getPreciseClockSource()->now();
stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
// Request that all sessions end, while holding the _sesionsMutex, loop over all the current
@@ -286,37 +257,7 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
"shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
"workers"_attr = numOpenSessions());
}
-
- lk.unlock();
-
- // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
- // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
- // !status.isOK()) {
- // LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
- // }
-
- timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
- if (auto status = exec->shutdown(timeout); !status.isOK()) {
- LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
- }
- }
-
- timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- if (auto status =
- transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
- !status.isOK()) {
- LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
- }
-
return result;
-#else
- return true;
-#endif
}
void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
@@ -326,17 +267,17 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
bob->append("current", static_cast<int>(sessionCount));
bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
+ if (auto sc = getGlobalServiceContext()) {
+ bob->append("active", static_cast<int>(sc->getActiveClientOperations()));
+ bob->append("exhaustIsMaster",
+ static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster()));
+ bob->append("awaitingTopologyChanges",
+ static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges()));
+ }
- invariant(_svcCtx);
- bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
- bob->append("exhaustIsMaster",
- static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
- bob->append("awaitingTopologyChanges",
- static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges()));
-
- if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+ if (_adminInternalPool) {
BSONObjBuilder section(bob->subobjStart("adminConnections"));
- adminExec->appendStats(&section);
+ _adminInternalPool->appendStats(&section);
}
}