diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-04-13 20:14:37 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-13 20:53:37 +0000 |
commit | 87393ce9bcfe06f8aa93b856474fb77bfb3a5267 (patch) | |
tree | 2f8da9e5f696453e54e4479a653155bcafd72278 /src/mongo/db/session_catalog.cpp | |
parent | 8a6ccfc1aedfd0006fe4ec4fefd2e7f23abc5e8f (diff) | |
download | mongo-87393ce9bcfe06f8aa93b856474fb77bfb3a5267.tar.gz |
SERVER-62479 Reap sessions for the same retryable write atomically
Diffstat (limited to 'src/mongo/db/session_catalog.cpp')
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 351 |
1 files changed, 162 insertions, 189 deletions
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index f9f0648002d..eaf13b6a4fa 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -53,10 +53,10 @@ const auto operationSessionDecoration = SessionCatalog::~SessionCatalog() { stdx::lock_guard<Latch> lg(_mutex); - for (const auto& entry : _sessions) { - ObservableSession session(lg, entry.second->session); - invariant(!session.hasCurrentOperation()); - invariant(!session._killed()); + for (const auto& [_, sri] : _sessions) { + ObservableSession osession(lg, sri.get(), &sri->parentSession); + invariant(!osession.hasCurrentOperation()); + invariant(!osession._killed()); } } @@ -74,95 +74,39 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) { return &sessionTransactionTable; } -SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionWithParentSession( +SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionInner( OperationContext* opCtx, const LogicalSessionId& lsid, boost::optional<KillToken> killToken) { if (killToken) { invariant(killToken->lsidToKill == lsid); - invariant(killToken->parentLsidToKill); - invariant(*killToken->parentLsidToKill == *getParentSessionId(lsid)); } else { invariant(opCtx->getLogicalSessionId() == lsid); } stdx::unique_lock<Latch> ul(_mutex); - auto parentSri = _getOrCreateSessionRuntimeInfo(ul, *getParentSessionId(lsid), nullptr); - auto childSri = _getOrCreateSessionRuntimeInfo(ul, lsid, parentSri); + auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid); + auto session = sri->getSession(lsid); + invariant(session); if (killToken) { - invariant(ObservableSession(ul, childSri->session)._killed()); - invariant(ObservableSession(ul, parentSri->session)._killed()); - } - - // Wait until the session is no longer checked out and until the previously scheduled kill has - // completed - ++parentSri->numWaitingToCheckOut; - ++childSri->numWaitingToCheckOut; - ON_BLOCK_EXIT([&] { - --parentSri->numWaitingToCheckOut; - --childSri->numWaitingToCheckOut; - }); - - // Wait on the parent session's condition variable since if the parent session is checked out - // prior to this, the child session's condition variable will not be notified when the parent - // session becomes available; on the other hand, if the child session is checked out prior to - // this, both parent session's and child session's condition variables will be notified when the - // child session and parent session become available. - opCtx->waitForConditionOrInterrupt( - parentSri->availableCondVar, - ul, - [&ul, &opCtx, &parentSri, &childSri, forKill = killToken.has_value()]() { - ObservableSession oParentSession(ul, parentSri->session); - ObservableSession oChildSession(ul, childSri->session); - auto isParentSessionAvailable = oParentSession._isAvailableForCheckOut(forKill); - auto isChildSessionAvailable = oChildSession._isAvailableForCheckOut(forKill); - if (isParentSessionAvailable) { - invariant(isChildSessionAvailable || oChildSession._killed()); - } - return isParentSessionAvailable && isChildSessionAvailable; - }); - - parentSri->session._childSessionCheckoutOpCtx = opCtx; - parentSri->session._lastCheckout = Date_t::now(); - - childSri->session._checkoutOpCtx = opCtx; - childSri->session._lastCheckout = Date_t::now(); - - return ScopedCheckedOutSession( - *this, std::move(childSri), std::move(parentSri), std::move(killToken)); -} - -SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionWithoutParentSession( - OperationContext* opCtx, const LogicalSessionId& lsid, boost::optional<KillToken> killToken) { - if (killToken) { - invariant(killToken->lsidToKill == lsid); - invariant(!killToken->parentLsidToKill); - } else { - invariant(opCtx->getLogicalSessionId() == lsid); - } - - stdx::unique_lock<Latch> ul(_mutex); - - auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid, nullptr); - if (killToken) { - invariant(ObservableSession(ul, sri->session)._killed()); + invariant(ObservableSession(ul, sri, session)._killed()); } // Wait until the session is no longer checked out and until the previously scheduled kill has // completed. - ++sri->numWaitingToCheckOut; - ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; }); + ++session->_numWaitingToCheckOut; + ON_BLOCK_EXIT([&] { --session->_numWaitingToCheckOut; }); opCtx->waitForConditionOrInterrupt( - sri->availableCondVar, ul, [&ul, &sri, forKill = killToken.has_value()]() { - ObservableSession osession(ul, sri->session); + sri->availableCondVar, ul, [&ul, &sri, &session, forKill = killToken.has_value()]() { + ObservableSession osession(ul, sri, session); return osession._isAvailableForCheckOut(forKill); }); - sri->session._checkoutOpCtx = opCtx; - sri->session._lastCheckout = Date_t::now(); + sri->checkoutOpCtx = opCtx; + sri->lastCheckout = Date_t::now(); - return ScopedCheckedOutSession(*this, std::move(sri), nullptr, std::move(killToken)); + return ScopedCheckedOutSession(*this, std::move(sri), session, std::move(killToken)); } SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(OperationContext* opCtx) { @@ -174,10 +118,7 @@ SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(Operati invariant(!opCtx->lockState()->isLocked()); auto lsid = *opCtx->getLogicalSessionId(); - if (getParentSessionId(lsid)) { - return _checkOutSessionWithParentSession(opCtx, lsid, boost::none /* killToken */); - } - return _checkOutSessionWithoutParentSession(opCtx, lsid, boost::none /* killToken */); + return _checkOutSessionInner(opCtx, lsid, boost::none /* killToken */); } SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationContext* opCtx, @@ -188,58 +129,101 @@ SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationCo invariant(!opCtx->getTxnNumber()); auto lsid = killToken.lsidToKill; - if (getParentSessionId(lsid)) { - return SessionToKill(_checkOutSessionWithParentSession(opCtx, lsid, std::move(killToken))); - } - return SessionToKill(_checkOutSessionWithoutParentSession(opCtx, lsid, std::move(killToken))); + return SessionToKill(_checkOutSessionInner(opCtx, lsid, std::move(killToken))); } void SessionCatalog::scanSession(const LogicalSessionId& lsid, const ScanSessionsCallbackFn& workerFn) { - std::unique_ptr<SessionRuntimeInfo> sessionToReap; + stdx::lock_guard<Latch> lg(_mutex); - { - stdx::lock_guard<Latch> lg(_mutex); - auto it = _sessions.find(lsid); - if (it != _sessions.end()) { - auto& sri = it->second; - ObservableSession osession(lg, sri->session); + if (auto sri = _getSessionRuntimeInfo(lg, lsid)) { + auto session = sri->getSession(lsid); + invariant(session); + + ObservableSession osession(lg, sri, session); + workerFn(osession); + invariant(!osession._markedForReap, "Cannot reap a session via 'scanSession'"); + } +} + +void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, + const ScanSessionsCallbackFn& workerFn) { + stdx::lock_guard<Latch> lg(_mutex); + + LOGV2_DEBUG(21976, + 2, + "Scanning {sessionCount} sessions", + "Scanning sessions", + "sessionCount"_attr = _sessions.size()); + + for (auto& [parentLsid, sri] : _sessions) { + if (matcher.match(parentLsid)) { + ObservableSession osession(lg, sri.get(), &sri->parentSession); workerFn(osession); + invariant(!osession._markedForReap, "Cannot reap a session via 'scanSessions'"); + } - if (osession._shouldBeReaped(sri->numWaitingToCheckOut)) { - sessionToReap = std::move(sri); - _sessions.erase(it); + for (auto& [childLsid, session] : sri->childSessions) { + if (matcher.match(childLsid)) { + ObservableSession osession(lg, sri.get(), &session); + workerFn(osession); + invariant(!osession._markedForReap, "Cannot reap a session via 'scanSessions'"); } } } } -void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, - const ScanSessionsCallbackFn& workerFn) { - std::vector<std::unique_ptr<SessionRuntimeInfo>> sessionsToReap; +LogicalSessionIdSet SessionCatalog::scanSessionsForReap( + const LogicalSessionId& parentLsid, + const ScanSessionsCallbackFn& parentSessionWorkerFn, + const ScanSessionsCallbackFn& childSessionWorkerFn) { + invariant(!getParentSessionId(parentLsid)); + std::unique_ptr<SessionRuntimeInfo> sriToReap; { stdx::lock_guard<Latch> lg(_mutex); - LOGV2_DEBUG(21976, - 2, - "Scanning {sessionCount} sessions", - "Scanning sessions", - "sessionCount"_attr = _sessions.size()); + auto sriIt = _sessions.find(parentLsid); + // The reaper should never try to reap a non-existent session id. + invariant(sriIt != _sessions.end()); + auto sri = sriIt->second.get(); - for (auto it = _sessions.begin(); it != _sessions.end(); ++it) { - if (matcher.match(it->first)) { - auto& sri = it->second; - ObservableSession osession(lg, sri->session); + LogicalSessionIdSet remainingSessions; + bool shouldReapRemaining = true; - workerFn(osession); + { + ObservableSession osession(lg, sri, &sri->parentSession); + parentSessionWorkerFn(osession); - if (osession._shouldBeReaped(sri->numWaitingToCheckOut)) { - sessionsToReap.emplace_back(std::move(sri)); - _sessions.erase(it++); + remainingSessions.insert(osession.getSessionId()); + shouldReapRemaining = osession._shouldBeReaped(); + } + + { + auto childSessionIt = sri->childSessions.begin(); + while (childSessionIt != sri->childSessions.end()) { + ObservableSession osession(lg, sri, &childSessionIt->second); + childSessionWorkerFn(osession); + + if (osession._shouldBeReaped() && + (osession._reapMode == ObservableSession::ReapMode::kExclusive)) { + sri->childSessions.erase(childSessionIt++); + continue; } + + remainingSessions.insert(osession.getSessionId()); + shouldReapRemaining &= osession._shouldBeReaped(); + ++childSessionIt; } } + + if (shouldReapRemaining) { + sriToReap = std::move(sriIt->second); + _sessions.erase(sriIt); + remainingSessions.clear(); + } + + return remainingSessions; } } @@ -248,7 +232,7 @@ SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& ls auto sri = _getSessionRuntimeInfo(lg, lsid); uassert(ErrorCodes::NoSuchSession, "Session not found", sri); - return ObservableSession(lg, sri->session).kill(); + return ObservableSession(lg, sri, &sri->parentSession).kill(); } size_t SessionCatalog::size() const { @@ -258,133 +242,122 @@ size_t SessionCatalog::size() const { void SessionCatalog::createSessionIfDoesNotExist(const LogicalSessionId& lsid) { stdx::lock_guard<Latch> lg(_mutex); - auto parentSri = [&]() -> SessionRuntimeInfo* { - if (auto parentLsid = getParentSessionId(lsid)) { - return _getOrCreateSessionRuntimeInfo(lg, *parentLsid, nullptr); - } - return nullptr; - }(); - _getOrCreateSessionRuntimeInfo(lg, lsid, parentSri); + _getOrCreateSessionRuntimeInfo(lg, lsid); } SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo( WithLock, const LogicalSessionId& lsid) { - auto it = _sessions.find(lsid); - if (it == _sessions.end()) { + auto parentLsid = castToParentSessionId(lsid); + auto sriIt = _sessions.find(parentLsid); + + if (sriIt == _sessions.end()) { return nullptr; } - return it->second.get(); + + auto sri = sriIt->second.get(); + auto session = sri->getSession(lsid); + + if (session) { + return sri; + } + + return nullptr; } SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeInfo( - WithLock lk, const LogicalSessionId& lsid, SessionRuntimeInfo* parentSri) { + WithLock lk, const LogicalSessionId& lsid) { if (auto sri = _getSessionRuntimeInfo(lk, lsid)) { return sri; } - auto it = _sessions.emplace(lsid, std::make_unique<SessionRuntimeInfo>(lsid, parentSri)).first; - return it->second.get(); + auto parentLsid = castToParentSessionId(lsid); + auto sriIt = + _sessions.emplace(parentLsid, std::make_unique<SessionRuntimeInfo>(parentLsid)).first; + auto sri = sriIt->second.get(); + + if (getParentSessionId(lsid)) { + auto [childSessionIt, inserted] = sri->childSessions.try_emplace(lsid, lsid); + // Insert should always succeed since the session did not exist prior to this. + invariant(inserted); + + auto& childSession = childSessionIt->second; + childSession._parentSession = &sri->parentSession; + } + + return sri; } void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, - SessionRuntimeInfo* parentSri, + Session* session, boost::optional<KillToken> killToken) { stdx::lock_guard<Latch> lg(_mutex); // Make sure we have exactly the same session on the map and that it is still associated with an // operation context (meaning checked-out) - invariant(_sessions[sri->session.getSessionId()].get() == sri); - invariant(sri->session._checkoutOpCtx); + invariant(_sessions[sri->parentSession.getSessionId()].get() == sri); + invariant(sri->checkoutOpCtx); if (killToken) { - invariant(killToken->lsidToKill == sri->session.getSessionId()); - - if (parentSri) { - invariant(killToken->parentLsidToKill == parentSri->session.getSessionId()); - } else { - invariant(!killToken->parentLsidToKill); - } + invariant(killToken->lsidToKill == session->getSessionId()); } - auto parentLsid = getParentSessionId(sri->session.getSessionId()); - if (parentSri) { - invariant(parentLsid); - invariant(parentSri->session._childSessionCheckoutOpCtx == sri->session._checkoutOpCtx); - parentSri->session._childSessionCheckoutOpCtx = nullptr; - parentSri->availableCondVar.notify_all(); - } else { - invariant(!parentLsid); - } - sri->session._checkoutOpCtx = nullptr; + sri->checkoutOpCtx = nullptr; sri->availableCondVar.notify_all(); if (killToken) { - invariant(sri->session._killsRequested > 0); - --sri->session._killsRequested; - - if (parentSri) { - invariant(killToken->parentLsidToKill); - invariant(parentSri->session._killsRequested > 0); - --parentSri->session._killsRequested; - } + invariant(sri->killsRequested > 0); + --sri->killsRequested; } } -SessionCatalog::SessionRuntimeInfo::~SessionRuntimeInfo() { - invariant(!numWaitingToCheckOut); +Session* SessionCatalog::SessionRuntimeInfo::getSession(const LogicalSessionId& lsid) { + if (lsid == parentSession._sessionId) { + return &parentSession; + } + + invariant(getParentSessionId(lsid) == parentSession._sessionId); + auto it = childSessions.find(lsid); + if (it == childSessions.end()) { + return nullptr; + } + return &it->second; } SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) const { - const bool firstKiller = (0 == _session->_killsRequested); - ++_session->_killsRequested; + const bool firstKiller = (0 == _sri->killsRequested); + ++_sri->killsRequested; // For currently checked-out sessions, interrupt the operation context so that the current owner // can release the session if (firstKiller && hasCurrentOperation()) { - if (_session->_checkoutOpCtx) { - invariant(_clientLock.owns_lock()); - invariant(!_session->_childSessionCheckoutOpCtx); - const auto serviceContext = _session->_checkoutOpCtx->getServiceContext(); - serviceContext->killOperation(_clientLock, _session->_checkoutOpCtx, reason); - } else if (_session->_childSessionCheckoutOpCtx) { - // Both parent and child sessions can't be checked out at the same time, so _clientLock - // should be empty, and we'll never take the child operation context's client lock while - // already holding the parent's. - invariant(!_clientLock.owns_lock()); - stdx::unique_lock<Client> childOpClientLock{ - *_session->_childSessionCheckoutOpCtx->getClient()}; - const auto serviceContext = _session->_childSessionCheckoutOpCtx->getServiceContext(); - serviceContext->killOperation( - childOpClientLock, _session->_childSessionCheckoutOpCtx, reason); - } - } - - auto parentSession = _session->_parentSession; - if (parentSession) { - const bool firstParentKiller = (0 == parentSession->_killsRequested); - ++parentSession->_killsRequested; - - if (firstParentKiller && parentSession->_checkoutOpCtx) { - // Both parent and child sessions can't be checked out at the same time, so _clientLock - // should be empty, and we'll never take the parent operation context's client lock - // while already holding the child's. - invariant(!_clientLock.owns_lock()); - stdx::unique_lock<Client> clientLock{*parentSession->_checkoutOpCtx->getClient()}; - const auto serviceContext = parentSession->_checkoutOpCtx->getServiceContext(); - serviceContext->killOperation(clientLock, parentSession->_checkoutOpCtx, reason); - } + invariant(_clientLock.owns_lock()); + const auto serviceContext = _sri->checkoutOpCtx->getServiceContext(); + serviceContext->killOperation(_clientLock, _sri->checkoutOpCtx, reason); } - return SessionCatalog::KillToken( - getSessionId(), - parentSession ? boost::make_optional(parentSession->getSessionId()) : boost::none); + return SessionCatalog::KillToken(getSessionId()); } -void ObservableSession::markForReap() { +void ObservableSession::markForReap(ReapMode reapMode) { + if (!getParentSessionId(getSessionId())) { + // By design, parent sessions are only safe to be reaped if all of their child sessions are. + invariant(reapMode == ReapMode::kNonExclusive); + } _markedForReap = true; + _reapMode.emplace(reapMode); +} + +bool ObservableSession::_shouldBeReaped() const { + bool isCheckedOut = [&] { + if (_sri->checkoutOpCtx) { + return _sri->checkoutOpCtx->getLogicalSessionId() == getSessionId(); + } + return false; + }(); + return _markedForReap && !isCheckedOut && !get()->_numWaitingToCheckOut && !_killed(); } bool ObservableSession::_killed() const { - return _session->_killsRequested > 0; + return _sri->killsRequested > 0; } OperationContextSession::OperationContextSession(OperationContext* opCtx) : _opCtx(opCtx) { |