From 8e96ecbf56a0763f4300aa38746ac86a8335d264 Mon Sep 17 00:00:00 2001 From: Jack Mulrow Date: Tue, 31 May 2022 18:51:03 +0000 Subject: SERVER-66852 Eagerly erase retryable child sessions from SessionCatalog --- src/mongo/db/logical_session_id_helpers.cpp | 5 + src/mongo/db/logical_session_id_helpers.h | 6 + src/mongo/db/session_catalog.cpp | 68 +++- src/mongo/db/session_catalog.h | 30 +- src/mongo/db/transaction_participant.cpp | 8 + src/mongo/db/transaction_participant_test.cpp | 444 ++++++++++++++++++++++++++ src/mongo/s/transaction_router.cpp | 55 ++-- src/mongo/s/transaction_router_test.cpp | 87 +++++ 8 files changed, 667 insertions(+), 36 deletions(-) diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp index 035b54b061d..c780813e68a 100644 --- a/src/mongo/db/logical_session_id_helpers.cpp +++ b/src/mongo/db/logical_session_id_helpers.cpp @@ -84,6 +84,11 @@ SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db) { return SHA256Block::computeHash({ConstDataRange(fn.c_str(), fn.size())}); } +bool isParentSessionId(const LogicalSessionId& sessionId) { + // All child sessions must have a txnUUID. + return !sessionId.getTxnUUID(); +} + boost::optional getParentSessionId(const LogicalSessionId& sessionId) { if (sessionId.getTxnUUID()) { return LogicalSessionId{sessionId.getId(), sessionId.getUid()}; diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h index 631ca0d979a..3d692c701e5 100644 --- a/src/mongo/db/logical_session_id_helpers.h +++ b/src/mongo/db/logical_session_id_helpers.h @@ -49,6 +49,12 @@ SHA256Block getLogicalSessionUserDigestForLoggedInUser(const OperationContext* o */ SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db); +/** + * Returns if the given session is a parent session, ie only has fields that could have come from an + * external client. + */ +bool isParentSessionId(const LogicalSessionId& sessionId); + /** * Returns the parent session id for the given session id if there is one. */ diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index a0c97676370..7760427aa9a 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -89,7 +89,7 @@ SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionInner( stdx::unique_lock ul(_mutex); auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid); - auto session = sri->getSession(lsid); + auto session = sri->getSession(ul, lsid); invariant(session); if (killToken) { @@ -148,7 +148,7 @@ void SessionCatalog::scanSession(const LogicalSessionId& lsid, stdx::lock_guard lg(_mutex); if (auto sri = _getSessionRuntimeInfo(lg, lsid)) { - auto session = sri->getSession(lsid); + auto session = sri->getSession(lg, lsid); invariant(session); ObservableSession osession(lg, sri, session); @@ -259,7 +259,7 @@ SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& ls auto sri = _getSessionRuntimeInfo(lg, lsid); uassert(ErrorCodes::NoSuchSession, "Session not found", sri); - auto session = sri->getSession(lsid); + auto session = sri->getSession(lg, lsid); uassert(ErrorCodes::NoSuchSession, "Session not found", session); return ObservableSession(lg, sri, session).kill(); } @@ -275,7 +275,7 @@ void SessionCatalog::createSessionIfDoesNotExist(const LogicalSessionId& lsid) { } SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo( - WithLock, const LogicalSessionId& lsid) { + WithLock wl, const LogicalSessionId& lsid) { auto parentLsid = castToParentSessionId(lsid); auto sriIt = _sessions.find(parentLsid); @@ -284,7 +284,7 @@ SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo( } auto sri = sriIt->second.get(); - auto session = sri->getSession(lsid); + auto session = sri->getSession(wl, lsid); if (session) { return sri; @@ -322,7 +322,8 @@ SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeIn void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, Session* session, - boost::optional killToken) { + boost::optional killToken, + boost::optional clientTxnNumberStarted) { stdx::lock_guard lg(_mutex); // Make sure we have exactly the same session on the map and that it is still associated with an @@ -341,9 +342,30 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, invariant(sri->killsRequested > 0); --sri->killsRequested; } + + if (clientTxnNumberStarted.has_value()) { + // Since the given txnNumber successfully started, we know any child sessions with older + // txnNumbers can be discarded. This needed to wait until a transaction started because that + // can fail, e.g. if the active transaction is prepared. + auto numReaped = stdx::erase_if(sri->childSessions, [&](auto&& it) { + ObservableSession osession(lg, sri, &it.second); + if (it.first.getTxnNumber() && *it.first.getTxnNumber() < *clientTxnNumberStarted) { + osession.markForReap(ObservableSession::ReapMode::kExclusive); + } + return osession._shouldBeReaped(); + }); + + LOGV2_DEBUG(6685200, + 4, + "Erased child sessions", + "releasedLsid"_attr = session->getSessionId(), + "clientTxnNumber"_attr = *clientTxnNumberStarted, + "childSessionsRemaining"_attr = sri->childSessions.size(), + "numReaped"_attr = numReaped); + } } -Session* SessionCatalog::SessionRuntimeInfo::getSession(const LogicalSessionId& lsid) { +Session* SessionCatalog::SessionRuntimeInfo::getSession(WithLock, const LogicalSessionId& lsid) { if (lsid == parentSession._sessionId) { return &parentSession; } @@ -488,4 +510,36 @@ void OperationContextSession::checkOut(OperationContext* opCtx) { checkedOutSession.emplace(std::move(scopedCheckedOutSession)); } +void OperationContextSession::observeNewTxnNumberStarted(OperationContext* opCtx, + const LogicalSessionId& lsid, + TxnNumber txnNumber) { + auto& checkedOutSession = operationSessionDecoration(opCtx); + invariant(checkedOutSession); + + LOGV2_DEBUG(6685201, + 4, + "Observing new retryable write number started on session", + "lsid"_attr = lsid, + "txnNumber"_attr = txnNumber); + + const auto& checkedOutLsid = (*checkedOutSession)->getSessionId(); + if (isParentSessionId(lsid)) { + // Observing a new transaction/retryable write on a parent session. + + // The operation must have checked out the parent session itself or a child session of the + // parent. This is safe because both share the same SessionRuntimeInfo. + dassert(lsid == checkedOutLsid || lsid == *getParentSessionId(checkedOutLsid)); + + checkedOutSession->observeNewClientTxnNumberStarted(txnNumber); + } else if (isInternalSessionForRetryableWrite(lsid)) { + // Observing a new internal transaction on a retryable session. + + // A transaction on a child session is always begun on an operation that checked it out + // directly. + dassert(lsid == checkedOutLsid); + + checkedOutSession->observeNewClientTxnNumberStarted(*lsid.getTxnNumber()); + } +} + } // namespace mongo diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index 937889c82cc..8186e2fcc60 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -152,7 +152,7 @@ private: invariant(!getParentSessionId(lsid)); } - Session* getSession(const LogicalSessionId& lsid); + Session* getSession(WithLock, const LogicalSessionId& lsid); // Must only be accessed by the OperationContext which currently has this logical session // checked out. @@ -207,11 +207,13 @@ private: SessionRuntimeInfo* _getOrCreateSessionRuntimeInfo(WithLock lk, const LogicalSessionId& lsid); /** - * Makes a session, previously checked out through 'checkoutSession', available again. + * Makes a session, previously checked out through 'checkoutSession', available again. Will free + * any retryable sessions with txnNumbers before clientTxnNumberStarted if it is set. */ void _releaseSession(SessionRuntimeInfo* sri, Session* session, - boost::optional killToken); + boost::optional killToken, + boost::optional clientTxnNumberStarted); // Protects the state below mutable Mutex _mutex = @@ -239,6 +241,7 @@ public: ScopedCheckedOutSession(ScopedCheckedOutSession&& other) : _catalog(other._catalog), + _clientTxnNumberStarted(other._clientTxnNumberStarted), _sri(other._sri), _session(other._session), _killToken(std::move(other._killToken)) { @@ -251,7 +254,8 @@ public: ~ScopedCheckedOutSession() { if (_sri) { - _catalog._releaseSession(_sri, _session, std::move(_killToken)); + _catalog._releaseSession( + _sri, _session, std::move(_killToken), _clientTxnNumberStarted); } } @@ -275,10 +279,20 @@ public: return bool(_killToken); } + void observeNewClientTxnNumberStarted(TxnNumber txnNumber) { + _clientTxnNumberStarted = txnNumber; + } + private: // The owning session catalog into which the session should be checked back SessionCatalog& _catalog; + // If this session began a retryable write or transaction while checked out, this is set to the + // "client txnNumber" of that transaction, which is the top-level txnNumber for a retryable + // write or transaction sent by a client or the txnNumber in the sessionId for a retryable + // child transaction. + boost::optional _clientTxnNumberStarted; + SessionCatalog::SessionRuntimeInfo* _sri; Session* _session; boost::optional _killToken; @@ -489,6 +503,14 @@ public: static void checkIn(OperationContext* opCtx, CheckInReason reason); static void checkOut(OperationContext* opCtx); + /** + * Notifies the session catalog when a new transaction/retryable write is begun on the operation + * context's checked out session. + */ + static void observeNewTxnNumberStarted(OperationContext* opCtx, + const LogicalSessionId& lsid, + TxnNumber txnNumber); + private: OperationContext* const _opCtx; }; diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 6944763d31b..0a8495b859c 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -2733,6 +2733,14 @@ void TransactionParticipant::Participant::_setNewTxnNumberAndRetryCounter( // Reset the transactional state _resetTransactionStateAndUnlock(&lk, TransactionState::kNone); + + invariant(!lk); + if (isParentSessionId(_sessionId())) { + // Only observe parent sessions because retryable transactions begin the same txnNumber on + // their parent session. + OperationContextSession::observeNewTxnNumberStarted( + opCtx, _sessionId(), txnNumberAndRetryCounter.getTxnNumber()); + } } void RetryableWriteTransactionParticipantCatalog::addParticipant( diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 37cca9a0099..736020b0bf1 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -47,6 +47,7 @@ #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/server_transactions_metrics.h" #include "mongo/db/service_context.h" +#include "mongo/db/session_catalog.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/stats/fill_locker_info.h" #include "mongo/db/transaction_participant.h" @@ -350,6 +351,33 @@ protected: return opCtxSession; } + void checkOutSessionFromDiferentOpCtx(const LogicalSessionId& lsid, + bool beginOrContinueTxn, + boost::optional txnNumber = boost::none, + boost::optional autocommit = boost::none, + boost::optional startTransaction = boost::none, + bool commitTxn = false) { + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(lsid); + if (txnNumber) { + opCtx->setTxnNumber(*txnNumber); + opCtx->setInMultiDocumentTransaction(); + } + + auto opCtxSession = std::make_unique(opCtx); + + if (beginOrContinueTxn) { + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue( + opCtx, {*opCtx->getTxnNumber()}, autocommit, startTransaction); + + if (commitTxn) { + txnParticipant.commitUnpreparedTransaction(opCtx); + } + } + }); + } + const LogicalSessionId _sessionId{makeLogicalSessionIdForTest()}; const TxnNumber _txnNumber{20}; const UUID _uuid = UUID::gen(); @@ -1662,6 +1690,30 @@ protected: TxnParticipantTest::tearDown(); } + void runRetryableWrite(LogicalSessionId lsid, TxnNumber txnNumber) { + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNumber); + auto opCtxSession = std::make_unique(opCtx); + TransactionParticipant::get(opCtx).beginOrContinue( + opCtx, {*opCtx->getTxnNumber()}, boost::none, boost::none); + }); + } + + void runAndCommitTransaction(LogicalSessionId lsid, TxnNumber txnNumber) { + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNumber); + opCtx->setInMultiDocumentTransaction(); + auto opCtxSession = std::make_unique(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue(opCtx, {*opCtx->getTxnNumber()}, false, true); + txnParticipant.unstashTransactionResources(opCtx, "find"); + txnParticipant.commitUnpreparedTransaction(opCtx); + }); + } + RAIIServerParameterControllerForTest _controller{"featureFlagInternalTransactions", true}; }; @@ -5432,5 +5484,397 @@ TEST_F(TxnParticipantTest, } } +bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) { + bool existsInCatalog{false}; + sessionCatalog->scanSession(lsid, + [&](const ObservableSession& session) { existsInCatalog = true; }); + return existsInCatalog; +} + +TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewClientTransaction) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with one retryable child. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 0; + runRetryableWrite(parentLsid, parentTxnNumber); + + auto retryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + + // Start a higher txnNumber client transaction and verify the child was erased. + + parentTxnNumber++; + runAndCommitTransaction(parentLsid, parentTxnNumber); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog)); +} + +TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewClientRetryableWrite) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with one retryable child. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 0; + runRetryableWrite(parentLsid, parentTxnNumber); + + auto retryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + + // Start a higher txnNumber retryable write and verify the child was erased. + + parentTxnNumber++; + runRetryableWrite(parentLsid, parentTxnNumber); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog)); +} + +TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewRetryableTransaction) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with one retryable child. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 0; + runRetryableWrite(parentLsid, parentTxnNumber); + + auto retryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + + // Start a higher txnNumber retryable transaction and verify the child was erased. + + parentTxnNumber++; + auto higherRetryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(higherRetryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog)); +} + +TEST_F( + ShardTxnParticipantTest, + EagerlyReapRetryableSessionsOnlyUponNewTransactionBegunAndIgnoresNonRetryableAndUnrelatedSessions) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with two retryable children and one non-retryable child. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 0; + runRetryableWrite(parentLsid, parentTxnNumber); + + auto retryableChildLsid1 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid1, 0); + + auto retryableChildLsid2 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid2, 0); + + auto nonRetryableChildLsid = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid); + runAndCommitTransaction(nonRetryableChildLsid, 0); + + // Add entries for unrelated sessions to verify they aren't affected. + + auto parentLsidOther = makeLogicalSessionIdForTest(); + runRetryableWrite(parentLsidOther, 0); + + auto retryableChildLsidOther = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsidOther, parentTxnNumber); + runAndCommitTransaction(retryableChildLsidOther, 0); + + auto nonRetryableChildLsidOther = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsidOther, parentTxnNumber); + runAndCommitTransaction(nonRetryableChildLsidOther, 0); + + ASSERT_EQ(sessionCatalog->size(), 2); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); + ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog)); + ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog)); + + // Check out with a higher txnNumber and verify we don't reap until a transaction has begun on + // it. + + parentTxnNumber++; + + // Does not call beginOrContinue. + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(parentLsid); + opCtx->setTxnNumber(parentTxnNumber); + auto opCtxSession = std::make_unique(opCtx); + }); + // Does not call beginOrContinue. + auto higherRetryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(higherRetryableChildLsid); + opCtx->setTxnNumber(0); + opCtx->setInMultiDocumentTransaction(); + auto opCtxSession = std::make_unique(opCtx); + }); + // beginOrContinue fails because no startTransaction=true. + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(parentLsid); + opCtx->setTxnNumber(parentTxnNumber); + opCtx->setInMultiDocumentTransaction(); + auto opCtxSession = std::make_unique(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + ASSERT_THROWS_CODE( + txnParticipant.beginOrContinue(opCtx, {*opCtx->getTxnNumber()}, false, boost::none), + AssertionException, + ErrorCodes::NoSuchTransaction); + }); + // Non-retryable child sessions shouldn't affect retryable sessions. + auto newNonRetryableChildLsid = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid); + runAndCommitTransaction(newNonRetryableChildLsid, 0); + + // No sessions should have been reaped. + ASSERT_EQ(sessionCatalog->size(), 2); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); + ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog)); + ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog)); + + // Call beginOrContinue for a higher txnNumber and verify we do erase old sessions for the + // active session. + + runRetryableWrite(parentLsid, parentTxnNumber); + + // The two retryable children for parentLsid should have been reaped. + ASSERT_EQ(sessionCatalog->size(), 2); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); + ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog)); + ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog)); +} + +TEST_F(ShardTxnParticipantTest, EagerlyReapLowerTxnNumbers) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with two retryable children. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 1; + runRetryableWrite(parentLsid, parentTxnNumber); + + auto retryableChildLsid1 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid1, 0); + + auto retryableChildLsid2 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid2, 0); + + auto lowerRetryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber - 1); + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(lowerRetryableChildLsid); + opCtx->setTxnNumber(0); + opCtx->setInMultiDocumentTransaction(); + auto opCtxSession = std::make_unique(opCtx); + }); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); + ASSERT(doesExistInCatalog(lowerRetryableChildLsid, sessionCatalog)); + + // Start a higher txnNumber retryable transaction and verify the child was erased. + + parentTxnNumber++; + runRetryableWrite(parentLsid, parentTxnNumber); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(lowerRetryableChildLsid, sessionCatalog)); +} + +TEST_F(ShardTxnParticipantTest, EagerlyReapSkipsHigherUnusedTxnNumbers) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with one retryable child. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 1; + runRetryableWrite(parentLsid, parentTxnNumber); + + auto retryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + + // Check out a higher txnNumber retryable transaction but do not start it and verify it does not + // reap and is not reaped. + + auto higherUnusedRetryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber + 10); + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(higherUnusedRetryableChildLsid); + opCtx->setTxnNumber(0); + opCtx->setInMultiDocumentTransaction(); + auto opCtxSession = std::make_unique(opCtx); + }); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(higherUnusedRetryableChildLsid, sessionCatalog)); + + parentTxnNumber++; // Still less than in higherUnusedRetryableChildLsid. + runRetryableWrite(parentLsid, parentTxnNumber); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(higherUnusedRetryableChildLsid, sessionCatalog)); +} + +TEST_F(ShardTxnParticipantTest, EagerlyReapSkipsKilledSessions) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with two retryable children. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 1; + runRetryableWrite(parentLsid, parentTxnNumber); + + auto retryableChildLsid1 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid1, 0); + + auto retryableChildLsid2 = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid2, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); + + // Kill one retryable child and verify no sessions in its SRI can be reaped until it has been + // checked out by its killer. + + parentTxnNumber++; + boost::optional killToken; + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(parentLsid); + opCtx->setTxnNumber(parentTxnNumber); + auto opCtxSession = std::make_unique(opCtx); + + TransactionParticipant::get(opCtx).beginOrContinue( + opCtx, {*opCtx->getTxnNumber()}, boost::none, boost::none); + + // Kill after checking out the session because we can't check out the session again + // after a kill without checking out with the killToken first. + killToken = sessionCatalog->killSession(retryableChildLsid1); + }); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); + + // Check out for kill the killed retryable session and now both retryable sessions can be + // reaped. + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + sessionCatalog->checkOutSessionForKill(opCtx, std::move(*killToken)); + }); + + // A new client txnNumber must be seen to trigger the reaping, so the sessions shouldn't have + // been reaped upon releasing the killed session. + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); + + parentTxnNumber++; + runRetryableWrite(parentLsid, parentTxnNumber); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog)); +} + +TEST_F(ShardTxnParticipantTest, CheckingOutEagerlyReapedSessionDoesNotCrash) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with one retryable child. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 0; + runRetryableWrite(parentLsid, parentTxnNumber); + + auto retryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runAndCommitTransaction(retryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + + // Start a higher txnNumber client transaction and verify the child was erased. + + parentTxnNumber++; + runAndCommitTransaction(parentLsid, parentTxnNumber); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + + // Check out the child again to verify this doesn't crash. + ASSERT_THROWS_CODE(runAndCommitTransaction(retryableChildLsid, 1), + AssertionException, + ErrorCodes::TransactionTooOld); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 955fb56861e..1368e3dc342 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -1436,34 +1436,39 @@ void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) con void TransactionRouter::Router::_resetRouterState( OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { - stdx::lock_guard lk(*opCtx->getClient()); + { + stdx::lock_guard lk(*opCtx->getClient()); - uassert(ErrorCodes::ConflictingOperationInProgress, - "Cannot start a new transaction while the previous is yielded", - o(lk).activeYields == 0); - - o(lk).txnNumberAndRetryCounter.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber()); - o(lk).txnNumberAndRetryCounter.setTxnRetryCounter( - *txnNumberAndRetryCounter.getTxnRetryCounter()); - o(lk).commitType = CommitType::kNotInitiated; - p().isRecoveringCommit = false; - o(lk).participants.clear(); - o(lk).coordinatorId.reset(); - p().recoveryShardId.reset(); - o(lk).apiParameters = {}; - o(lk).readConcernArgs = {}; - o(lk).atClusterTime.reset(); - o(lk).abortCause = std::string(); - o(lk).metricsTracker.emplace(opCtx->getServiceContext()); - p().terminationInitiated = false; + uassert(ErrorCodes::ConflictingOperationInProgress, + "Cannot start a new transaction while the previous is yielded", + o(lk).activeYields == 0); - auto tickSource = opCtx->getServiceContext()->getTickSource(); - o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks()); + o(lk).txnNumberAndRetryCounter.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber()); + o(lk).txnNumberAndRetryCounter.setTxnRetryCounter( + *txnNumberAndRetryCounter.getTxnRetryCounter()); + o(lk).commitType = CommitType::kNotInitiated; + p().isRecoveringCommit = false; + o(lk).participants.clear(); + o(lk).coordinatorId.reset(); + p().recoveryShardId.reset(); + o(lk).apiParameters = {}; + o(lk).readConcernArgs = {}; + o(lk).atClusterTime.reset(); + o(lk).abortCause = std::string(); + o(lk).metricsTracker.emplace(opCtx->getServiceContext()); + p().terminationInitiated = false; + + auto tickSource = opCtx->getServiceContext()->getTickSource(); + o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks()); + + // TODO SERVER-37115: Parse statement ids from the client and remember the statement id + // of the command that started the transaction, if one was included. + p().latestStmtId = kDefaultFirstStmtId; + p().firstStmtId = kDefaultFirstStmtId; + } - // TODO SERVER-37115: Parse statement ids from the client and remember the statement id - // of the command that started the transaction, if one was included. - p().latestStmtId = kDefaultFirstStmtId; - p().firstStmtId = kDefaultFirstStmtId; + OperationContextSession::observeNewTxnNumberStarted( + opCtx, _sessionId(), txnNumberAndRetryCounter.getTxnNumber()); }; void TransactionRouter::Router::_resetRouterStateForStartTransaction( diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index addefae213c..ef933e911c1 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -200,6 +200,26 @@ protected: }); } + void runFunctionFromDifferentOpCtx(std::function func) { + auto newClientOwned = getServiceContext()->makeClient("newClient"); + AlternativeClientRegion acr(newClientOwned); + auto newOpCtx = cc().makeOperationContext(); + func(newOpCtx.get()); + } + + void runTransactionLeaveOpen(LogicalSessionId lsid, TxnNumber txnNumber) { + runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) { + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNumber); + opCtx->setInMultiDocumentTransaction(); + auto opCtxSession = std::make_unique(opCtx); + + auto txnRouter = TransactionRouter::get(opCtx); + txnRouter.beginOrContinueTxn( + opCtx, *opCtx->getTxnNumber(), TransactionRouter::TransactionActions::kStart); + }); + } + private: // Enables the transaction router to retry within a transaction on stale version and snapshot // errors for the duration of each test. @@ -5302,5 +5322,72 @@ TEST_F(TransactionRouterMetricsTest, IsTrackingOverIfTxnImplicitlyAborted) { implicitAbortInProgress(); ASSERT(txnRouter().isTrackingOver()); } + +bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) { + bool existsInCatalog{false}; + sessionCatalog->scanSession(lsid, + [&](const ObservableSession& session) { existsInCatalog = true; }); + return existsInCatalog; +} + +TEST_F(TransactionRouterTest, EagerlyReapRetryableSessionsUponNewClientTransaction) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with one retryable child. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 0; + runTransactionLeaveOpen(parentLsid, parentTxnNumber); + + auto retryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runTransactionLeaveOpen(retryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + + // Start a higher txnNumber client transaction and verify the child was erased. + + parentTxnNumber++; + runTransactionLeaveOpen(parentLsid, parentTxnNumber); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog)); +} + +TEST_F(TransactionRouterTest, EagerlyReapRetryableSessionsUponNewRetryableTransaction) { + auto sessionCatalog = SessionCatalog::get(getServiceContext()); + ASSERT_EQ(sessionCatalog->size(), 0); + + // Add a parent session with one retryable child. + + auto parentLsid = makeLogicalSessionIdForTest(); + auto parentTxnNumber = 0; + runTransactionLeaveOpen(parentLsid, parentTxnNumber); + + auto retryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runTransactionLeaveOpen(retryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + + // Start a higher txnNumber retryable transaction and verify the child was erased. + + parentTxnNumber++; + auto higherRetryableChildLsid = + makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber); + runTransactionLeaveOpen(higherRetryableChildLsid, 0); + + ASSERT_EQ(sessionCatalog->size(), 1); + ASSERT(doesExistInCatalog(parentLsid, sessionCatalog)); + ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog)); + ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog)); +} + } // unnamed namespace } // namespace mongo -- cgit v1.2.1