author     Ben Caimano <ben.caimano@10gen.com>               2020-12-15 22:40:20 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-01-06 20:58:51 +0000
commit     6020123ae6d42a31a9564ab96ab4979d1acd4f5a (patch)
tree       d679f44e4fbfd88edba5e669a9cbc48e3f8c2933 /src/mongo/transport
parent     0615cd112f6cbe12ad6aab52319903a954158da5 (diff)
download   mongo-6020123ae6d42a31a9564ab96ab4979d1acd4f5a.tar.gz
SERVER-53421 Provide ways to track Session cleanup
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--  src/mongo/transport/service_entry_point.h         |   7
-rw-r--r--  src/mongo/transport/service_entry_point_impl.cpp  | 107
-rw-r--r--  src/mongo/transport/service_entry_point_impl.h    |  10
-rw-r--r--  src/mongo/transport/service_executor.cpp          |  34
-rw-r--r--  src/mongo/transport/service_executor.h            |   5
-rw-r--r--  src/mongo/transport/service_executor.idl          |   2
-rw-r--r--  src/mongo/transport/service_executor_fixed.cpp    |   1
-rw-r--r--  src/mongo/transport/service_state_machine.cpp     |  13
8 files changed, 109 insertions, 70 deletions
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index fb1ad895d78..3e8d279b7b9 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -95,6 +95,13 @@ public:
     virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
                                              const Message& request) noexcept = 0;
 
+    /**
+     * Optional handler which is invoked after a session ends.
+     *
+     * This function implies that the Session itself will soon be destructed.
+     */
+    virtual void onEndSession(const transport::SessionHandle&) {}
+
 protected:
     ServiceEntryPoint() = default;
 };
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 095940021da..111dfd4bbd3 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -164,26 +164,24 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
     auto clientName = "conn{}"_format(session->id());
     auto client = _svcCtx->makeClient(clientName, session);
 
-    auto ssm = std::make_shared<transport::ServiceStateMachine>(std::move(client));
-
     const bool quiet = serverGlobalParams.quiet.load();
 
     size_t connectionCount;
-    auto ssmIt = [&]() -> boost::optional<SSMListIterator> {
+    auto maybeSsmIt = [&]() -> boost::optional<SSMListIterator> {
         stdx::lock_guard lk(_sessionsMutex);
         connectionCount = _currentConnections.load();
         if (connectionCount > _maxNumConnections && !canOverrideMaxConns) {
             return boost::none;
         }
 
-        auto it = _sessions.emplace(_sessions.begin(), ssm);
+        auto it = _sessions.emplace(_sessions.begin(), std::move(client));
         connectionCount = _sessions.size();
         _currentConnections.store(connectionCount);
         _createdConnections.addAndFetch(1);
 
         return it;
     }();
 
-    if (!ssmIt) {
+    if (!maybeSsmIt) {
         if (!quiet) {
             LOGV2(22942,
                   "Connection refused because there are too many open connections",
@@ -199,16 +197,16 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
               "connectionCount"_attr = connectionCount);
     }
 
-    ssm->setCleanupHook([this, ssmIt, quiet, session = std::move(session)] {
+    auto ssmIt = *maybeSsmIt;
+    ssmIt->setCleanupHook([this, ssmIt, quiet, session = std::move(session)] {
         size_t connectionCount;
         auto remote = session->remote();
         {
             stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
-            _sessions.erase(*ssmIt);
+            _sessions.erase(ssmIt);
             connectionCount = _sessions.size();
             _currentConnections.store(connectionCount);
         }
-        _shutdownCondition.notify_one();
 
         if (!quiet) {
             LOGV2(22944,
@@ -217,12 +215,14 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
                   "connectionId"_attr = session->id(),
                   "connectionCount"_attr = connectionCount);
         }
+
+        _sessionsCV.notify_one();
     });
 
     auto seCtx = transport::ServiceExecutorContext{};
     seCtx.setThreadingModel(transport::ServiceExecutor::getInitialThreadingModel());
     seCtx.setCanUseReserved(canOverrideMaxConns);
-    ssm->start(std::move(seCtx));
+    ssmIt->start(std::move(seCtx));
 }
 
 void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
@@ -231,7 +231,7 @@ void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) {
     {
         stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
         for (auto& ssm : _sessions) {
-            ssm->terminateIfTagsDontMatch(tags);
+            ssm.terminateIfTagsDontMatch(tags);
         }
     }
 }
@@ -241,36 +241,26 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
     // When running under address sanitizer, we get false positive leaks due to disorder around
     // the lifecycle of a connection and request. When we are running under ASAN, we try a lot
     // harder to dry up the server from active connections before going on to really shut down.
+    return shutdownAndWait(timeout);
+#else
+    return true;
+#endif
+}
 
-    using logv2::LogComponent;
+bool ServiceEntryPointImpl::shutdownAndWait(Milliseconds timeout) {
+    auto deadline = _svcCtx->getPreciseClockSource()->now() + timeout;
 
-    auto start = _svcCtx->getPreciseClockSource()->now();
     stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex);
 
     // Request that all sessions end, while holding the _sessionsMutex, loop over all the current
-    // connections and terminate them
-    for (auto& ssm : _sessions) {
-        ssm->terminate();
-    }
-
-    // Close all sockets and then wait for the number of active connections to reach zero with a
-    // condition_variable that notifies in the session cleanup hook. If we haven't closed drained
-    // all active operations within the deadline, just keep going with shutdown: the OS will do it
-    // for us when the process terminates.
-    auto timeSpent = Milliseconds(0);
-    const auto checkInterval = std::min(Milliseconds(250), timeout);
-
-    auto noWorkersLeft = [this] { return numOpenSessions() == 0; };
-    while (timeSpent < timeout &&
-           !_shutdownCondition.wait_for(lk, checkInterval.toSystemDuration(), noWorkersLeft)) {
-        LOGV2(22945,
-              "shutdown: still waiting on {workers} active workers to drain... ",
-              "shutdown: still waiting on active workers to drain... ",
-              "workers"_attr = numOpenSessions());
-        timeSpent += checkInterval;
-    }
+    // connections and terminate them. Then wait for the number of active connections to reach zero
+    // with a condition_variable that notifies in the session cleanup hook. If we haven't drained
+    // all active operations within the deadline, just keep going with shutdown: the OS will
+    // do it for us when the process terminates.
+    _terminateAll(lk);
+    auto result = _waitForNoSessions(lk, deadline);
+    lk.unlock();
 
-    bool result = noWorkersLeft();
     if (result) {
         LOGV2(22946, "shutdown: no running workers found...");
     } else {
@@ -282,35 +272,36 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
               "workers"_attr = numOpenSessions());
     }
 
-    lk.unlock();
+    transport::ServiceExecutor::shutdownAll(_svcCtx, deadline);
 
-    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    if (auto status = transport::ServiceExecutorFixed::get(_svcCtx)->shutdown(timeout);
-        !status.isOK()) {
-        LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
-    }
+    return result;
+}
 
-    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    if (auto exec = transport::ServiceExecutorReserved::get(_svcCtx)) {
-        if (auto status = exec->shutdown(timeout); !status.isOK()) {
-            LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
-        }
-    }
+void ServiceEntryPointImpl::endAllSessionsNoTagMask() {
+    auto lk = stdx::unique_lock<decltype(_sessionsMutex)>(_sessionsMutex);
+    _terminateAll(lk);
+}
 
-    timeSpent = _svcCtx->getPreciseClockSource()->now() - start;
-    timeout = std::max(Milliseconds{0}, timeout - timeSpent);
-    if (auto status =
-            transport::ServiceExecutorSynchronous::get(_svcCtx)->shutdown(timeout - timeSpent);
-        !status.isOK()) {
-        LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
+void ServiceEntryPointImpl::_terminateAll(WithLock) {
+    for (auto& ssm : _sessions) {
+        ssm.terminate();
     }
+}
 
-    return result;
-#else
-    return true;
-#endif
+bool ServiceEntryPointImpl::waitForNoSessions(Milliseconds timeout) {
+    auto deadline = _svcCtx->getPreciseClockSource()->now() + timeout;
+    LOGV2(5342100, "Waiting for all sessions to conclude", "deadline"_attr = deadline);
+
+    auto lk = stdx::unique_lock<decltype(_sessionsMutex)>(_sessionsMutex);
+    return _waitForNoSessions(lk, deadline);
+}
+
+bool ServiceEntryPointImpl::_waitForNoSessions(stdx::unique_lock<decltype(_sessionsMutex)>& lk,
+                                               Date_t deadline) {
+    auto noWorkersLeft = [this] { return numOpenSessions() == 0; };
+    _sessionsCV.wait_until(lk, deadline.toSystemTimePoint(), noWorkersLeft);
+
+    return noWorkersLeft();
 }
 
 void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
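[Editor's note] The service_entry_point_impl.cpp change above replaces the old poll-every-250ms drain loop with a single condition_variable wait against an absolute deadline; the notify side lives in each session's cleanup hook. Below is a minimal, self-contained sketch of that erase-then-notify drain pattern. It is an illustration only, not MongoDB code; every name in it (SessionRegistry and friends) is invented.

    #include <chrono>
    #include <condition_variable>
    #include <list>
    #include <mutex>

    // Hypothetical registry mirroring the pattern in startSession/_waitForNoSessions.
    class SessionRegistry {
    public:
        using Iter = std::list<int>::iterator;

        Iter add(int sessionId) {
            std::lock_guard lk(_mutex);
            return _sessions.insert(_sessions.begin(), sessionId);
        }

        // Analogous to the cleanup hook: erase under the lock, notify outside it.
        void remove(Iter it) {
            {
                std::lock_guard lk(_mutex);
                _sessions.erase(it);
            }
            _cv.notify_one();
        }

        // Analogous to _waitForNoSessions: block until empty or the deadline
        // passes, then report whether the drain actually completed.
        bool waitForNoSessions(std::chrono::steady_clock::time_point deadline) {
            std::unique_lock lk(_mutex);
            auto empty = [this] { return _sessions.empty(); };
            _cv.wait_until(lk, deadline, empty);
            return empty();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        std::list<int> _sessions;
    };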
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 1693469fcda..a8c1b080c7c 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -67,9 +67,12 @@ public:
     void startSession(transport::SessionHandle session) override;
 
     void endAllSessions(transport::Session::TagMask tags) final;
+    void endAllSessionsNoTagMask();
 
     Status start() final;
     bool shutdown(Milliseconds timeout) final;
+    bool shutdownAndWait(Milliseconds timeout);
+    bool waitForNoSessions(Milliseconds timeout);
 
     void appendStats(BSONObjBuilder* bob) const final;
 
@@ -82,7 +85,10 @@ public:
     }
 
 private:
-    using SSMList = std::list<std::shared_ptr<transport::ServiceStateMachine>>;
+    void _terminateAll(WithLock);
+    bool _waitForNoSessions(stdx::unique_lock<Mutex>& lk, Date_t deadline);
+
+    using SSMList = std::list<transport::ServiceStateMachine>;
    using SSMListIterator = SSMList::iterator;
 
     ServiceContext* const _svcCtx;
@@ -90,7 +96,7 @@ private:
     mutable Mutex _sessionsMutex =
         MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "ServiceEntryPointImpl::_sessionsMutex");
-    stdx::condition_variable _shutdownCondition;
+    stdx::condition_variable _sessionsCV;
     SSMList _sessions;
 
     const size_t _maxNumConnections{DEFAULT_MAX_CONN};
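[Editor's note] The header now exposes the drain machinery piecewise: endAllSessionsNoTagMask() asks every session to terminate, and waitForNoSessions() blocks until they conclude or the timeout elapses. A plausible call sequence against this new interface might look like the sketch below; it assumes a live ServiceEntryPointImpl and the MongoDB types from this diff, and drainForTest is an invented name:

    // Hypothetical usage, e.g. in a shutdown path or a test fixture.
    void drainForTest(ServiceEntryPointImpl* sep) {
        sep->endAllSessionsNoTagMask();
        if (!sep->waitForNoSessions(Milliseconds{5000})) {
            // Sessions are still alive past the deadline; proceed anyway, as
            // shutdownAndWait() does, and let process teardown reclaim them.
        }
    }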
diff --git a/src/mongo/transport/service_executor.cpp b/src/mongo/transport/service_executor.cpp
index 18efe4c1013..4ec30e50609 100644
--- a/src/mongo/transport/service_executor.cpp
+++ b/src/mongo/transport/service_executor.cpp
@@ -70,11 +70,11 @@ StringData toString(ServiceExecutor::ThreadingModel threadingModel) {
     }
 }
 
-Status ServiceExecutor::setInitialThreadingModel(StringData value) noexcept {
+Status ServiceExecutor::setInitialThreadingModelFromString(StringData value) noexcept {
     if (value == kThreadingModelDedicatedStr) {
-        gInitialThreadingModel = ServiceExecutor::ThreadingModel::kDedicated;
+        setInitialThreadingModel(ServiceExecutor::ThreadingModel::kDedicated);
     } else if (value == kThreadingModelBorrowedStr) {
-        gInitialThreadingModel = ServiceExecutor::ThreadingModel::kBorrowed;
+        setInitialThreadingModel(ServiceExecutor::ThreadingModel::kBorrowed);
     } else {
         MONGO_UNREACHABLE;
     }
@@ -82,6 +82,10 @@ Status ServiceExecutor::setInitialThreadingModel(StringData value) noexcept {
     return Status::OK();
 }
 
+void ServiceExecutor::setInitialThreadingModel(ThreadingModel threadingModel) noexcept {
+    gInitialThreadingModel = threadingModel;
+}
+
 auto ServiceExecutor::getInitialThreadingModel() noexcept -> ThreadingModel {
     return gInitialThreadingModel;
 }
@@ -266,5 +270,29 @@ void ServiceExecutor::yieldIfAppropriate() const {
     }
 }
 
+void ServiceExecutor::shutdownAll(ServiceContext* serviceContext, Date_t deadline) {
+    auto getTimeout = [&] {
+        auto now = serviceContext->getPreciseClockSource()->now();
+        return std::max(Milliseconds{0}, deadline - now);
+    };
+
+    if (auto status = transport::ServiceExecutorFixed::get(serviceContext)->shutdown(getTimeout());
+        !status.isOK()) {
+        LOGV2(4907202, "Failed to shutdown ServiceExecutorFixed", "error"_attr = status);
+    }
+
+    if (auto exec = transport::ServiceExecutorReserved::get(serviceContext)) {
+        if (auto status = exec->shutdown(getTimeout()); !status.isOK()) {
+            LOGV2(4907201, "Failed to shutdown ServiceExecutorReserved", "error"_attr = status);
+        }
+    }
+
+    if (auto status =
+            transport::ServiceExecutorSynchronous::get(serviceContext)->shutdown(getTimeout());
+        !status.isOK()) {
+        LOGV2(4907200, "Failed to shutdown ServiceExecutorSynchronous", "error"_attr = status);
+    }
+}
+
 }  // namespace transport
 }  // namespace mongo
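[Editor's note] shutdownAll above also tidies the old shutdown code's repeated "timeSpent/timeout" bookkeeping into one getTimeout lambda: several sequential shutdown steps share a single absolute deadline, and each step is handed only the time that remains. A standalone sketch of that idiom, using std::chrono rather than MongoDB's clock types:

    #include <algorithm>
    #include <chrono>

    using Clock = std::chrono::steady_clock;
    using Millis = std::chrono::milliseconds;

    // Remaining time until the shared deadline, clamped at zero.
    Millis remainingBudget(Clock::time_point deadline) {
        auto now = Clock::now();
        return std::max(Millis{0},
                        std::chrono::duration_cast<Millis>(deadline - now));
    }

    // Usage (hypothetical executors): a slow first step automatically shrinks
    // the allowance handed to the steps after it.
    //   fixedExec.shutdown(remainingBudget(deadline));
    //   reservedExec.shutdown(remainingBudget(deadline));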
diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h
index 169c753c853..478120f52bc 100644
--- a/src/mongo/transport/service_executor.h
+++ b/src/mongo/transport/service_executor.h
@@ -62,9 +62,12 @@ public:
 
     friend StringData toString(ThreadingModel threadingModel);
 
-    static Status setInitialThreadingModel(StringData value) noexcept;
+    static Status setInitialThreadingModelFromString(StringData value) noexcept;
+    static void setInitialThreadingModel(ThreadingModel threadingModel) noexcept;
     static ThreadingModel getInitialThreadingModel() noexcept;
 
+    static void shutdownAll(ServiceContext* serviceContext, Date_t deadline);
+
     virtual ~ServiceExecutor() = default;
     using Task = unique_function<void()>;
 
     enum ScheduleFlags {
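[Editor's note] Splitting the setter into a string-based and a typed overload lets callers that already hold the enum skip string parsing, while the server parameter keeps its string path. A hedged sketch of the two entry points; the call sites shown are invented, not taken from this commit:

    // Hypothetical direct use of the typed overload, e.g. in test setup:
    using TM = transport::ServiceExecutor::ThreadingModel;
    transport::ServiceExecutor::setInitialThreadingModel(TM::kBorrowed);

    // The string path remains for the startup server parameter, which the
    // .idl hunk below now routes to setInitialThreadingModelFromString().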
diff --git a/src/mongo/transport/service_executor.idl b/src/mongo/transport/service_executor.idl
index d7235440de8..1758e45ee13 100644
--- a/src/mongo/transport/service_executor.idl
+++ b/src/mongo/transport/service_executor.idl
@@ -38,7 +38,7 @@ server_parameters:
         set_at: [ startup ]
         cpp_vartype: "std::string"
         cpp_varname: "initialServiceExecutorThreadingModel"
-        on_update: "ServiceExecutor::setInitialThreadingModel"
+        on_update: "ServiceExecutor::setInitialThreadingModelFromString"
         default: "dedicated"
 
     synchronousServiceExecutorRecursionLimit:
         description: >-
diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp
index 36af3056231..1a9370c040e 100644
--- a/src/mongo/transport/service_executor_fixed.cpp
+++ b/src/mongo/transport/service_executor_fixed.cpp
@@ -442,6 +442,7 @@ void ServiceExecutorFixed::runOnDataAvailable(const SessionHandle& session,
             _stats.waitersEnded.fetchAndAdd(1);
         }
 
+        waiter.session.reset();
         waiter.onCompletionCallback(std::move(status));
     });
 }
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 1d5405a7867..df220d190bf 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -210,6 +210,10 @@ public:
           _sep{_serviceContext->getServiceEntryPoint()},
           _clientStrand{ClientStrand::make(std::move(client))} {}
 
+    ~Impl() {
+        _sep->onEndSession(session());
+    }
+
     void start(ServiceExecutorContext seCtx);
 
     void setCleanupHook(std::function<void()> hook);
@@ -297,7 +301,6 @@ private:
     ServiceContext* const _serviceContext;
     ServiceEntryPoint* const _sep;
 
-    transport::SessionHandle _sessionHandle;
     ClientStrandPtr _clientStrand;
     std::function<void()> _cleanupHook;
 
@@ -674,15 +677,15 @@ void ServiceStateMachine::Impl::cleanupSession(const Status& status) {
         transport::ServiceExecutorContext::reset(client);
     }
 
-    if (auto cleanupHook = std::exchange(_cleanupHook, {})) {
-        cleanupHook();
-    }
-
     _state.store(State::Ended);
 
     _inMessage.reset();
     _outMessage.reset();
+
+    if (auto cleanupHook = std::exchange(_cleanupHook, {})) {
+        cleanupHook();
+    }
 }
 
 ServiceStateMachine::ServiceStateMachine(ServiceContext::UniqueClient client)
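[Editor's note] The service_state_machine.cpp hunks carry the commit's core guarantee: by calling onEndSession() from ~Impl(), the "session ended" signal fires exactly once on every exit path, including early errors, instead of relying on each path to invoke it; cleanupSession() also moves the cleanup hook to run last, after the state machine's own teardown. A minimal RAII sketch of that destructor-based notification, with invented names:

    #include <functional>
    #include <utility>

    // Hypothetical illustration of the pattern added to ServiceStateMachine::Impl.
    class ScopedSessionNotifier {
    public:
        explicit ScopedSessionNotifier(std::function<void()> onEnd)
            : _onEnd(std::move(onEnd)) {}

        ~ScopedSessionNotifier() {
            if (_onEnd)
                _onEnd();  // runs no matter how the owner is destroyed
        }

    private:
        std::function<void()> _onEnd;
    };

The waiter.session.reset() line in service_executor_fixed.cpp serves the same accounting goal: it drops the executor's SessionHandle reference before the completion callback runs, so the session's destruction (and hence onEndSession) is not delayed by a stale handle.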