diff options
Diffstat (limited to 'src/mongo/db/transaction_participant.cpp')
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 104 |
1 files changed, 47 insertions, 57 deletions
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 46d2a49a6c0..a258ed1833d 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -419,8 +419,7 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction(Oper TxnNumber txnNumber) { uassert(ErrorCodes::NoSuchTransaction, str::stream() - << "Given transaction number " - << txnNumber + << "Given transaction number " << txnNumber << " does not match any in-progress transactions. The active transaction number is " << o().activeTxnNumber, txnNumber == o().activeTxnNumber && !o().txnState.isInRetryableWriteMode()); @@ -442,8 +441,7 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction(Oper uasserted( ErrorCodes::NoSuchTransaction, str::stream() - << "Transaction " - << txnNumber + << "Transaction " << txnNumber << " has been aborted because an earlier command in this transaction failed."); } return; @@ -503,9 +501,7 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt uassert(ErrorCodes::TransactionTooOld, str::stream() << "Cannot start transaction " << txnNumber << " on session " - << _sessionId() - << " because a newer transaction " - << o().activeTxnNumber + << _sessionId() << " because a newer transaction " << o().activeTxnNumber << " has already started.", txnNumber >= o().activeTxnNumber); @@ -552,8 +548,7 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt TransactionState::kNone | TransactionState::kAbortedWithoutPrepare; uassert(50911, str::stream() << "Cannot start a transaction at given transaction number " - << txnNumber - << " a transaction with the same number is in state " + << txnNumber << " a transaction with the same number is in state " << o().txnState, o().txnState.isInSet(restartableStates)); } @@ -1087,8 +1082,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( uassert(ErrorCodes::OperationNotSupportedInTransaction, str::stream() << "prepareTransaction failed because one of the transaction " "operations was done against a temporary collection '" - << collection->ns() - << "'.", + << collection->ns() << "'.", !collection->isTemporary(opCtx)); } @@ -1394,8 +1388,7 @@ void TransactionParticipant::Participant::commitPreparedTransaction( str::stream() << "Commit oplog entry must be greater than or equal to commit " "timestamp due to causal consistency. commit timestamp: " << commitTimestamp.toBSON() - << ", commit oplog entry optime: " - << commitOplogSlot.toBSON()); + << ", commit oplog entry optime: " << commitOplogSlot.toBSON()); } else { // We always expect a non-null commitOplogEntryOpTime to be passed in on secondaries // in order to set the finishOpTime. @@ -1847,8 +1840,7 @@ void TransactionParticipant::TransactionState::transitionTo(StateFlag newState, if (shouldValidate == TransitionValidation::kValidateTransition) { invariant(TransactionState::_isLegalTransition(_state, newState), str::stream() << "Current state: " << toString(_state) - << ", Illegal attempted next state: " - << toString(newState)); + << ", Illegal attempted next state: " << toString(newState)); } // If we are transitioning out of prepare, signal waiters by fulfilling the completion promise. @@ -2186,9 +2178,7 @@ boost::optional<repl::OpTime> TransactionParticipant::Participant::_checkStateme if (it == p().activeTxnCommittedStatements.end()) { uassert(ErrorCodes::IncompleteTransactionHistory, str::stream() << "Incomplete history detected for transaction " - << o().activeTxnNumber - << " on session " - << _sessionId(), + << o().activeTxnNumber << " on session " << _sessionId(), !p().hasIncompleteHistory); return boost::none; @@ -2212,45 +2202,45 @@ void TransactionParticipant::Participant::_registerUpdateCacheOnCommit( OperationContext* opCtx, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) { - opCtx->recoveryUnit()->onCommit( - [ opCtx, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ]( - boost::optional<Timestamp>) { - TransactionParticipant::Participant participant(opCtx); - invariant(participant.p().isValid); - - RetryableWritesStats::get(opCtx->getServiceContext()) - ->incrementTransactionsCollectionWriteCount(); - - stdx::lock_guard<Client> lg(*opCtx->getClient()); - - // The cache of the last written record must always be advanced after a write so that - // subsequent writes have the correct point to start from. - participant.o(lg).lastWriteOpTime = lastStmtIdWriteOpTime; - - for (const auto stmtId : stmtIdsWritten) { - if (stmtId == kIncompleteHistoryStmtId) { - participant.p().hasIncompleteHistory = true; - continue; - } - - const auto insertRes = participant.p().activeTxnCommittedStatements.emplace( - stmtId, lastStmtIdWriteOpTime); - if (!insertRes.second) { - const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(participant._sessionId(), - participant.o().activeTxnNumber, - stmtId, - existingOpTime, - lastStmtIdWriteOpTime); - } + opCtx->recoveryUnit()->onCommit([opCtx, + stmtIdsWritten = std::move(stmtIdsWritten), + lastStmtIdWriteOpTime](boost::optional<Timestamp>) { + TransactionParticipant::Participant participant(opCtx); + invariant(participant.p().isValid); + + RetryableWritesStats::get(opCtx->getServiceContext()) + ->incrementTransactionsCollectionWriteCount(); + + stdx::lock_guard<Client> lg(*opCtx->getClient()); + + // The cache of the last written record must always be advanced after a write so that + // subsequent writes have the correct point to start from. + participant.o(lg).lastWriteOpTime = lastStmtIdWriteOpTime; + + for (const auto stmtId : stmtIdsWritten) { + if (stmtId == kIncompleteHistoryStmtId) { + participant.p().hasIncompleteHistory = true; + continue; } - // If this is the first time executing a retryable write, we should indicate that to - // the transaction participant. - if (participant.o(lg).txnState.isNone()) { - participant.o(lg).txnState.transitionTo(TransactionState::kExecutedRetryableWrite); + const auto insertRes = + participant.p().activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); + if (!insertRes.second) { + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution(participant._sessionId(), + participant.o().activeTxnNumber, + stmtId, + existingOpTime, + lastStmtIdWriteOpTime); } - }); + } + + // If this is the first time executing a retryable write, we should indicate that to + // the transaction participant. + if (participant.o(lg).txnState.isNone()) { + participant.o(lg).txnState.transitionTo(TransactionState::kExecutedRetryableWrite); + } + }); MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) { const auto& data = customArgs.getData(); @@ -2264,9 +2254,9 @@ void TransactionParticipant::Participant::_registerUpdateCacheOnCommit( if (!failBeforeCommitExceptionElem.eoo()) { const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number())); uasserted(failureCode, - str::stream() << "Failing write for " << _sessionId() << ":" - << o().activeTxnNumber - << " due to failpoint. The write must not be reflected."); + str::stream() + << "Failing write for " << _sessionId() << ":" << o().activeTxnNumber + << " due to failpoint. The write must not be reflected."); } } } |