From 236c6c28a18210586673097ee436c5b613b6c46f Mon Sep 17 00:00:00 2001 From: Kaloian Manassiev Date: Tue, 9 Oct 2018 08:17:38 -0400 Subject: SERVER-37244 Make sessions killable outside of the Session/TransactionParticipant object --- src/mongo/db/session_catalog.cpp | 98 +++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 52 deletions(-) (limited to 'src/mongo/db/session_catalog.cpp') diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index abc5165e5a3..70049de30a4 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -34,12 +34,8 @@ #include "mongo/db/session_catalog.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" -#include "mongo/db/transaction_participant.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -58,6 +54,7 @@ SessionCatalog::~SessionCatalog() { for (const auto& entry : _sessions) { auto& sri = entry.second; invariant(!sri->session.currentOperation()); + invariant(!sri->session.killed()); } } @@ -88,6 +85,31 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) stdx::unique_lock ul(_mutex); auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid); + // Wait until the session is no longer checked out and until the previously scheduled kill has + // completed + opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&sri]() { + return !sri->session.currentOperation() && !sri->session.killed(); + }); + + sri->session._markCheckedOut(ul, opCtx); + + return ScopedCheckedOutSession( + opCtx, ScopedSession(std::move(sri)), boost::none /* Not checked out for kill */); +} + +ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx, + Session::KillToken killToken) { + // This method is not supposed to be called with an already checked-out session due to risk of + // deadlock + invariant(!operationSessionDecoration(opCtx)); + invariant(!opCtx->getTxnNumber()); + + const auto lsid = killToken.lsidToKill; + + stdx::unique_lock ul(_mutex); + auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid); + invariant(sri->session.killed()); + // Wait until the session is no longer checked out opCtx->waitForConditionOrInterrupt( sri->availableCondVar, ul, [&sri]() { return !sri->session.currentOperation(); }); @@ -95,7 +117,8 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) invariant(!sri->session.currentOperation()); sri->session._markCheckedOut(ul, opCtx); - return ScopedCheckedOutSession(opCtx, ScopedSession(std::move(sri))); + return ScopedCheckedOutSession( + opCtx, ScopedSession(std::move(sri)), std::move(killToken) /* Checked out for kill */); } ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, @@ -112,49 +135,6 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, return ss; } -void SessionCatalog::invalidateSessions(OperationContext* opCtx, - boost::optional singleSessionDoc) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - if (isReplSet) { - uassert(40528, - str::stream() << "Direct writes against " - << NamespaceString::kSessionTransactionsTableNamespace.ns() - << " cannot be performed using a transaction or on a session.", - !opCtx->getLogicalSessionId()); - } - - const auto invalidateSessionFn = [&](WithLock, decltype(_sessions)::iterator it) { - auto& sri = it->second; - auto const txnParticipant = - TransactionParticipant::getFromNonCheckedOutSession(&sri->session); - txnParticipant->invalidate(); - - // We cannot remove checked-out sessions from the cache, because operations expect to find - // them there to check back in - if (!sri->session.currentOperation()) { - _sessions.erase(it); - } - }; - - stdx::lock_guard lg(_mutex); - - if (singleSessionDoc) { - const auto lsid = LogicalSessionId::parse(IDLParserErrorContext("lsid"), - singleSessionDoc->getField("_id").Obj()); - - auto it = _sessions.find(lsid); - if (it != _sessions.end()) { - invalidateSessionFn(lg, it); - } - } else { - auto it = _sessions.begin(); - while (it != _sessions.end()) { - invalidateSessionFn(lg, it++); - } - } -} - void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, const ScanSessionsCallbackFn& workerFn) { stdx::lock_guard lg(_mutex); @@ -163,15 +143,22 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, for (auto& sessionEntry : _sessions) { if (matcher.match(sessionEntry.first)) { - workerFn(&sessionEntry.second->session); + workerFn(lg, &sessionEntry.second->session); } } } +Session::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) { + stdx::lock_guard lg(_mutex); + auto it = _sessions.find(lsid); + uassert(ErrorCodes::NoSuchSession, "Session not found", it != _sessions.end()); + + auto& sri = it->second; + return sri->session.kill(lg); +} + std::shared_ptr SessionCatalog::_getOrCreateSessionRuntimeInfo( WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) { - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - auto it = _sessions.find(lsid); if (it == _sessions.end()) { it = _sessions.emplace(lsid, std::make_shared(lsid)).first; @@ -180,7 +167,8 @@ std::shared_ptr SessionCatalog::_getOrCreate return it->second; } -void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) { +void SessionCatalog::_releaseSession(const LogicalSessionId& lsid, + boost::optional killToken) { stdx::lock_guard lg(_mutex); auto it = _sessions.find(lsid); @@ -191,6 +179,11 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) { sri->session._markCheckedIn(lg); sri->availableCondVar.notify_one(); + + if (killToken) { + invariant(sri->session.killed()); + sri->session._markNotKilled(lg, std::move(*killToken)); + } } OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession) @@ -236,6 +229,7 @@ OperationContextSession::~OperationContextSession() { // SessionCatalog mutex, and other code may take the Client lock while holding that mutex. stdx::unique_lock lk(*_opCtx->getClient()); ScopedCheckedOutSession sessionToDelete(std::move(checkedOutSession.get())); + // This destroys the moved-from ScopedCheckedOutSession, and must be done within the client // lock. checkedOutSession = boost::none; -- cgit v1.2.1