summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenbin Zhu <wenbin.zhu@mongodb.com>2023-05-03 20:19:46 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-03 22:04:15 +0000
commit9cf95efcbd37df83bcd9af4593e79c51ea4062ea (patch)
tree4b5dca80f9ca6c1f05444584eb352ef53eb7d57c
parent1470c88daaa805408f083a1c2f7bb22f1632b5d1 (diff)
downloadmongo-9cf95efcbd37df83bcd9af4593e79c51ea4062ea.tar.gz
SERVER-76741 Split prepared transactions should be aborted after reserving abort oplog slot.
-rw-r--r--src/mongo/db/transaction/transaction_participant.cpp67
-rw-r--r--src/mongo/db/transaction/transaction_participant.h6
2 files changed, 33 insertions, 40 deletions
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp
index 39894d4a12e..65c611509f5 100644
--- a/src/mongo/db/transaction/transaction_participant.cpp
+++ b/src/mongo/db/transaction/transaction_participant.cpp
@@ -1933,28 +1933,25 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
invariant(!o().lastWriteOpTime.isNull());
auto commitOplogSlotOpTime = commitOplogEntryOpTime.value_or(commitOplogSlot);
+
+ // If we are a primary committing a transaction that was split into smaller prepared
+ // transactions, cascade the commit.
auto* splitPrepareManager =
repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
if (opCtx->writesAreReplicated() &&
splitPrepareManager->isSessionSplit(
_sessionId(), o().activeTxnNumberAndRetryCounter.getTxnNumber())) {
- // If we are a primary committing a transaction that was split into smaller prepared
- // transactions, use a special commit code path.
- _commitSplitPreparedTxnOnPrimary(opCtx,
- splitPrepareManager,
- _sessionId(),
- o().activeTxnNumberAndRetryCounter.getTxnNumber(),
- commitTimestamp,
- commitOplogSlot.getTimestamp());
- } else {
- // If commitOplogEntryOpTime is a nullopt, then we grab the OpTime from the
- // commitOplogSlot which will only be set if we are primary. Otherwise, the
- // commitOplogEntryOpTime must have been passed in during secondary oplog application.
- opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp);
- opCtx->recoveryUnit()->setDurableTimestamp(commitOplogSlotOpTime.getTimestamp());
- _commitStorageTransaction(opCtx);
+ _commitSplitPreparedTxnOnPrimary(
+ opCtx, splitPrepareManager, commitTimestamp, commitOplogSlot.getTimestamp());
}
+ // If commitOplogEntryOpTime is a nullopt, then we grab the OpTime from commitOplogSlot
+ // which will only be set if we are primary. Otherwise, the commitOplogEntryOpTime must
+ // have been passed in during secondary oplog application.
+ opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp);
+ opCtx->recoveryUnit()->setDurableTimestamp(commitOplogSlotOpTime.getTimestamp());
+ _commitStorageTransaction(opCtx);
+
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
@@ -2002,11 +1999,12 @@ void TransactionParticipant::Participant::_commitStorageTransaction(OperationCon
void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary(
OperationContext* userOpCtx,
repl::SplitPrepareSessionManager* splitPrepareManager,
- const LogicalSessionId& userSessionId,
- const TxnNumber& userTxnNumber,
const Timestamp& commitTimestamp,
const Timestamp& durableTimestamp) {
+ const auto& userSessionId = _sessionId();
+ const auto& userTxnNumber = o().activeTxnNumberAndRetryCounter.getTxnNumber();
+
for (const auto& sessInfos :
splitPrepareManager->getSplitSessions(userSessionId, userTxnNumber).get()) {
@@ -2063,9 +2061,6 @@ void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary(
}
splitPrepareManager->releaseSplitSessions(userSessionId, userTxnNumber);
- userOpCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp);
- userOpCtx->recoveryUnit()->setDurableTimestamp(durableTimestamp);
- this->_commitStorageTransaction(userOpCtx);
}
void TransactionParticipant::Participant::_finishCommitTransaction(
@@ -2167,17 +2162,11 @@ void TransactionParticipant::Participant::_abortActiveTransaction(
OperationContext* opCtx, TransactionState::StateSet expectedStates) {
invariant(!o().txnResourceStash);
- // If this is a split-prepared transaction, cascade the abort.
auto* splitPrepareManager =
repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
- if (opCtx->writesAreReplicated() &&
+ bool isSplitPreparedTxn = opCtx->writesAreReplicated() &&
splitPrepareManager->isSessionSplit(_sessionId(),
- o().activeTxnNumberAndRetryCounter.getTxnNumber())) {
- _abortSplitPreparedTxnOnPrimary(opCtx,
- splitPrepareManager,
- _sessionId(),
- o().activeTxnNumberAndRetryCounter.getTxnNumber());
- }
+ o().activeTxnNumberAndRetryCounter.getTxnNumber());
if (!o().txnState.isInRetryableWriteMode()) {
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -2195,6 +2184,12 @@ void TransactionParticipant::Participant::_abortActiveTransaction(
// abort oplog entry.
OplogSlotReserver oplogSlotReserver(opCtx);
+ // If we are a primary aborting a transaction that was split into smaller prepared
+ // transactions, cascade the abort.
+ if (isSplitPreparedTxn) {
+ _abortSplitPreparedTxnOnPrimary(opCtx, splitPrepareManager);
+ }
+
// Clean up the transaction resources on the opCtx even if the transaction resources on the
// session were not aborted. This actually aborts the storage-transaction.
_cleanUpTxnResourceOnOpCtx(opCtx, TerminationCause::kAborted);
@@ -2231,6 +2226,7 @@ void TransactionParticipant::Participant::_abortActiveTransaction(
// is not cleaning up some internal TransactionParticipant state, updating metrics, or
// logging the end of the transaction. That will either be cleaned up in the
// ServiceEntryPoint's abortGuard or when the next transaction begins.
+ invariant(!isSplitPreparedTxn);
_cleanUpTxnResourceOnOpCtx(opCtx, TerminationCause::kAborted);
opObserver->onTransactionAbort(opCtx, boost::none);
_finishAbortingActiveTransaction(opCtx, expectedStates);
@@ -2238,16 +2234,17 @@ void TransactionParticipant::Participant::_abortActiveTransaction(
}
void TransactionParticipant::Participant::_abortSplitPreparedTxnOnPrimary(
- OperationContext* opCtx,
- repl::SplitPrepareSessionManager* splitPrepareManager,
- const LogicalSessionId& sessionId,
- const TxnNumber& txnNumber) {
+ OperationContext* userOpCtx, repl::SplitPrepareSessionManager* splitPrepareManager) {
+
+ const auto& userSessionId = _sessionId();
+ const auto& userTxnNumber = o().activeTxnNumberAndRetryCounter.getTxnNumber();
+
// If there are split prepared sessions, it must be because this transaction was prepared
// via an oplog entry applied as a secondary.
for (const repl::SplitSessionInfo& sessionInfo :
- splitPrepareManager->getSplitSessions(sessionId, txnNumber).get()) {
+ splitPrepareManager->getSplitSessions(userSessionId, userTxnNumber).get()) {
- auto splitClientOwned = opCtx->getServiceContext()->makeClient("tempSplitClient");
+ auto splitClientOwned = userOpCtx->getServiceContext()->makeClient("tempSplitClient");
auto splitOpCtx = splitClientOwned->makeOperationContext();
// TODO(SERVER-74656): Please revisit if this thread could be made killable.
@@ -2277,7 +2274,7 @@ void TransactionParticipant::Participant::_abortSplitPreparedTxnOnPrimary(
checkedOutSession->checkIn(splitOpCtx.get(), OperationContextSession::CheckInReason::kDone);
}
- splitPrepareManager->releaseSplitSessions(_sessionId(),
+ splitPrepareManager->releaseSplitSessions(userSessionId,
o().activeTxnNumberAndRetryCounter.getTxnNumber());
}
diff --git a/src/mongo/db/transaction/transaction_participant.h b/src/mongo/db/transaction/transaction_participant.h
index 533bdc4826c..cfa24852c41 100644
--- a/src/mongo/db/transaction/transaction_participant.h
+++ b/src/mongo/db/transaction/transaction_participant.h
@@ -858,8 +858,6 @@ public:
// `durableTimestamp`.
void _commitSplitPreparedTxnOnPrimary(OperationContext* opCtx,
repl::SplitPrepareSessionManager* splitPrepareManager,
- const LogicalSessionId& sessionId,
- const TxnNumber& txnNumber,
const Timestamp& commitTimestamp,
const Timestamp& durableTimestamp);
@@ -876,9 +874,7 @@ public:
// split the storage writes into multiple RecoveryUnits. This method will be invoked by a
// primary such that it looks for all recovery units and aborts them.
void _abortSplitPreparedTxnOnPrimary(OperationContext* opCtx,
- repl::SplitPrepareSessionManager* splitPrepareManager,
- const LogicalSessionId& sessionId,
- const TxnNumber& txnNumber);
+ repl::SplitPrepareSessionManager* splitPrepareManager);
// Factors out code for clarity from _abortActiveTransaction.
void _finishAbortingActiveTransaction(OperationContext* opCtx,