diff options
author | Wenbin Zhu <wenbin.zhu@mongodb.com> | 2023-05-03 20:19:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-03 22:04:15 +0000 |
commit | 9cf95efcbd37df83bcd9af4593e79c51ea4062ea (patch) | |
tree | 4b5dca80f9ca6c1f05444584eb352ef53eb7d57c | |
parent | 1470c88daaa805408f083a1c2f7bb22f1632b5d1 (diff) | |
download | mongo-9cf95efcbd37df83bcd9af4593e79c51ea4062ea.tar.gz |
SERVER-76741 Split prepared transactions should be aborted after reserving abort oplog slot.
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.cpp | 67 |
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.h | 6 |
2 files changed, 33 insertions, 40 deletions
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index 39894d4a12e..65c611509f5 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -1933,28 +1933,25 @@ void TransactionParticipant::Participant::commitPreparedTransaction( invariant(!o().lastWriteOpTime.isNull()); auto commitOplogSlotOpTime = commitOplogEntryOpTime.value_or(commitOplogSlot); + + // If we are a primary committing a transaction that was split into smaller prepared + // transactions, cascade the commit. auto* splitPrepareManager = repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); if (opCtx->writesAreReplicated() && splitPrepareManager->isSessionSplit( _sessionId(), o().activeTxnNumberAndRetryCounter.getTxnNumber())) { - // If we are a primary committing a transaction that was split into smaller prepared - // transactions, use a special commit code path. - _commitSplitPreparedTxnOnPrimary(opCtx, - splitPrepareManager, - _sessionId(), - o().activeTxnNumberAndRetryCounter.getTxnNumber(), - commitTimestamp, - commitOplogSlot.getTimestamp()); - } else { - // If commitOplogEntryOpTime is a nullopt, then we grab the OpTime from the - // commitOplogSlot which will only be set if we are primary. Otherwise, the - // commitOplogEntryOpTime must have been passed in during secondary oplog application. - opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); - opCtx->recoveryUnit()->setDurableTimestamp(commitOplogSlotOpTime.getTimestamp()); - _commitStorageTransaction(opCtx); + _commitSplitPreparedTxnOnPrimary( + opCtx, splitPrepareManager, commitTimestamp, commitOplogSlot.getTimestamp()); } + // If commitOplogEntryOpTime is a nullopt, then we grab the OpTime from commitOplogSlot + // which will only be set if we are primary. Otherwise, the commitOplogEntryOpTime must + // have been passed in during secondary oplog application. 
+ opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); + opCtx->recoveryUnit()->setDurableTimestamp(commitOplogSlotOpTime.getTimestamp()); + _commitStorageTransaction(opCtx); + auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); @@ -2002,11 +1999,12 @@ void TransactionParticipant::Participant::_commitStorageTransaction(OperationCon void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary( OperationContext* userOpCtx, repl::SplitPrepareSessionManager* splitPrepareManager, - const LogicalSessionId& userSessionId, - const TxnNumber& userTxnNumber, const Timestamp& commitTimestamp, const Timestamp& durableTimestamp) { + const auto& userSessionId = _sessionId(); + const auto& userTxnNumber = o().activeTxnNumberAndRetryCounter.getTxnNumber(); + for (const auto& sessInfos : splitPrepareManager->getSplitSessions(userSessionId, userTxnNumber).get()) { @@ -2063,9 +2061,6 @@ void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary( } splitPrepareManager->releaseSplitSessions(userSessionId, userTxnNumber); - userOpCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); - userOpCtx->recoveryUnit()->setDurableTimestamp(durableTimestamp); - this->_commitStorageTransaction(userOpCtx); } void TransactionParticipant::Participant::_finishCommitTransaction( @@ -2167,17 +2162,11 @@ void TransactionParticipant::Participant::_abortActiveTransaction( OperationContext* opCtx, TransactionState::StateSet expectedStates) { invariant(!o().txnResourceStash); - // If this is a split-prepared transaction, cascade the abort. 
auto* splitPrepareManager = repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); - if (opCtx->writesAreReplicated() && + bool isSplitPreparedTxn = opCtx->writesAreReplicated() && splitPrepareManager->isSessionSplit(_sessionId(), - o().activeTxnNumberAndRetryCounter.getTxnNumber())) { - _abortSplitPreparedTxnOnPrimary(opCtx, - splitPrepareManager, - _sessionId(), - o().activeTxnNumberAndRetryCounter.getTxnNumber()); - } + o().activeTxnNumberAndRetryCounter.getTxnNumber()); if (!o().txnState.isInRetryableWriteMode()) { stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -2195,6 +2184,12 @@ void TransactionParticipant::Participant::_abortActiveTransaction( // abort oplog entry. OplogSlotReserver oplogSlotReserver(opCtx); + // If we are a primary aborting a transaction that was split into smaller prepared + // transactions, cascade the abort. + if (isSplitPreparedTxn) { + _abortSplitPreparedTxnOnPrimary(opCtx, splitPrepareManager); + } + // Clean up the transaction resources on the opCtx even if the transaction resources on the // session were not aborted. This actually aborts the storage-transaction. _cleanUpTxnResourceOnOpCtx(opCtx, TerminationCause::kAborted); @@ -2231,6 +2226,7 @@ void TransactionParticipant::Participant::_abortActiveTransaction( // is not cleaning up some internal TransactionParticipant state, updating metrics, or // logging the end of the transaction. That will either be cleaned up in the // ServiceEntryPoint's abortGuard or when the next transaction begins. 
+ invariant(!isSplitPreparedTxn); _cleanUpTxnResourceOnOpCtx(opCtx, TerminationCause::kAborted); opObserver->onTransactionAbort(opCtx, boost::none); _finishAbortingActiveTransaction(opCtx, expectedStates); @@ -2238,16 +2234,17 @@ void TransactionParticipant::Participant::_abortActiveTransaction( } void TransactionParticipant::Participant::_abortSplitPreparedTxnOnPrimary( - OperationContext* opCtx, - repl::SplitPrepareSessionManager* splitPrepareManager, - const LogicalSessionId& sessionId, - const TxnNumber& txnNumber) { + OperationContext* userOpCtx, repl::SplitPrepareSessionManager* splitPrepareManager) { + + const auto& userSessionId = _sessionId(); + const auto& userTxnNumber = o().activeTxnNumberAndRetryCounter.getTxnNumber(); + // If there are split prepared sessions, it must be because this transaction was prepared // via an oplog entry applied as a secondary. for (const repl::SplitSessionInfo& sessionInfo : - splitPrepareManager->getSplitSessions(sessionId, txnNumber).get()) { + splitPrepareManager->getSplitSessions(userSessionId, userTxnNumber).get()) { - auto splitClientOwned = opCtx->getServiceContext()->makeClient("tempSplitClient"); + auto splitClientOwned = userOpCtx->getServiceContext()->makeClient("tempSplitClient"); auto splitOpCtx = splitClientOwned->makeOperationContext(); // TODO(SERVER-74656): Please revisit if this thread could be made killable. 
@@ -2277,7 +2274,7 @@ void TransactionParticipant::Participant::_abortSplitPreparedTxnOnPrimary( checkedOutSession->checkIn(splitOpCtx.get(), OperationContextSession::CheckInReason::kDone); } - splitPrepareManager->releaseSplitSessions(_sessionId(), + splitPrepareManager->releaseSplitSessions(userSessionId, o().activeTxnNumberAndRetryCounter.getTxnNumber()); } diff --git a/src/mongo/db/transaction/transaction_participant.h b/src/mongo/db/transaction/transaction_participant.h index 533bdc4826c..cfa24852c41 100644 --- a/src/mongo/db/transaction/transaction_participant.h +++ b/src/mongo/db/transaction/transaction_participant.h @@ -858,8 +858,6 @@ public: // `durableTimestamp`. void _commitSplitPreparedTxnOnPrimary(OperationContext* opCtx, repl::SplitPrepareSessionManager* splitPrepareManager, - const LogicalSessionId& sessionId, - const TxnNumber& txnNumber, const Timestamp& commitTimestamp, const Timestamp& durableTimestamp); @@ -876,9 +874,7 @@ public: // split the storage writes into multiple RecoveryUnits. This method will be invoked by a // primary such that it looks for all recovery units and aborts them. void _abortSplitPreparedTxnOnPrimary(OperationContext* opCtx, - repl::SplitPrepareSessionManager* splitPrepareManager, - const LogicalSessionId& sessionId, - const TxnNumber& txnNumber); + repl::SplitPrepareSessionManager* splitPrepareManager); // Factors out code for clarity from _abortActiveTransaction. void _finishAbortingActiveTransaction(OperationContext* opCtx, |