author    Ben Caimano <ben.caimano@10gen.com>  2020-12-15 22:40:20 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-01-06 20:58:51 +0000
commit    6020123ae6d42a31a9564ab96ab4979d1acd4f5a (patch)
tree      d679f44e4fbfd88edba5e669a9cbc48e3f8c2933 /src/mongo/transport
parent    0615cd112f6cbe12ad6aab52319903a954158da5 (diff)
SERVER-53421 Provide ways to track Session cleanup
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--  src/mongo/transport/service_entry_point.h         |   7
-rw-r--r--  src/mongo/transport/service_entry_point_impl.cpp  | 107
-rw-r--r--  src/mongo/transport/service_entry_point_impl.h    |  10
-rw-r--r--  src/mongo/transport/service_executor.cpp          |  34
-rw-r--r--  src/mongo/transport/service_executor.h            |   5
-rw-r--r--  src/mongo/transport/service_executor.idl          |   2
-rw-r--r--  src/mongo/transport/service_executor_fixed.cpp    |   1
-rw-r--r--  src/mongo/transport/service_state_machine.cpp     |  13
8 files changed, 109 insertions(+), 70 deletions(-)
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index fb1ad895d78..3e8d279b7b9 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -95,6 +95,13 @@ public:
virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
const Message& request) noexcept = 0;
+ /**
+ * Optional handler which is invoked after a session ends.
+ *
+ * This function implies that the Session itself will soon be destructed.
+ */
+ virtual void onEndSession(const transport::SessionHandle&) {}
+
protected:
ServiceEntryPoint() = default;
};
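
The hook above is a no-op by default, so existing ServiceEntryPoint implementations keep compiling unchanged. A minimal sketch of how a subclass could observe teardown, assuming a hypothetical CountingEntryPoint that only tallies ended sessions (not part of this commit):

    // Hypothetical subclass: count sessions as they end.
    class CountingEntryPoint : public ServiceEntryPointImpl {
    public:
        using ServiceEntryPointImpl::ServiceEntryPointImpl;

        void onEndSession(const transport::SessionHandle&) override {
            // Invoked once per session, just before the Session is destroyed.
            _endedSessions.fetchAndAdd(1);
        }

        long long endedSessions() const {
            return _endedSessions.load();
        }

    private:
        AtomicWord<long long> _endedSessions{0};
    };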
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 095940021da..111dfd4bbd3 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -164,26 +164,24 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto clientName = "conn{}"_format(session->id());
auto client = _svcCtx->makeClient(clientName, session);
- auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
-
const bool quiet = serverGlobalParams.quiet.load();
size_t connectionCount;
- auto ssmIt = [&]() -> boost::optional<SSMListIterator> {
+ auto maybeSsmIt = [&]() -> boost::optional<SSMListIterator> {
stdx::lock_guard lk(_sessionsMutex);
connectionCount = _currentConnections.load();
if (connectionCount > _maxNumConnections && !canOverrideMaxConns) {
return boost::none;
}
- auto it = _sessions.emplace(_sessions.begin(), ssm);
+ auto it = _sessions.emplace(_sessions.begin(), std::move(client));
connectionCount = _sessions.size();
_currentConnections.store(connectionCount);
_createdConnections.addAndFetch(1);
return it;
}();
- if (!ssmIt) {
+ if (!maybeSsmIt) {
if (!quiet) {
LOGV2(22942,
"Connection refused because there are too many open connections",
@@ -199,16 +197,16 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
"connectionCount"_attr = connectionCount);
}
- ssm->setCleanupHook([this, ssmIt, quiet, session = std::move(session)] {
+ auto ssmIt = *maybeSsmIt;
+ ssmIt->setCleanupHook([this, ssmIt, quiet, session = std::move(session)] {
size_t connectionCount;
auto remote = session->remote();
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
- _sessions.erase(*ssmIt);
+ _sessions.erase(ssmIt);
connectionCount = _sessions.size();
_currentConnections.store(connectionCount);
}
- _shutdownCondition.notify_one();
if (!quiet) {
LOGV2(22944,
@@ -217,12 +215,14 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
"connectionId"_attr = session->id(),
"connectionCount"_attr = connectionCount);
}
+
+ _sessionsCV.notify_one();
});
auto seCtx = transport::ServiceExecutorContext{};
seCtx.setThreadingModel(transport::ServiceExecutor::getInitialThreadingModel());
seCtx.setCanUseReserved(canOverrideMaxConns);
- ssm->start(std::move(seCtx));
+ ssmIt->start(std::move(seCtx));
}
void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
@@ -231,7 +231,7 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
{
stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
for (auto& ssm : _sessions) {
- ssm->terminateIfTagsDontMatch(tags);
+ ssm.terminateIfTagsDontMatch(tags);
}
}
}
@@ -241,36 +241,26 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
// When running under address sanitizer, we get false positive leaks due to disorder around
// the lifecycle of a connection and request. When we are running under ASAN, we try a lot
// harder to dry up the server from active connections before going on to really shut down.
+ return shutdownAndWait(timeout);
+#else
+ return true;
+#endif
+}
- using logv2::LogComponent;
+bool ServiceEntryPointImpl::shutdownAndWait(Milliseconds timeout) {
+ auto deadline = _svcCtx->getPreciseClockSource()->now() + timeout;
- auto start = _svcCtx->getPreciseClockSource()->now();
stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
// Request that all sessions end, while holding the _sessionsMutex, loop over all the current
- // connections and terminate them
- for (auto& ssm : _sessions) {
- ssm->terminate();
- }
-
- // Close all sockets and then wait for the number of active connections to reach zero with a
- // condition_variable that notifies in the session cleanup hook. If we haven't closed drained
- // all active operations within the deadline, just keep going with shutdown: the OS will do it
- // for us when the process terminates.
- auto timeSpent = Milliseconds(0);
- const auto checkInterval = std::min(Milliseconds(250), timeout);
-
- auto noWorkersLeft = [this] { return numOpenSessions() == 0; };
- while (timeSpent < timeout &&
- !_shutdownCondition.wait_for(lk, checkInterval.toSystemDuration(), noWorkersLeft)) {
- LOGV2(22945,
- "shutdown: still waiting on {workers} active workers to drain... ",
- "shutdown: still waiting on active workers to drain... ",
- "workers"_attr = numOpenSessions());
- timeSpent += checkInterval;
- }
+ // connections and terminate them. Then wait for the number of active connections to reach zero
+ // with a condition_variable that notifies in the session cleanup hook. If we haven't drained
+ // all active operations within the deadline, just keep going with shutdown: the OS will do it
+ // for us when the process terminates.
+ _terminateAll(lk);
+ auto result = _waitForNoSessions(lk, deadline);
+ lk.unlock();
- bool result = noWorkersLeft();
if (result) {
LOGV2(22946, "shutdown: no running workers found...");
} else {
@@ -282,35 +272,36 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
"workers"_attr = numOpenSessions());
}
- lk.unlock();
+ transport::ServiceExecutor::shutdownAll(_svcCtx, deadline);
- timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
- !status.isOK()) {
- LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
- }
+ return result;
+}
- timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
- if (auto status = exec->shutdown(timeout); !status.isOK()) {
- LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
- }
- }
+void ServiceEntryPointImpl::endAllSessionsNoTagMask() {
+ auto lk = stdx::unique_lock<decltype(_sessionsMutex)>(_sessionsMutex);
+ _terminateAll(lk);
+}
- timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
- timeout = std::max(Milliseconds{0}, timeout - timeSpent);
- if (auto status =
- transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
- !status.isOK()) {
- LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
+void ServiceEntryPointImpl::_terminateAll(WithLock) {
+ for (auto& ssm : _sessions) {
+ ssm.terminate();
}
+}
- return result;
-#else
- return true;
-#endif
+bool ServiceEntryPointImpl::waitForNoSessions(Milliseconds timeout) {
+ auto deadline = _svcCtx->getPreciseClockSource()->now() + timeout;
+ LOGV2(5342100, "Waiting for all sessions to conclude", "deadline"_attr = deadline);
+
+ auto lk = stdx::unique_lock<decltype(_sessionsMutex)>(_sessionsMutex);
+ return _waitForNoSessions(lk, deadline);
+}
+
+bool ServiceEntryPointImpl::_waitForNoSessions(stdx::unique_lock<decltype(_sessionsMutex)>& lk,
+ Date_t deadline) {
+ auto noWorkersLeft = [this] { return numOpenSessions() == 0; };
+ _sessionsCV.wait_until(lk, deadline.toSystemTimePoint(), noWorkersLeft);
+
+ return noWorkersLeft();
}
void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
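
The new shutdownAndWait()/waitForNoSessions() pair reduces to one idiom: terminate every session under _sessionsMutex, then block on _sessionsCV (notified from each cleanup hook) until the session list is empty or the deadline passes. A standalone sketch of that idiom with plain std:: types (illustrative only; the commit itself uses the mongo stdx wrappers and numOpenSessions()):

    // Standalone illustration of the terminate-then-wait pattern: cleanup
    // paths decrement a count and notify; the waiter blocks until the count
    // reaches zero or the deadline passes.
    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    class SessionRegistry {
    public:
        void onSessionStart() {
            std::lock_guard<std::mutex> lk(_mutex);
            ++_openSessions;
        }

        // Mirrors the cleanup hook: drop the session, then notify the CV.
        void onSessionCleanup() {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                --_openSessions;
            }
            _cv.notify_one();
        }

        // Mirrors _waitForNoSessions(): true if everything drained in time.
        bool waitForNoSessions(std::chrono::steady_clock::time_point deadline) {
            std::unique_lock<std::mutex> lk(_mutex);
            return _cv.wait_until(lk, deadline, [&] { return _openSessions == 0; });
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        int _openSessions = 0;
    };

The commit re-checks the predicate after waiting rather than using wait_until's boolean return; the two are equivalent here because the predicate is evaluated under the lock either way.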
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 1693469fcda..a8c1b080c7c 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -67,9 +67,12 @@ public:
void startSession(transport::SessionHandle session) override;
void endAllSessions(transport::Session::TagMask tags) final;
+ void endAllSessionsNoTagMask();
Status start() final;
bool shutdown(Milliseconds timeout) final;
+ bool shutdownAndWait(Milliseconds timeout);
+ bool waitForNoSessions(Milliseconds timeout);
void appendStats(BSONObjBuilder* bob) const final;
@@ -82,7 +85,10 @@ public:
}
private:
- using SSMList = std::list<std::shared_ptr<transport::ServiceStateMachine>>;
+ void _terminateAll(WithLock);
+ bool _waitForNoSessions(stdx::unique_lock<Mutex>& lk, Date_t deadline);
+
+ using SSMList = std::list<transport::ServiceStateMachine>;
using SSMListIterator = SSMList::iterator;
ServiceContext* const _svcCtx;
@@ -90,7 +96,7 @@ private:
mutable Mutex _sessionsMutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ServiceEntryPointImpl::_sessionsMutex");
- stdx::condition_variable _shutdownCondition;
+ stdx::condition_variable _sessionsCV;
SSMList _sessions;
const size_t _maxNumConnections{DEFAULT_MAX_CONN};
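
With these header changes, callers outside of shutdown() can drive the same drain sequence directly. A hedged usage sketch (drainConnections and the timeout value are illustrative, not part of this commit):

    // Hypothetical caller of the new public API.
    void drainConnections(ServiceEntryPointImpl* sep) {
        // Terminate every ServiceStateMachine, ignoring session tags.
        sep->endAllSessionsNoTagMask();

        // Block until the cleanup hooks have emptied _sessions, or give up
        // after the timeout and let process teardown close the sockets.
        if (!sep->waitForNoSessions(Milliseconds{10 * 1000})) {
            // Some sessions were still open at the deadline.
        }
    }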
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
index 18efe4c1013..4ec30e50609 100644
--- a/src/mongo/transport/service_executor.cpp
+++ b/src/mongo/transport/service_executor.cpp
@@ -70,11 +70,11 @@ StringData toString(ServiceExecutor::ThreadingModel threadingModel) {
}
}
-Status ServiceExecutor::setInitialThreadingModel(StringData value) noexcept {
+Status ServiceExecutor::setInitialThreadingModelFromString(StringData value) noexcept {
if (value == kThreadingModelDedicatedStr) {
- gInitialThreadingModel = ServiceExecutor::ThreadingModel::kDedicated;
+ setInitialThreadingModel(ServiceExecutor::ThreadingModel::kDedicated);
} else if (value == kThreadingModelBorrowedStr) {
- gInitialThreadingModel = ServiceExecutor::ThreadingModel::kBorrowed;
+ setInitialThreadingModel(ServiceExecutor::ThreadingModel::kBorrowed);
} else {
MONGO_UNREACHABLE;
}
@@ -82,6 +82,10 @@ Status ServiceExecutor::setInitialThreadingModel(StringData value) noexcept {
return Status::OK();
}
+void ServiceExecutor::setInitialThreadingModel(ThreadingModel threadingModel) noexcept {
+ gInitialThreadingModel = threadingModel;
+}
+
auto ServiceExecutor::getInitialThreadingModel() noexcept -> ThreadingModel {
return gInitialThreadingModel;
}
@@ -266,5 +270,29 @@ void ServiceExecutor::yieldIfAppropriate() const {
}
}
+void ServiceExecutor::shutdownAll(ServiceContext* serviceContext, Date_t deadline) {
+ auto getTimeout = [&] {
+ auto now = serviceContext->getPreciseClockSource()->now();
+ return std::max(Milliseconds{0}, deadline - now);
+ };
+
+ if (auto status = transport::ServiceExecutorFixed::get(serviceContext)->shutdown(getTimeout());
+ !status.isOK()) {
+ LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
+ }
+
+ if (auto exec = transport::ServiceExecutorReserved::get(serviceContext)) {
+ if (auto status = exec->shutdown(getTimeout()); !status.isOK()) {
+ LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
+ }
+ }
+
+ if (auto status =
+ transport::ServiceExecutorSynchronous::get(serviceContext)->shutdown(getTimeout());
+ !status.isOK()) {
+ LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
+ }
+}
+
} // namespace transport
} // namespace mongo
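
shutdownAll() replaces three copies of the "subtract elapsed time from the timeout" bookkeeping with a single deadline and a getTimeout lambda that recomputes the remaining budget before each executor shutdown. The same idiom in generic form (a sketch with std::chrono, not mongo's clock types):

    // Deadline-to-remaining-budget idiom: each sequential step gets whatever
    // time is left, clamped at zero, instead of a fixed slice that ignores
    // earlier overruns.
    #include <algorithm>
    #include <chrono>

    std::chrono::milliseconds remainingTime(std::chrono::steady_clock::time_point deadline) {
        auto left = std::chrono::duration_cast<std::chrono::milliseconds>(
            deadline - std::chrono::steady_clock::now());
        return std::max(std::chrono::milliseconds{0}, left);
    }

    // Usage: shutdownFixed(remainingTime(deadline));
    //        shutdownReserved(remainingTime(deadline));    // smaller if the previous call ran long
    //        shutdownSynchronous(remainingTime(deadline));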
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 169c753c853..478120f52bc 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -62,9 +62,12 @@ public:
friend StringData toString(ThreadingModel threadingModel);
- static Status setInitialThreadingModel(StringData value) noexcept;
+ static Status setInitialThreadingModelFromString(StringData value) noexcept;
+ static void setInitialThreadingModel(ThreadingModel threadingModel) noexcept;
static ThreadingModel getInitialThreadingModel() noexcept;
+ static void shutdownAll(ServiceContext* serviceContext, Date_t deadline);
+
virtual ~ServiceExecutor() = default;
using Task = unique_function<void()>;
enum ScheduleFlags {
diff --git a/src/mongo/transport/service_executor.idl b/src/mongo/transport/service_executor.idl
index d7235440de8..1758e45ee13 100644
--- a/src/mongo/transport/service_executor.idl
+++ b/src/mongo/transport/service_executor.idl
@@ -38,7 +38,7 @@ server_parameters:
set_at: [ startup ]
cpp_vartype: "std::string"
cpp_varname: "initialServiceExecutorThreadingModel"
- on_update: "ServiceExecutor::setInitialThreadingModel"
+ on_update: "ServiceExecutor::setInitialThreadingModelFromString"
default: "dedicated"
synchronousServiceExecutorRecursionLimit:
description: >-
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index 36af3056231..1a9370c040e 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -442,6 +442,7 @@ void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
_stats.waitersEnded.fetchAndAdd(1);
}
+ waiter.session.reset();
waiter.onCompletionCallback(std::move(status));
});
}
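
The one-line change above drops the waiter's SessionHandle before running its completion callback, so the fixed executor no longer holds a reference that could keep the Session alive past the callback. The ordering, reduced to a generic sketch (shared_ptr standing in for SessionHandle, the callback simplified to take no Status):

    // Release-before-notify ordering, illustrated with plain std:: types.
    #include <functional>
    #include <memory>

    struct Waiter {
        std::shared_ptr<int> session;                  // stands in for SessionHandle
        std::function<void()> onCompletionCallback;
    };

    void finishWait(Waiter waiter) {
        waiter.session.reset();         // drop our reference first...
        waiter.onCompletionCallback();  // ...then notify; we no longer pin the session
    }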
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 1d5405a7867..df220d190bf 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -210,6 +210,10 @@ public:
_sep{_serviceContext->getServiceEntryPoint()},
_clientStrand{ClientStrand::make(std::move(client))} {}
+ ~Impl() {
+ _sep->onEndSession(session());
+ }
+
void start(ServiceExecutorContext seCtx);
void setCleanupHook(std::function<void()> hook);
@@ -297,7 +301,6 @@ private:
ServiceContext* const _serviceContext;
ServiceEntryPoint* const _sep;
- transport::SessionHandle _sessionHandle;
ClientStrandPtr _clientStrand;
std::function<void()> _cleanupHook;
@@ -674,15 +677,15 @@ void ServiceStateMachine::Impl::cleanupSession(const Status& status) {
transport::ServiceExecutorContext::reset(client);
}
- if (auto cleanupHook = std::exchange(_cleanupHook, {})) {
- cleanupHook();
- }
-
_state.store(State::Ended);
_inMessage.reset();
_outMessage.reset();
+
+ if (auto cleanupHook = std::exchange(_cleanupHook, {})) {
+ cleanupHook();
+ }
}
ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
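
Taken together, the state-machine changes move the end-of-session notification into the Impl destructor and run the cleanup hook only after _state, _inMessage, and _outMessage have been reset, which matters now that _sessions owns the ServiceStateMachine by value and the hook erases that entry. The notification shape, as a generic sketch (illustrative types, not the mongo classes):

    // Destructor-notification sketch: the observer hears about the end of a
    // session exactly once, on every destruction path of the owning object.
    class SessionObserver {
    public:
        virtual ~SessionObserver() = default;
        virtual void onEndSession() {}  // default no-op, like ServiceEntryPoint::onEndSession
    };

    class StateMachine {
    public:
        explicit StateMachine(SessionObserver* observer) : _observer(observer) {}

        ~StateMachine() {
            _observer->onEndSession();  // fires whether cleanup ran normally or not
        }

    private:
        SessionObserver* _observer;
    };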