summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/operation_context_session_mongod.cpp7
-rw-r--r--src/mongo/db/transaction_coordinator_service.cpp27
-rw-r--r--src/mongo/db/transaction_coordinator_service_test.cpp77
3 files changed, 82 insertions, 29 deletions
diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp
index bcb7a889cd3..b799a51f044 100644
--- a/src/mongo/db/operation_context_session_mongod.cpp
+++ b/src/mongo/db/operation_context_session_mongod.cpp
@@ -43,10 +43,11 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o
: _operationContextSession(opCtx, shouldCheckOutSession) {
if (shouldCheckOutSession && !opCtx->getClient()->isInDirectClient()) {
const auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant->refreshFromStorageIfNeeded(opCtx);
-
const auto clientTxnNumber = *opCtx->getTxnNumber();
+ txnParticipant->refreshFromStorageIfNeeded(opCtx);
+ txnParticipant->beginOrContinue(clientTxnNumber, autocommit, startTransaction);
+
if (startTransaction && *startTransaction) {
// If this shard has been selected as the coordinator, set up the coordinator state
// to be ready to receive votes.
@@ -54,8 +55,6 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o
createTransactionCoordinator(opCtx, clientTxnNumber);
}
}
-
- txnParticipant->beginOrContinue(clientTxnNumber, autocommit, startTransaction);
}
}
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index ff2adbd88b9..2a5e2b45f74 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -84,20 +84,27 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber,
Date_t commitDeadline) {
- // TODO (SERVER-37021): Validate lsid and txnNumber against latest txnNumber on session in the
- // catalog.
-
- auto latestTxnNumAndCoordinator = _coordinatorCatalog->getLatestOnSession(lsid);
- // TODO (SERVER-37039): The below removal logic for a coordinator will change/be removed once we
- // allow multiple coordinators for a session.
- if (latestTxnNumAndCoordinator) {
+ if (auto latestTxnNumAndCoordinator = _coordinatorCatalog->getLatestOnSession(lsid)) {
auto latestCoordinator = latestTxnNumAndCoordinator.get().second;
+ if (txnNumber == latestTxnNumAndCoordinator.get().first) {
+ // If we're trying to re-create a coordinator for an already-existing lsid and
+ // txnNumber, we should be able to continue to use that coordinator, which MUST be in
+ // an unused state. In the state machine, the initial state is encoded as
+ // kWaitingForParticipantList, but this uassert won't necessarily catch all bugs
+ // because it's possible that the participant list (via coordinateCommit) could be en
+ // route when we reach this point or that votes have been received before reaching
+ // coordinateCommit.
+ uassert(50968,
+ "Cannot start a new transaction with the same session ID and transaction "
+ "number as a transaction that has already begun two-phase commit.",
+ latestCoordinator->state() ==
+ TransactionCoordinator::StateMachine::State::kWaitingForParticipantList);
+
+ return;
+ }
// Call tryAbort on previous coordinator.
auto actionToTake = latestCoordinator.get()->recvTryAbort();
doCoordinatorAction(opCtx, latestCoordinator, actionToTake);
-
- // Wait for coordinator to finish committing or aborting.
- latestCoordinator->waitForCompletion().get(opCtx);
}
_coordinatorCatalog->create(lsid, txnNumber);
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index 56a6069af1b..9009a9dab1b 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -239,26 +239,73 @@ TEST_F(TransactionCoordinatorServiceTest,
}
TEST_F(TransactionCoordinatorServiceTest,
- CreateCoordinatorWithHigherTxnNumberThanOngoingUncommittedTxnAbortsPreviousTxnAndSucceeds) {
- // TODO (SERVER-37021): Implement once more validation is implemented for coordinator creation.
-}
+ RetryingCreateCoordinatorForSameLsidAndTxnNumberSucceeds) {
+
+ TransactionCoordinatorService coordinatorService;
+
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
+ // Retry create. This should succeed but not replace the old coordinator.
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
-TEST_F(
- TransactionCoordinatorServiceTest,
- CreateCoordinatorWithHigherTxnNumberThanOngoingCommittingTxnWaitsForPreviousTxnToCommitAndSucceeds) {
- // TODO (SERVER-37021): Implement once more validation is implemented for coordinator creation.
+ commitTransaction(coordinatorService, lsid(), txnNumber(), kTwoShardIdSet);
}
-TEST_F(
- TransactionCoordinatorServiceTest,
- CreateCoordinatorWithSameTxnNumberAsOngoingUncommittedTxnThrowsIfPreviousCoordinatorHasReceivedEvents) {
- // TODO (SERVER-37021): Implement once more validation is implemented for coordinator creation.
+TEST_F(TransactionCoordinatorServiceTest,
+ CreateCoordinatorWithHigherTxnNumberThanOngoingUncommittedTxnAbortsPreviousTxnAndSucceeds) {
+
+ TransactionCoordinatorService coordinatorService;
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
+
+ // This is currently the only way we have to get the commit decision.
+ auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+
+ // Create a coordinator for a higher transaction number in the same session.
+ coordinatorService.createCoordinator(
+ operationContext(), lsid(), txnNumber() + 1, kCommitDeadline);
+
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ // We should have aborted the previous transaction.
+ ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()),
+ static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort));
+
+ // Make sure the newly created one works fine.
+ commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet);
}
-TEST_F(
- TransactionCoordinatorServiceTest,
- CreateCoordinatorWithSameTxnNumberAsOngoingUncommittedTxnSucceedsIfPreviousCoordinatorHasNotReceivedEvents) {
- // TODO (SERVER-37021): Implement once more validation is implemented for coordinator creation.
+TEST_F(TransactionCoordinatorServiceTest,
+ CreateCoordinatorWithHigherTxnNumberThanOngoingCommittingTxnCommitsPreviousTxnAndSucceeds) {
+
+ TransactionCoordinatorService coordinatorService;
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
+
+ // Progress the transaction up until the point where it has sent commit and is waiting for
+ // commit acks.
+ auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+ coordinatorService.voteCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
+ coordinatorService.voteCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
+
+ // Create a coordinator for a higher transaction number in the same session. This should
+ // "tryAbort" on the old coordinator which should NOT abort it since it's already waiting for
+ // commit acks.
+ coordinatorService.createCoordinator(
+ operationContext(), lsid(), txnNumber() + 1, kCommitDeadline);
+
+ // Finish committing the old transaction by sending it commit acks from both participants.
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ // The old transaction should now be committed.
+ ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()),
+ static_cast<int>(TransactionCoordinatorService::CommitDecision::kCommit));
+
+ // Make sure the newly created one works fine too.
+ commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet);
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,