summaryrefslogtreecommitdiff
path: root/src/mongo/transport/service_entry_point_impl.cpp
diff options
context:
space:
mode:
authorReo Kimura <reo.kimura@mongodb.com>2020-07-22 03:03:43 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-14 18:57:06 +0000
commit97e16187ff3065d242a61a52e7b6edd4d439fb30 (patch)
treea7c528e43c3f3fef4116fadafb3384cccff8c04c /src/mongo/transport/service_entry_point_impl.cpp
parenta629d56622ffb64025150fc674212267d2b5cf84 (diff)
downloadmongo-97e16187ff3065d242a61a52e7b6edd4d439fb30.tar.gz
SERVER-49072 Make ServiceExecutors into Decorations
Diffstat (limited to 'src/mongo/transport/service_entry_point_impl.cpp')
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp91
1 files changed, 70 insertions, 21 deletions
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 1824ccf1c94..ec35471ede6 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -48,6 +48,10 @@
#include <sys/resource.h>
#endif
+#if !defined(__has_feature)
+#define __has_feature(x) 0
+#endif
+
namespace mongo {
bool shouldOverrideMaxConns(const transport::SessionHandle& session,
@@ -114,20 +118,29 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
}
_maxNumConnections = supportedMax;
-
- if (serverGlobalParams.reservedAdminThreads) {
- _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
- _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
- }
}
Status ServiceEntryPointImpl::start() {
- if (_adminInternalPool)
- return _adminInternalPool->start();
- else
- return Status::OK();
+ if (auto status = transport::ServiceExecutorSynchronous::get(_svcCtx)->start();
+ !status.isOK()) {
+ return status;
+ }
+
+ if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+ if (auto status = exec->start(); !status.isOK()) {
+ return status;
+ }
+ }
+
+ // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
+ // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->start(); !status.isOK()) {
+ // return status;
+ // }
+
+ return Status::OK();
}
+// TODO: explicitly start on the fixed executor
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Setup the restriction environment on the Session, if the Session has local/remote Sockaddrs
const auto& remoteAddr = session->remoteAddr();
@@ -140,7 +153,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
const bool quiet = serverGlobalParams.quiet.load();
size_t connectionCount;
- auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
+ auto transportMode = transport::ServiceExecutorSynchronous::get(_svcCtx)->transportMode();
auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
auto usingMaxConnOverride = false;
@@ -168,8 +181,9 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
"connectionCount"_attr = connectionCount);
}
return;
- } else if (usingMaxConnOverride && _adminInternalPool) {
- ssm->setServiceExecutor(_adminInternalPool.get());
+ } else if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx);
+ usingMaxConnOverride && exec) {
+ ssm->setServiceExecutor(exec);
}
if (!quiet) {
@@ -219,8 +233,13 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
}
bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
+#if __has_feature(address_sanitizer)
+ // When running under address sanitizer, we get false positive leaks due to disorder around
+ // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
+ // harder to dry up the server from active connections before going on to really shut down.
using logv2::LogComponent;
+ auto start = _svcCtx->getPreciseClockSource()->now();
stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
// Request that all sessions end, while holding the _sesionsMutex, loop over all the current
@@ -257,7 +276,37 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
"shutdown: exhausted grace period active workers to drain; continuing with shutdown...",
"workers"_attr = numOpenSessions());
}
+
+ lk.unlock();
+
+ // TODO: Reintroduce SEF once it is attached as initial SE in SERVER-49109
+ // timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+ // timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+ // if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
+ // !status.isOK()) {
+ // LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
+ // }
+
+ timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+ timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+ if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
+ if (auto status = exec->shutdown(timeout); !status.isOK()) {
+ LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
+ }
+ }
+
+ timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
+ timeout = std::max(Milliseconds{0}, timeout - timeSpent);
+ if (auto status =
+ transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
+ !status.isOK()) {
+ LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
+ }
+
return result;
+#else
+ return true;
+#endif
}
void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
@@ -267,17 +316,17 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
bob->append("current", static_cast<int>(sessionCount));
bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
- if (auto sc = getGlobalServiceContext()) {
- bob->append("active", static_cast<int>(sc->getActiveClientOperations()));
- bob->append("exhaustIsMaster",
- static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster()));
- bob->append("awaitingTopologyChanges",
- static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges()));
- }
- if (_adminInternalPool) {
+ invariant(_svcCtx);
+ bob->append("active", static_cast<int>(_svcCtx->getActiveClientOperations()));
+ bob->append("exhaustIsMaster",
+ static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumExhaustIsMaster()));
+ bob->append("awaitingTopologyChanges",
+ static_cast<int>(IsMasterMetrics::get(_svcCtx)->getNumAwaitingTopologyChanges()));
+
+ if (auto adminExec = transport::ServiceExecutorReserved::get(_svcCtx)) {
BSONObjBuilder section(bob->subobjStart("adminConnections"));
- _adminInternalPool->appendStats(&section);
+ adminExec->appendStats(&section);
}
}