summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_participant.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_participant.cpp')
-rw-r--r--src/mongo/db/transaction_participant.cpp104
1 files changed, 47 insertions, 57 deletions
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 46d2a49a6c0..a258ed1833d 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -419,8 +419,7 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction(Oper
TxnNumber txnNumber) {
uassert(ErrorCodes::NoSuchTransaction,
str::stream()
- << "Given transaction number "
- << txnNumber
+ << "Given transaction number " << txnNumber
<< " does not match any in-progress transactions. The active transaction number is "
<< o().activeTxnNumber,
txnNumber == o().activeTxnNumber && !o().txnState.isInRetryableWriteMode());
@@ -442,8 +441,7 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction(Oper
uasserted(
ErrorCodes::NoSuchTransaction,
str::stream()
- << "Transaction "
- << txnNumber
+ << "Transaction " << txnNumber
<< " has been aborted because an earlier command in this transaction failed.");
}
return;
@@ -503,9 +501,7 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt
uassert(ErrorCodes::TransactionTooOld,
str::stream() << "Cannot start transaction " << txnNumber << " on session "
- << _sessionId()
- << " because a newer transaction "
- << o().activeTxnNumber
+ << _sessionId() << " because a newer transaction " << o().activeTxnNumber
<< " has already started.",
txnNumber >= o().activeTxnNumber);
@@ -552,8 +548,7 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt
TransactionState::kNone | TransactionState::kAbortedWithoutPrepare;
uassert(50911,
str::stream() << "Cannot start a transaction at given transaction number "
- << txnNumber
- << " a transaction with the same number is in state "
+ << txnNumber << " a transaction with the same number is in state "
<< o().txnState,
o().txnState.isInSet(restartableStates));
}
@@ -1087,8 +1082,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
uassert(ErrorCodes::OperationNotSupportedInTransaction,
str::stream() << "prepareTransaction failed because one of the transaction "
"operations was done against a temporary collection '"
- << collection->ns()
- << "'.",
+ << collection->ns() << "'.",
!collection->isTemporary(opCtx));
}
@@ -1394,8 +1388,7 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
str::stream() << "Commit oplog entry must be greater than or equal to commit "
"timestamp due to causal consistency. commit timestamp: "
<< commitTimestamp.toBSON()
- << ", commit oplog entry optime: "
- << commitOplogSlot.toBSON());
+ << ", commit oplog entry optime: " << commitOplogSlot.toBSON());
} else {
// We always expect a non-null commitOplogEntryOpTime to be passed in on secondaries
// in order to set the finishOpTime.
@@ -1847,8 +1840,7 @@ void TransactionParticipant::TransactionState::transitionTo(StateFlag newState,
if (shouldValidate == TransitionValidation::kValidateTransition) {
invariant(TransactionState::_isLegalTransition(_state, newState),
str::stream() << "Current state: " << toString(_state)
- << ", Illegal attempted next state: "
- << toString(newState));
+ << ", Illegal attempted next state: " << toString(newState));
}
// If we are transitioning out of prepare, signal waiters by fulfilling the completion promise.
@@ -2186,9 +2178,7 @@ boost::optional<repl::OpTime> TransactionParticipant::Participant::_checkStateme
if (it == p().activeTxnCommittedStatements.end()) {
uassert(ErrorCodes::IncompleteTransactionHistory,
str::stream() << "Incomplete history detected for transaction "
- << o().activeTxnNumber
- << " on session "
- << _sessionId(),
+ << o().activeTxnNumber << " on session " << _sessionId(),
!p().hasIncompleteHistory);
return boost::none;
@@ -2212,45 +2202,45 @@ void TransactionParticipant::Participant::_registerUpdateCacheOnCommit(
OperationContext* opCtx,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime) {
- opCtx->recoveryUnit()->onCommit(
- [ opCtx, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ](
- boost::optional<Timestamp>) {
- TransactionParticipant::Participant participant(opCtx);
- invariant(participant.p().isValid);
-
- RetryableWritesStats::get(opCtx->getServiceContext())
- ->incrementTransactionsCollectionWriteCount();
-
- stdx::lock_guard<Client> lg(*opCtx->getClient());
-
- // The cache of the last written record must always be advanced after a write so that
- // subsequent writes have the correct point to start from.
- participant.o(lg).lastWriteOpTime = lastStmtIdWriteOpTime;
-
- for (const auto stmtId : stmtIdsWritten) {
- if (stmtId == kIncompleteHistoryStmtId) {
- participant.p().hasIncompleteHistory = true;
- continue;
- }
-
- const auto insertRes = participant.p().activeTxnCommittedStatements.emplace(
- stmtId, lastStmtIdWriteOpTime);
- if (!insertRes.second) {
- const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(participant._sessionId(),
- participant.o().activeTxnNumber,
- stmtId,
- existingOpTime,
- lastStmtIdWriteOpTime);
- }
+ opCtx->recoveryUnit()->onCommit([opCtx,
+ stmtIdsWritten = std::move(stmtIdsWritten),
+ lastStmtIdWriteOpTime](boost::optional<Timestamp>) {
+ TransactionParticipant::Participant participant(opCtx);
+ invariant(participant.p().isValid);
+
+ RetryableWritesStats::get(opCtx->getServiceContext())
+ ->incrementTransactionsCollectionWriteCount();
+
+ stdx::lock_guard<Client> lg(*opCtx->getClient());
+
+ // The cache of the last written record must always be advanced after a write so that
+ // subsequent writes have the correct point to start from.
+ participant.o(lg).lastWriteOpTime = lastStmtIdWriteOpTime;
+
+ for (const auto stmtId : stmtIdsWritten) {
+ if (stmtId == kIncompleteHistoryStmtId) {
+ participant.p().hasIncompleteHistory = true;
+ continue;
}
- // If this is the first time executing a retryable write, we should indicate that to
- // the transaction participant.
- if (participant.o(lg).txnState.isNone()) {
- participant.o(lg).txnState.transitionTo(TransactionState::kExecutedRetryableWrite);
+ const auto insertRes =
+ participant.p().activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
+ if (!insertRes.second) {
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(participant._sessionId(),
+ participant.o().activeTxnNumber,
+ stmtId,
+ existingOpTime,
+ lastStmtIdWriteOpTime);
}
- });
+ }
+
+ // If this is the first time executing a retryable write, we should indicate that to
+ // the transaction participant.
+ if (participant.o(lg).txnState.isNone()) {
+ participant.o(lg).txnState.transitionTo(TransactionState::kExecutedRetryableWrite);
+ }
+ });
MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
const auto& data = customArgs.getData();
@@ -2264,9 +2254,9 @@ void TransactionParticipant::Participant::_registerUpdateCacheOnCommit(
if (!failBeforeCommitExceptionElem.eoo()) {
const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number()));
uasserted(failureCode,
- str::stream() << "Failing write for " << _sessionId() << ":"
- << o().activeTxnNumber
- << " due to failpoint. The write must not be reflected.");
+ str::stream()
+ << "Failing write for " << _sessionId() << ":" << o().activeTxnNumber
+ << " due to failpoint. The write must not be reflected.");
}
}
}