diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/operation_context_session_mongod.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service_test.cpp | 77 |
3 files changed, 82 insertions, 29 deletions
diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp index bcb7a889cd3..b799a51f044 100644 --- a/src/mongo/db/operation_context_session_mongod.cpp +++ b/src/mongo/db/operation_context_session_mongod.cpp @@ -43,10 +43,11 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o : _operationContextSession(opCtx, shouldCheckOutSession) { if (shouldCheckOutSession && !opCtx->getClient()->isInDirectClient()) { const auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant->refreshFromStorageIfNeeded(opCtx); - const auto clientTxnNumber = *opCtx->getTxnNumber(); + txnParticipant->refreshFromStorageIfNeeded(opCtx); + txnParticipant->beginOrContinue(clientTxnNumber, autocommit, startTransaction); + if (startTransaction && *startTransaction) { // If this shard has been selected as the coordinator, set up the coordinator state // to be ready to receive votes. @@ -54,8 +55,6 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o createTransactionCoordinator(opCtx, clientTxnNumber); } } - - txnParticipant->beginOrContinue(clientTxnNumber, autocommit, startTransaction); } } diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp index ff2adbd88b9..2a5e2b45f74 100644 --- a/src/mongo/db/transaction_coordinator_service.cpp +++ b/src/mongo/db/transaction_coordinator_service.cpp @@ -84,20 +84,27 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber, Date_t commitDeadline) { - // TODO (SERVER-37021): Validate lsid and txnNumber against latest txnNumber on session in the - // catalog. - - auto latestTxnNumAndCoordinator = _coordinatorCatalog->getLatestOnSession(lsid); - // TODO (SERVER-37039): The below removal logic for a coordinator will change/be removed once we - // allow multiple coordinators for a session. - if (latestTxnNumAndCoordinator) { + if (auto latestTxnNumAndCoordinator = _coordinatorCatalog->getLatestOnSession(lsid)) { auto latestCoordinator = latestTxnNumAndCoordinator.get().second; + if (txnNumber == latestTxnNumAndCoordinator.get().first) { + // If we're trying to re-create a coordinator for an already-existing lsid and + // txnNumber, we should be able to continue to use that coordinator, which MUST be in + // an unused state. In the state machine, the initial state is encoded as + // kWaitingForParticipantList, but this uassert won't necessarily catch all bugs + // because it's possible that the participant list (via coordinateCommit) could be en + // route when we reach this point or that votes have been received before reaching + // coordinateCommit. + uassert(50968, + "Cannot start a new transaction with the same session ID and transaction " + "number as a transaction that has already begun two-phase commit.", + latestCoordinator->state() == + TransactionCoordinator::StateMachine::State::kWaitingForParticipantList); + + return; + } // Call tryAbort on previous coordinator. auto actionToTake = latestCoordinator.get()->recvTryAbort(); doCoordinatorAction(opCtx, latestCoordinator, actionToTake); - - // Wait for coordinator to finish committing or aborting. - latestCoordinator->waitForCompletion().get(opCtx); } _coordinatorCatalog->create(lsid, txnNumber); diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp index 56a6069af1b..9009a9dab1b 100644 --- a/src/mongo/db/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/transaction_coordinator_service_test.cpp @@ -239,26 +239,73 @@ TEST_F(TransactionCoordinatorServiceTest, } TEST_F(TransactionCoordinatorServiceTest, - CreateCoordinatorWithHigherTxnNumberThanOngoingUncommittedTxnAbortsPreviousTxnAndSucceeds) { - // TODO (SERVER-37021): Implement once more validation is implemented for coordinator creation. -} + RetryingCreateCoordinatorForSameLsidAndTxnNumberSucceeds) { + + TransactionCoordinatorService coordinatorService; + + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); + // Retry create. This should succeed but not replace the old coordinator. + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); -TEST_F( - TransactionCoordinatorServiceTest, - CreateCoordinatorWithHigherTxnNumberThanOngoingCommittingTxnWaitsForPreviousTxnToCommitAndSucceeds) { - // TODO (SERVER-37021): Implement once more validation is implemented for coordinator creation. + commitTransaction(coordinatorService, lsid(), txnNumber(), kTwoShardIdSet); } -TEST_F( - TransactionCoordinatorServiceTest, - CreateCoordinatorWithSameTxnNumberAsOngoingUncommittedTxnThrowsIfPreviousCoordinatorHasReceivedEvents) { - // TODO (SERVER-37021): Implement once more validation is implemented for coordinator creation. +TEST_F(TransactionCoordinatorServiceTest, + CreateCoordinatorWithHigherTxnNumberThanOngoingUncommittedTxnAbortsPreviousTxnAndSucceeds) { + + TransactionCoordinatorService coordinatorService; + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); + + // This is currently the only way we have to get the commit decision. + auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + + // Create a coordinator for a higher transaction number in the same session. + coordinatorService.createCoordinator( + operationContext(), lsid(), txnNumber() + 1, kCommitDeadline); + + assertAbortSentAndRespondWithSuccess(); + assertAbortSentAndRespondWithSuccess(); + + // We should have aborted the previous transaction. + ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()), + static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort)); + + // Make sure the newly created one works fine. + commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet); } -TEST_F( - TransactionCoordinatorServiceTest, - CreateCoordinatorWithSameTxnNumberAsOngoingUncommittedTxnSucceedsIfPreviousCoordinatorHasNotReceivedEvents) { - // TODO (SERVER-37021): Implement once more validation is implemented for coordinator creation. +TEST_F(TransactionCoordinatorServiceTest, + CreateCoordinatorWithHigherTxnNumberThanOngoingCommittingTxnCommitsPreviousTxnAndSucceeds) { + + TransactionCoordinatorService coordinatorService; + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); + + // Progress the transaction up until the point where it has sent commit and is waiting for + // commit acks. + auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + coordinatorService.voteCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); + coordinatorService.voteCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp); + + // Create a coordinator for a higher transaction number in the same session. This should + // "tryAbort" on the old coordinator which should NOT abort it since it's already waiting for + // commit acks. + coordinatorService.createCoordinator( + operationContext(), lsid(), txnNumber() + 1, kCommitDeadline); + + // Finish committing the old transaction by sending it commit acks from both participants. + assertCommitSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + + // The old transaction should now be committed. + ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()), + static_cast<int>(TransactionCoordinatorService::CommitDecision::kCommit)); + + // Make sure the newly created one works fine too. + commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet); } TEST_F(TransactionCoordinatorServiceTestSingleTxn, |