diff options
Diffstat (limited to 'src/mongo/s/transaction_router.cpp')
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 115 |
1 files changed, 35 insertions, 80 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 578d979b10c..70a99dc4eb9 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -932,86 +932,43 @@ void TransactionRouter::Router::_setAtClusterTime( o(lk).atClusterTime->setTime(candidateTime, p().latestStmtId); } -void TransactionRouter::Router::_beginOrContinueActiveTxnNumber( - OperationContext* opCtx, - TxnNumberAndRetryCounter txnNumberAndRetryCounter, - TransactionActions action) { +void TransactionRouter::Router::_continueTxn(OperationContext* opCtx, + TxnNumberAndRetryCounter txnNumberAndRetryCounter, + TransactionActions action) { invariant(txnNumberAndRetryCounter.getTxnNumber() == o().txnNumberAndRetryCounter.getTxnNumber()); + switch (action) { + case TransactionActions::kStart: { + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "txnNumber " << o().txnNumberAndRetryCounter.getTxnNumber() + << " txnRetryCounter " + << o().txnNumberAndRetryCounter.getTxnRetryCounter() + << " for session " << _sessionId() << " already started", + isInternalSessionForRetryableWrite(_sessionId())); + break; + } + case TransactionActions::kContinue: { + uassert(ErrorCodes::InvalidOptions, + "Only the first command in a transaction may specify a readConcern", + repl::ReadConcernArgs::get(opCtx).isEmpty()); - if (txnNumberAndRetryCounter.getTxnRetryCounter() < - o().txnNumberAndRetryCounter.getTxnRetryCounter()) { - uasserted(TxnRetryCounterTooOldInfo(*o().txnNumberAndRetryCounter.getTxnRetryCounter()), - str::stream() << "Cannot " << actionTypeToString(action) << " transaction " - << txnNumberAndRetryCounter.getTxnNumber() << " on session " - << _sessionId() << " using txnRetryCounter " - << txnNumberAndRetryCounter.getTxnRetryCounter() - << " because the transaction has already been restarted using" - << " a higher txnRetryCounter " - << o().txnNumberAndRetryCounter.getTxnRetryCounter()); - } else if (txnNumberAndRetryCounter.getTxnRetryCounter() == - o().txnNumberAndRetryCounter.getTxnRetryCounter()) { - switch (action) { - case TransactionActions::kStart: { - uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "txnNumber " << o().txnNumberAndRetryCounter.getTxnNumber() - << " txnRetryCounter " - << o().txnNumberAndRetryCounter.getTxnRetryCounter() - << " for session " << _sessionId() << " already started", - isInternalSessionForRetryableWrite(_sessionId())); - break; - } - case TransactionActions::kContinue: { - uassert(ErrorCodes::InvalidOptions, - "Only the first command in a transaction may specify a readConcern", - repl::ReadConcernArgs::get(opCtx).isEmpty()); - - APIParameters::get(opCtx) = o().apiParameters; - repl::ReadConcernArgs::get(opCtx) = o().readConcernArgs; + APIParameters::get(opCtx) = o().apiParameters; + repl::ReadConcernArgs::get(opCtx) = o().readConcernArgs; - ++p().latestStmtId; - _onContinue(opCtx); - break; - } - case TransactionActions::kCommit: - ++p().latestStmtId; - _onContinue(opCtx); - break; - } - } else { - uassert(ErrorCodes::IllegalOperation, - str::stream() << "Cannot " << actionTypeToString(action) << " transaction " - << txnNumberAndRetryCounter.getTxnNumber() << " on session " - << _sessionId() << " using txnRetryCounter " - << txnNumberAndRetryCounter.getTxnRetryCounter() - << " because it has already started to commit using " - << "a lower txnRetryCounter " - << o().txnNumberAndRetryCounter.getTxnRetryCounter(), - o().commitType == CommitType::kNotInitiated || !o().abortCause.empty()); - - if (action == TransactionActions::kCommit) { - // If the first action seen by the router for this txnRetryCounter is to commit, that - // means that the client is attempting to recover a commit decision. - _resetRouterState(opCtx, txnNumberAndRetryCounter); - p().isRecoveringCommit = true; - return; + ++p().latestStmtId; + _onContinue(opCtx); + break; } - uassert(ErrorCodes::IllegalOperation, - str::stream() << "Cannot " << actionTypeToString(action) << " transaction " - << txnNumberAndRetryCounter.getTxnNumber() << " on session " - << _sessionId() << " using txnRetryCounter " - << txnNumberAndRetryCounter.getTxnRetryCounter() - << " because it is using a lower txnRetryCounter " - << o().txnNumberAndRetryCounter.getTxnRetryCounter(), - action == TransactionActions::kStart); - _resetRouterStateForStartTransaction(opCtx, txnNumberAndRetryCounter); + case TransactionActions::kCommit: + ++p().latestStmtId; + _onContinue(opCtx); + break; } } -void TransactionRouter::Router::_beginNewTxnNumber( - OperationContext* opCtx, - TxnNumberAndRetryCounter txnNumberAndRetryCounter, - TransactionActions action) { +void TransactionRouter::Router::_beginTxn(OperationContext* opCtx, + TxnNumberAndRetryCounter txnNumberAndRetryCounter, + TransactionActions action) { invariant(txnNumberAndRetryCounter.getTxnNumber() > o().txnNumberAndRetryCounter.getTxnNumber()); @@ -1046,12 +1003,10 @@ void TransactionRouter::Router::_beginNewTxnNumber( }; } -void TransactionRouter::Router::beginOrContinueTxn( - OperationContext* opCtx, - TxnNumberAndRetryCounter txnNumberAndRetryCounter, - TransactionActions action) { - invariant(txnNumberAndRetryCounter.getTxnRetryCounter() >= 0, - "Cannot specify a negative txnRetryCounter"); +void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, + TxnNumber txnNumber, + TransactionActions action) { + const TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber, 0}; if (txnNumberAndRetryCounter.getTxnNumber() < o().txnNumberAndRetryCounter.getTxnNumber()) { // This transaction is older than the transaction currently in progress, so throw an error. @@ -1072,10 +1027,10 @@ void TransactionRouter::Router::beginOrContinueTxn( o().apiParameters.toBSON().toString()), apiParamsFromClient == o().apiParameters); } - _beginOrContinueActiveTxnNumber(opCtx, txnNumberAndRetryCounter, action); + _continueTxn(opCtx, txnNumberAndRetryCounter, action); } else { // This is a newer transaction. - _beginNewTxnNumber(opCtx, txnNumberAndRetryCounter, action); + _beginTxn(opCtx, txnNumberAndRetryCounter, action); } _updateLastClientInfo(opCtx->getClient()); |