summaryrefslogtreecommitdiff
path: root/src/mongo/s/transaction_router.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/transaction_router.cpp')
-rw-r--r--src/mongo/s/transaction_router.cpp115
1 files changed, 35 insertions, 80 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 578d979b10c..70a99dc4eb9 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -932,86 +932,43 @@ void TransactionRouter::Router::_setAtClusterTime(
o(lk).atClusterTime->setTime(candidateTime, p().latestStmtId);
}
-void TransactionRouter::Router::_beginOrContinueActiveTxnNumber(
- OperationContext* opCtx,
- TxnNumberAndRetryCounter txnNumberAndRetryCounter,
- TransactionActions action) {
+void TransactionRouter::Router::_continueTxn(OperationContext* opCtx,
+ TxnNumberAndRetryCounter txnNumberAndRetryCounter,
+ TransactionActions action) {
invariant(txnNumberAndRetryCounter.getTxnNumber() ==
o().txnNumberAndRetryCounter.getTxnNumber());
+ switch (action) {
+ case TransactionActions::kStart: {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "txnNumber " << o().txnNumberAndRetryCounter.getTxnNumber()
+ << " txnRetryCounter "
+ << o().txnNumberAndRetryCounter.getTxnRetryCounter()
+ << " for session " << _sessionId() << " already started",
+ isInternalSessionForRetryableWrite(_sessionId()));
+ break;
+ }
+ case TransactionActions::kContinue: {
+ uassert(ErrorCodes::InvalidOptions,
+ "Only the first command in a transaction may specify a readConcern",
+ repl::ReadConcernArgs::get(opCtx).isEmpty());
- if (txnNumberAndRetryCounter.getTxnRetryCounter() <
- o().txnNumberAndRetryCounter.getTxnRetryCounter()) {
- uasserted(TxnRetryCounterTooOldInfo(*o().txnNumberAndRetryCounter.getTxnRetryCounter()),
- str::stream() << "Cannot " << actionTypeToString(action) << " transaction "
- << txnNumberAndRetryCounter.getTxnNumber() << " on session "
- << _sessionId() << " using txnRetryCounter "
- << txnNumberAndRetryCounter.getTxnRetryCounter()
- << " because the transaction has already been restarted using"
- << " a higher txnRetryCounter "
- << o().txnNumberAndRetryCounter.getTxnRetryCounter());
- } else if (txnNumberAndRetryCounter.getTxnRetryCounter() ==
- o().txnNumberAndRetryCounter.getTxnRetryCounter()) {
- switch (action) {
- case TransactionActions::kStart: {
- uassert(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "txnNumber " << o().txnNumberAndRetryCounter.getTxnNumber()
- << " txnRetryCounter "
- << o().txnNumberAndRetryCounter.getTxnRetryCounter()
- << " for session " << _sessionId() << " already started",
- isInternalSessionForRetryableWrite(_sessionId()));
- break;
- }
- case TransactionActions::kContinue: {
- uassert(ErrorCodes::InvalidOptions,
- "Only the first command in a transaction may specify a readConcern",
- repl::ReadConcernArgs::get(opCtx).isEmpty());
-
- APIParameters::get(opCtx) = o().apiParameters;
- repl::ReadConcernArgs::get(opCtx) = o().readConcernArgs;
+ APIParameters::get(opCtx) = o().apiParameters;
+ repl::ReadConcernArgs::get(opCtx) = o().readConcernArgs;
- ++p().latestStmtId;
- _onContinue(opCtx);
- break;
- }
- case TransactionActions::kCommit:
- ++p().latestStmtId;
- _onContinue(opCtx);
- break;
- }
- } else {
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Cannot " << actionTypeToString(action) << " transaction "
- << txnNumberAndRetryCounter.getTxnNumber() << " on session "
- << _sessionId() << " using txnRetryCounter "
- << txnNumberAndRetryCounter.getTxnRetryCounter()
- << " because it has already started to commit using "
- << "a lower txnRetryCounter "
- << o().txnNumberAndRetryCounter.getTxnRetryCounter(),
- o().commitType == CommitType::kNotInitiated || !o().abortCause.empty());
-
- if (action == TransactionActions::kCommit) {
- // If the first action seen by the router for this txnRetryCounter is to commit, that
- // means that the client is attempting to recover a commit decision.
- _resetRouterState(opCtx, txnNumberAndRetryCounter);
- p().isRecoveringCommit = true;
- return;
+ ++p().latestStmtId;
+ _onContinue(opCtx);
+ break;
}
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Cannot " << actionTypeToString(action) << " transaction "
- << txnNumberAndRetryCounter.getTxnNumber() << " on session "
- << _sessionId() << " using txnRetryCounter "
- << txnNumberAndRetryCounter.getTxnRetryCounter()
- << " because it is using a lower txnRetryCounter "
- << o().txnNumberAndRetryCounter.getTxnRetryCounter(),
- action == TransactionActions::kStart);
- _resetRouterStateForStartTransaction(opCtx, txnNumberAndRetryCounter);
+ case TransactionActions::kCommit:
+ ++p().latestStmtId;
+ _onContinue(opCtx);
+ break;
}
}
-void TransactionRouter::Router::_beginNewTxnNumber(
- OperationContext* opCtx,
- TxnNumberAndRetryCounter txnNumberAndRetryCounter,
- TransactionActions action) {
+void TransactionRouter::Router::_beginTxn(OperationContext* opCtx,
+ TxnNumberAndRetryCounter txnNumberAndRetryCounter,
+ TransactionActions action) {
invariant(txnNumberAndRetryCounter.getTxnNumber() >
o().txnNumberAndRetryCounter.getTxnNumber());
@@ -1046,12 +1003,10 @@ void TransactionRouter::Router::_beginNewTxnNumber(
};
}
-void TransactionRouter::Router::beginOrContinueTxn(
- OperationContext* opCtx,
- TxnNumberAndRetryCounter txnNumberAndRetryCounter,
- TransactionActions action) {
- invariant(txnNumberAndRetryCounter.getTxnRetryCounter() >= 0,
- "Cannot specify a negative txnRetryCounter");
+void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ TransactionActions action) {
+ const TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber, 0};
if (txnNumberAndRetryCounter.getTxnNumber() < o().txnNumberAndRetryCounter.getTxnNumber()) {
// This transaction is older than the transaction currently in progress, so throw an error.
@@ -1072,10 +1027,10 @@ void TransactionRouter::Router::beginOrContinueTxn(
o().apiParameters.toBSON().toString()),
apiParamsFromClient == o().apiParameters);
}
- _beginOrContinueActiveTxnNumber(opCtx, txnNumberAndRetryCounter, action);
+ _continueTxn(opCtx, txnNumberAndRetryCounter, action);
} else {
// This is a newer transaction.
- _beginNewTxnNumber(opCtx, txnNumberAndRetryCounter, action);
+ _beginTxn(opCtx, txnNumberAndRetryCounter, action);
}
_updateLastClientInfo(opCtx->getClient());