diff options
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_test.cpp')
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test.cpp | 382 |
1 files changed, 220 insertions, 162 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index 6132e8e97b9..2f3d918886e 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -41,8 +41,6 @@ namespace mongo { namespace { -const Timestamp kDummyTimestamp = Timestamp::min(); -const Date_t kCommitDeadline = Date_t::max(); const StatusWith<BSONObj> kNoSuchTransaction = BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction); const StatusWith<BSONObj> kOk = BSON("ok" << 1); @@ -99,41 +97,6 @@ protected: ASSERT_FALSE(network()->hasReadyRequests()); } - /** - * Goes through the steps to commit a transaction through the coordinator service for a given - * lsid and txnNumber. Useful when not explictly testing the commit protocol. - */ - void commitTransaction(const std::set<ShardId>& transactionParticipantShards) { - for (size_t i = 0; i < transactionParticipantShards.size(); ++i) { - assertPrepareSentAndRespondWithSuccess(kDummyPrepareTimestamp); - } - - for (size_t i = 0; i < transactionParticipantShards.size(); ++i) { - assertCommitSentAndRespondWithSuccess(); - } - } - - /** - * Goes through the steps to abort a transaction through the coordinator service for a given - * lsid and txnNumber. Useful when not explictly testing the abort protocol. - */ - void abortTransaction(const std::set<ShardId>& shardIdSet, const ShardId& abortingShard) { - // auto commitDecisionFuture = - // coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, - // shardIdSet); - - for (size_t i = 0; i < shardIdSet.size(); ++i) { - assertPrepareSentAndRespondWithNoSuchTransaction(); - } - - // Abort gets sent to the second participant as soon as the first participant - // receives a not-okay response to prepare. - assertAbortSentAndRespondWithSuccess(); - - // Wait for abort to complete. - // commitDecisionFuture.get(); - } - LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; TxnNumber _txnNumber{1}; }; @@ -166,105 +129,134 @@ auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnN } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) { + txn::AsyncWorkScheduler aws(getServiceContext()); Future<void> future = _driver->sendDecisionToParticipantShard( - kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + ASSERT(!future.isReady()); assertPrepareSentAndRespondWithSuccess(); - future.get(operationContext()); + + future.get(); } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) { + txn::AsyncWorkScheduler aws(getServiceContext()); Future<void> future = _driver->sendDecisionToParticipantShard( - kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithRetryableError(); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithSuccess(); - future.get(operationContext()); + + future.get(); } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) { + txn::AsyncWorkScheduler aws(getServiceContext()); Future<void> future = _driver->sendDecisionToParticipantShard( - kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); - - std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); }); + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithSuccess(); + + future.get(); } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) { + txn::AsyncWorkScheduler aws(getServiceContext()); Future<void> future = _driver->sendDecisionToParticipantShard( - kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); - - std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); }); + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithNoSuchTransaction(); + + future.get(); } -TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) { - Future<PrepareResponse> future = - _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); +TEST_F(TransactionCoordinatorDriverTest, + SendDecisionToParticipantShardCanBeInterruptedAndReturnsError) { + txn::AsyncWorkScheduler aws(getServiceContext()); + Future<void> future = _driver->sendDecisionToParticipantShard( + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + + assertPrepareSentAndRespondWithRetryableError(); + aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"}); + advanceClockAndExecuteScheduledTasks(); + + ASSERT_THROWS_CODE( + future.get(), AssertionException, ErrorCodes::TransactionCoordinatorSteppingDown); +} - std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) { - ASSERT_OK(swResponse.getStatus()); - auto response = swResponse.getValue(); - ASSERT(response.vote == txn::PrepareVote::kCommit); - ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp); - }); +TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) { + txn::AsyncWorkScheduler aws(getServiceContext()); + Future<PrepareResponse> future = _driver->sendPrepareToShard( + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + ASSERT(!future.isReady()); - // Simulate a participant voting to commit. assertPrepareSentAndRespondWithSuccess(); + + auto response = future.get(); + ASSERT(response.vote == txn::PrepareVote::kCommit); + ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp); } TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnRetryableErrorThenOkResponse) { - Future<PrepareResponse> future = - _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); - - std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) { - ASSERT_OK(swResponse.getStatus()); - auto response = swResponse.getValue(); - ASSERT(response.vote == txn::PrepareVote::kCommit); - ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp); - }); + txn::AsyncWorkScheduler aws(getServiceContext()); + Future<PrepareResponse> future = _driver->sendPrepareToShard( + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + ASSERT(!future.isReady()); assertPrepareSentAndRespondWithRetryableError(); + ASSERT(!future.isReady()); + assertPrepareSentAndRespondWithSuccess(); + + auto response = future.get(); + ASSERT(response.vote == txn::PrepareVote::kCommit); + ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp); } -TEST_F( - TransactionCoordinatorDriverTest, - SendPrepareToShardStopsRetryingAfterRetryableErrorAndReturnsNoneIfCoordinatorStateIsNotPrepare) { - Future<PrepareResponse> future = - _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); +TEST_F(TransactionCoordinatorDriverTest, + SendPrepareToShardCanBeInterruptedAndReturnsNoDecisionIfNotServiceShutdown) { + txn::AsyncWorkScheduler aws(getServiceContext()); + Future<PrepareResponse> future = _driver->sendPrepareToShard( + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + + assertPrepareSentAndRespondWithRetryableError(); + aws.shutdown({ErrorCodes::TransactionCoordinatorReachedAbortDecision, "Retry interrupted"}); + advanceClockAndExecuteScheduledTasks(); + + auto response = future.get(); + ASSERT(response.vote == boost::none); + ASSERT(response.prepareTimestamp == boost::none); +} - auto resultFuture = std::move(future).then([](PrepareResponse response) { - ASSERT(response.vote == boost::none); - ASSERT(response.prepareTimestamp == boost::none); - }); +TEST_F(TransactionCoordinatorDriverTest, + SendPrepareToShardCanBeInterruptedAndThrowsExceptionIfServiceShutdown) { + txn::AsyncWorkScheduler aws(getServiceContext()); + Future<PrepareResponse> future = _driver->sendPrepareToShard( + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); - _driver->cancel(); assertPrepareSentAndRespondWithRetryableError(); - resultFuture.get(); + aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Service shutting down"}); + advanceClockAndExecuteScheduledTasks(); + + ASSERT_THROWS_CODE( + future.get(), AssertionException, ErrorCodes::TransactionCoordinatorSteppingDown); } TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) { - auto future = - _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)) - .then([&](PrepareResponse response) { - ASSERT(response.vote == txn::PrepareVote::kAbort); - ASSERT(response.prepareTimestamp == boost::none); - return response; - }); + txn::AsyncWorkScheduler aws(getServiceContext()); + auto future = _driver->sendPrepareToShard( + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -275,55 +267,52 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) { - auto future = - _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)) - .then([&](PrepareResponse response) { - ASSERT(response.vote == txn::PrepareVote::kAbort); - ASSERT(response.prepareTimestamp == boost::none); - }); + txn::AsyncWorkScheduler aws(getServiceContext()); + auto future = _driver->sendPrepareToShard( + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithNoSuchTransaction(); - future.get(); + + auto response = future.get(); + ASSERT(response.vote == txn::PrepareVote::kAbort); + ASSERT(response.prepareTimestamp == boost::none); } TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) { - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber) - .then([&](txn::PrepareVoteConsensus response) { - ASSERT(response.decision == txn::CommitDecision::kAbort); - ASSERT(response.maxPrepareTimestamp == boost::none); - }); + auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); - assertPrepareSentAndRespondWithNoSuchTransaction(); - assertPrepareSentAndRespondWithSuccess(); - future.get(); + onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, + [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }}); + + auto response = future.get(); + ASSERT(response.decision == txn::CommitDecision::kAbort); + ASSERT(response.maxPrepareTimestamp == boost::none); } TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) { - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber) - .then([&](txn::PrepareVoteConsensus response) { - ASSERT(response.decision == txn::CommitDecision::kAbort); - ASSERT(response.maxPrepareTimestamp == boost::none); - }); + auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); assertPrepareSentAndRespondWithSuccess(); assertPrepareSentAndRespondWithNoSuchTransaction(); - future.get(); + + auto response = future.get(); + ASSERT(response.decision == txn::CommitDecision::kAbort); + ASSERT(response.maxPrepareTimestamp == boost::none); } TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) { - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber) - .then([&](txn::PrepareVoteConsensus response) { - ASSERT(response.decision == txn::CommitDecision::kAbort); - ASSERT(response.maxPrepareTimestamp == boost::none); - }); + auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); - assertPrepareSentAndRespondWithNoSuchTransaction(); - assertPrepareSentAndRespondWithNoSuchTransaction(); - future.get(); + onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, + [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }}); + + auto response = future.get(); + ASSERT(response.decision == txn::CommitDecision::kAbort); + ASSERT(response.maxPrepareTimestamp == boost::none); } TEST_F(TransactionCoordinatorDriverTest, @@ -331,15 +320,14 @@ TEST_F(TransactionCoordinatorDriverTest, const auto firstPrepareTimestamp = Timestamp(1, 1); const auto maxPrepareTimestamp = Timestamp(2, 1); - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber) - .then([&](txn::PrepareVoteConsensus response) { - ASSERT(response.decision == txn::CommitDecision::kCommit); - ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); - }); + auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); - future.get(); // Should be able to return after the first participant responds. + + auto response = future.get(); + ASSERT(response.decision == txn::CommitDecision::kCommit); + ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); } TEST_F(TransactionCoordinatorDriverTest, @@ -347,15 +335,14 @@ TEST_F(TransactionCoordinatorDriverTest, const auto firstPrepareTimestamp = Timestamp(1, 1); const auto maxPrepareTimestamp = Timestamp(2, 1); - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber) - .then([&](txn::PrepareVoteConsensus response) { - ASSERT(response.decision == txn::CommitDecision::kCommit); - ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); - }); + auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); - future.get(); // Should be able to return after the first participant responds. + + auto response = future.get(); + ASSERT(response.decision == txn::CommitDecision::kCommit); + ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); } TEST_F(TransactionCoordinatorDriverTest, @@ -363,15 +350,14 @@ TEST_F(TransactionCoordinatorDriverTest, const auto firstPrepareTimestamp = Timestamp(1, 1); const auto maxPrepareTimestamp = Timestamp(2, 1); - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber) - .then([&](txn::PrepareVoteConsensus response) { - ASSERT(response.decision == txn::CommitDecision::kCommit); - ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); - }); + auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); - future.get(); // Should be able to return after the first participant responds. + + auto response = future.get(); + ASSERT(response.decision == txn::CommitDecision::kCommit); + ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); } @@ -660,13 +646,40 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, using TransactionCoordinatorTest = TransactionCoordinatorTestBase; -TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) { - TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); - auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); +TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitResponses) { + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + boost::none); + coordinator.runCommit(kTwoShardIdList); + auto commitDecisionFuture = coordinator.getDecision(); - // Simulate a participant voting to abort. - assertPrepareSentAndRespondWithNoSuchTransaction(); assertPrepareSentAndRespondWithSuccess(); + assertPrepareSentAndRespondWithSuccess(); + + assertCommitSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + + auto commitDecision = commitDecisionFuture.get(); + ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit)); + + coordinator.onCompletion().get(); +} + +TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommitResponses) { + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + boost::none); + coordinator.runCommit(kTwoShardIdList); + auto commitDecisionFuture = coordinator.getDecision(); + + onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, + [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }}); assertAbortSentAndRespondWithSuccess(); assertAbortSentAndRespondWithSuccess(); @@ -677,33 +690,40 @@ TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) coordinator.onCompletion().get(); } -TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit) { - TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); - auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); +TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbortResponses) { + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + boost::none); + coordinator.runCommit(kTwoShardIdList); + auto commitDecisionFuture = coordinator.getDecision(); - assertPrepareSentAndRespondWithSuccess(); - assertPrepareSentAndRespondWithSuccess(); + onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }, + [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }}); - assertCommitSentAndRespondWithSuccess(); - assertCommitSentAndRespondWithSuccess(); + assertAbortSentAndRespondWithSuccess(); + assertAbortSentAndRespondWithSuccess(); auto commitDecision = commitDecisionFuture.get(); - ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit)); + ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort)); coordinator.onCompletion().get(); } -TEST_F(TransactionCoordinatorTest, - RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesOneParticipantAborts) { - TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); - auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); +TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortResponseOnly) { + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + boost::none); + coordinator.runCommit(kTwoShardIdList); + auto commitDecisionFuture = coordinator.getDecision(); - // One participant votes abort after retry. - assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithNoSuchTransaction(); - - // One participant votes commit. - assertPrepareSentAndRespondWithSuccess(); + advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered assertAbortSentAndRespondWithSuccess(); assertAbortSentAndRespondWithSuccess(); @@ -715,16 +735,48 @@ TEST_F(TransactionCoordinatorTest, } TEST_F(TransactionCoordinatorTest, - RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesBothParticipantsAbort) { - TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); - auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); + RunCommitProducesAbortDecisionOnOneCommitResponseAndOneAbortResponseAfterRetry) { + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + boost::none); + coordinator.runCommit(kTwoShardIdList); + auto commitDecisionFuture = coordinator.getDecision(); + + // One participant votes commit and other encounters retryable error + onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }, + [&](const executor::RemoteCommandRequest& request) { return kRetryableError; }}); + advanceClockAndExecuteScheduledTasks(); // Make sure the scheduled retry executes // One participant votes abort after retry. - assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithNoSuchTransaction(); - // One participant votes abort. - assertPrepareSentAndRespondWithNoSuchTransaction(); + assertAbortSentAndRespondWithSuccess(); + assertAbortSentAndRespondWithSuccess(); + + auto commitDecision = commitDecisionFuture.get(); + ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort)); + + coordinator.onCompletion().get(); +} + +TEST_F(TransactionCoordinatorTest, + RunCommitProducesAbortDecisionOnOneAbortResponseAndOneRetryableAbortResponse) { + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + boost::none); + coordinator.runCommit(kTwoShardIdList); + auto commitDecisionFuture = coordinator.getDecision(); + + // One participant votes abort and other encounters retryable error + onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, + [&](const executor::RemoteCommandRequest& request) { return kRetryableError; }}); + advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered assertAbortSentAndRespondWithSuccess(); assertAbortSentAndRespondWithSuccess(); @@ -736,9 +788,15 @@ TEST_F(TransactionCoordinatorTest, } TEST_F(TransactionCoordinatorTest, - RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) { - TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber); - auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList); + RunCommitProducesCommitDecisionOnCommitAfterMultipleNetworkRetries) { + TransactionCoordinator coordinator( + getServiceContext(), + _lsid, + _txnNumber, + std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), + boost::none); + coordinator.runCommit(kTwoShardIdList); + auto commitDecisionFuture = coordinator.getDecision(); // One participant votes commit after retry. assertPrepareSentAndRespondWithRetryableError(); |