diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-10-09 08:17:38 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-10-31 04:50:08 -0400 |
commit | 236c6c28a18210586673097ee436c5b613b6c46f (patch) | |
tree | 9c26586d5943845b8f3356cbbee41dc75533670d /src/mongo/db/session_catalog.cpp | |
parent | e701da7ff3ec84b2bb3b353fa748c22f7b2a5878 (diff) | |
download | mongo-236c6c28a18210586673097ee436c5b613b6c46f.tar.gz |
SERVER-37244 Make sessions killable outside of the Session/TransactionParticipant object
Diffstat (limited to 'src/mongo/db/session_catalog.cpp')
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 98 |
1 files changed, 46 insertions, 52 deletions
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index abc5165e5a3..70049de30a4 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -34,12 +34,8 @@ #include "mongo/db/session_catalog.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" -#include "mongo/db/transaction_participant.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -58,6 +54,7 @@ SessionCatalog::~SessionCatalog() { for (const auto& entry : _sessions) { auto& sri = entry.second; invariant(!sri->session.currentOperation()); + invariant(!sri->session.killed()); } } @@ -88,6 +85,31 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) stdx::unique_lock<stdx::mutex> ul(_mutex); auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid); + // Wait until the session is no longer checked out and until the previously scheduled kill has + // completed + opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&sri]() { + return !sri->session.currentOperation() && !sri->session.killed(); + }); + + sri->session._markCheckedOut(ul, opCtx); + + return ScopedCheckedOutSession( + opCtx, ScopedSession(std::move(sri)), boost::none /* Not checked out for kill */); +} + +ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx, + Session::KillToken killToken) { + // This method is not supposed to be called with an already checked-out session due to risk of + // deadlock + invariant(!operationSessionDecoration(opCtx)); + invariant(!opCtx->getTxnNumber()); + + const auto lsid = killToken.lsidToKill; + + stdx::unique_lock<stdx::mutex> ul(_mutex); + auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid); + invariant(sri->session.killed()); + // Wait until the session is no longer checked out opCtx->waitForConditionOrInterrupt( sri->availableCondVar, ul, [&sri]() { return !sri->session.currentOperation(); }); @@ -95,7 +117,8 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) invariant(!sri->session.currentOperation()); sri->session._markCheckedOut(ul, opCtx); - return ScopedCheckedOutSession(opCtx, ScopedSession(std::move(sri))); + return ScopedCheckedOutSession( + opCtx, ScopedSession(std::move(sri)), std::move(killToken) /* Checked out for kill */); } ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, @@ -112,49 +135,6 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, return ss; } -void SessionCatalog::invalidateSessions(OperationContext* opCtx, - boost::optional<BSONObj> singleSessionDoc) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - if (isReplSet) { - uassert(40528, - str::stream() << "Direct writes against " - << NamespaceString::kSessionTransactionsTableNamespace.ns() - << " cannot be performed using a transaction or on a session.", - !opCtx->getLogicalSessionId()); - } - - const auto invalidateSessionFn = [&](WithLock, decltype(_sessions)::iterator it) { - auto& sri = it->second; - auto const txnParticipant = - TransactionParticipant::getFromNonCheckedOutSession(&sri->session); - txnParticipant->invalidate(); - - // We cannot remove checked-out sessions from the cache, because operations expect to find - // them there to check back in - if (!sri->session.currentOperation()) { - _sessions.erase(it); - } - }; - - stdx::lock_guard<stdx::mutex> lg(_mutex); - - if (singleSessionDoc) { - const auto lsid = LogicalSessionId::parse(IDLParserErrorContext("lsid"), - singleSessionDoc->getField("_id").Obj()); - - auto it = _sessions.find(lsid); - if (it != _sessions.end()) { - invalidateSessionFn(lg, it); - } - } else { - auto it = _sessions.begin(); - while (it != _sessions.end()) { - invalidateSessionFn(lg, it++); - } - } -} - void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, const ScanSessionsCallbackFn& workerFn) { stdx::lock_guard<stdx::mutex> lg(_mutex); @@ -163,15 +143,22 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, for (auto& sessionEntry : _sessions) { if (matcher.match(sessionEntry.first)) { - workerFn(&sessionEntry.second->session); + workerFn(lg, &sessionEntry.second->session); } } } +Session::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + auto it = _sessions.find(lsid); + uassert(ErrorCodes::NoSuchSession, "Session not found", it != _sessions.end()); + + auto& sri = it->second; + return sri->session.kill(lg); +} + std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo( WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) { - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - auto it = _sessions.find(lsid); if (it == _sessions.end()) { it = _sessions.emplace(lsid, std::make_shared<SessionRuntimeInfo>(lsid)).first; @@ -180,7 +167,8 @@ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreate return it->second; } -void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) { +void SessionCatalog::_releaseSession(const LogicalSessionId& lsid, + boost::optional<Session::KillToken> killToken) { stdx::lock_guard<stdx::mutex> lg(_mutex); auto it = _sessions.find(lsid); @@ -191,6 +179,11 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) { sri->session._markCheckedIn(lg); sri->availableCondVar.notify_one(); + + if (killToken) { + invariant(sri->session.killed()); + sri->session._markNotKilled(lg, std::move(*killToken)); + } } OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession) @@ -236,6 +229,7 @@ OperationContextSession::~OperationContextSession() { // SessionCatalog mutex, and other code may take the Client lock while holding that mutex. stdx::unique_lock<Client> lk(*_opCtx->getClient()); ScopedCheckedOutSession sessionToDelete(std::move(checkedOutSession.get())); + // This destroys the moved-from ScopedCheckedOutSession, and must be done within the client // lock. checkedOutSession = boost::none; |