diff options
author | Reo Kimura <reo.kimura@mongodb.com> | 2020-07-22 03:03:43 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-14 18:57:06 +0000 |
commit | 97e16187ff3065d242a61a52e7b6edd4d439fb30 (patch) | |
tree | a7c528e43c3f3fef4116fadafb3384cccff8c04c /src/mongo/transport/service_entry_point_impl.cpp | |
parent | a629d56622ffb64025150fc674212267d2b5cf84 (diff) | |
download | mongo-97e16187ff3065d242a61a52e7b6edd4d439fb30.tar.gz |
SERVER-49072 Make ServiceExecutors into Decorations
Diffstat (limited to 'src/mongo/transport/service_entry_point_impl.cpp')
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.cpp | 91 |
1 files changed, 70 insertions, 21 deletions
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index 1824ccf1c94..ec35471ede6 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -48,6 +48,10 @@ #include <sys/resource.h> #endif +#if !defined(__has_feature) +#define __has_feature(x) 0 +#endif + namespace mongo { bool shouldOverrideMaxConns(const transport::SessionHandle& session, @@ -114,20 +118,29 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s } _maxNumConnections = supportedMax; - - if (serverGlobalParams.reservedAdminThreads) { - _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>( - _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads); - } } Status ServiceEntryPointImpl::start() { - if (_adminInternalPool) - return _adminInternalPool->start(); - else - return Status::OK(); + if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start(); + !status.isOK()) { + return status; + } + + if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) { + if (auto status = exec->start(); !status.isOK()) { + return status; + } + } + + // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109 + // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) { + // return status; + // } + + return Status::OK(); } +// TODO: explicitly start on the fixed executor void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { // Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs const auto& remoteAddr = session->remoteAddr(); @@ -140,7 +153,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { const bool quiet = serverGlobalParams.quiet.load(); size_t connectionCount; - auto transportMode = _svcCtx->getServiceExecutor()->transportMode(); + auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode(); auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode); auto usingMaxConnOverride = false; @@ -168,8 +181,9 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { "connectionCount"_attr = connectionCount); } return; - } else if (usingMaxConnOverride && _adminInternalPool) { - ssm->setServiceExecutor(_adminInternalPool.get()); + } else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx); + usingMaxConnOverride && exec) { + ssm->setServiceExecutor(exec); } if (!quiet) { @@ -219,8 +233,13 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) { } bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) { +#if __has_feature(address_sanitizer) + // When running under address sanitizer, we get false positive leaks due to disorder around + // the lifecycle of a connection and request. When we are running under ASAN, we try a lot + // harder to dry up the server from active connections before going on to really shut down. using logv2::LogComponent; + auto start = _svcCtx->getPreciseClockSource()->now(); stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex); // Request that all sessions end, while holding the _sesionsMutex, loop over all the current @@ -257,7 +276,37 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) { "shutdown: exhausted grace period active workers to drain; continuing with shutdown...", "workers"_attr = numOpenSessions()); } + + lk.unlock(); + + // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109 + // timeSpent = _svcCtx->getPreciseClockSource()->now() - start; + // timeout = std::max(Milliseconds{0}, timeout - timeSpent); + // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout); + // !status.isOK()) { + // LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status); + // } + + timeSpent = _svcCtx->getPreciseClockSource()->now() - start; + timeout = std::max(Milliseconds{0}, timeout - timeSpent); + if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) { + if (auto status = exec->shutdown(timeout); !status.isOK()) { + LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status); + } + } + + timeSpent = _svcCtx->getPreciseClockSource()->now() - start; + timeout = std::max(Milliseconds{0}, timeout - timeSpent); + if (auto status = + transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent); + !status.isOK()) { + LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status); + } + return result; +#else + return true; +#endif } void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const { @@ -267,17 +316,17 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const { bob->append("current", static_cast<int>(sessionCount)); bob->append("available", static_cast<int>(_maxNumConnections - sessionCount)); bob->append("totalCreated", static_cast<int>(_createdConnections.load())); - if (auto sc = getGlobalServiceContext()) { - bob->append("active", static_cast<int>(sc->getActiveClientOperations())); - bob->append("exhaustIsMaster", - static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster())); - bob->append("awaitingTopologyChanges", - static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges())); - } - if (_adminInternalPool) { + invariant(_svcCtx); + bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations())); + bob->append("exhaustIsMaster", + static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster())); + bob->append("awaitingTopologyChanges", + static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges())); + + if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) { BSONObjBuilder section(bob->subobjStart("adminConnections")); - _adminInternalPool->appendStats(§ion); + adminExec->appendStats(§ion); } } |