summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/transaction_coordinator_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_test.cpp')
-rw-r--r--src/mongo/db/s/transaction_coordinator_test.cpp382
1 files changed, 220 insertions, 162 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index 6132e8e97b9..2f3d918886e 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -41,8 +41,6 @@
namespace mongo {
namespace {
-const Timestamp kDummyTimestamp = Timestamp::min();
-const Date_t kCommitDeadline = Date_t::max();
const StatusWith<BSONObj> kNoSuchTransaction =
BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction);
const StatusWith<BSONObj> kOk = BSON("ok" << 1);
@@ -99,41 +97,6 @@ protected:
ASSERT_FALSE(network()->hasReadyRequests());
}
- /**
- * Goes through the steps to commit a transaction through the coordinator service for a given
- * lsid and txnNumber. Useful when not explictly testing the commit protocol.
- */
- void commitTransaction(const std::set<ShardId>& transactionParticipantShards) {
- for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
- assertPrepareSentAndRespondWithSuccess(kDummyPrepareTimestamp);
- }
-
- for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
- assertCommitSentAndRespondWithSuccess();
- }
- }
-
- /**
- * Goes through the steps to abort a transaction through the coordinator service for a given
- * lsid and txnNumber. Useful when not explictly testing the abort protocol.
- */
- void abortTransaction(const std::set<ShardId>& shardIdSet, const ShardId& abortingShard) {
- // auto commitDecisionFuture =
- // coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber,
- // shardIdSet);
-
- for (size_t i = 0; i < shardIdSet.size(); ++i) {
- assertPrepareSentAndRespondWithNoSuchTransaction();
- }
-
- // Abort gets sent to the second participant as soon as the first participant
- // receives a not-okay response to prepare.
- assertAbortSentAndRespondWithSuccess();
-
- // Wait for abort to complete.
- // commitDecisionFuture.get();
- }
-
LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
TxnNumber _txnNumber{1};
};
@@ -166,105 +129,134 @@ auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnN
}
TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
Future<void> future = _driver->sendDecisionToParticipantShard(
- kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ ASSERT(!future.isReady());
assertPrepareSentAndRespondWithSuccess();
- future.get(operationContext());
+
+ future.get();
}
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
Future<void> future = _driver->sendDecisionToParticipantShard(
- kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithSuccess();
- future.get(operationContext());
+
+ future.get();
}
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
Future<void> future = _driver->sendDecisionToParticipantShard(
- kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
-
- std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); });
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithSuccess();
+
+ future.get();
}
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
Future<void> future = _driver->sendDecisionToParticipantShard(
- kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
-
- std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); });
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ future.get();
}
-TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) {
- Future<PrepareResponse> future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+TEST_F(TransactionCoordinatorDriverTest,
+ SendDecisionToParticipantShardCanBeInterruptedAndReturnsError) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<void> future = _driver->sendDecisionToParticipantShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ assertPrepareSentAndRespondWithRetryableError();
+ aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"});
+ advanceClockAndExecuteScheduledTasks();
+
+ ASSERT_THROWS_CODE(
+ future.get(), AssertionException, ErrorCodes::TransactionCoordinatorSteppingDown);
+}
- std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) {
- ASSERT_OK(swResponse.getStatus());
- auto response = swResponse.getValue();
- ASSERT(response.vote == txn::PrepareVote::kCommit);
- ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
- });
+TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<PrepareResponse> future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ ASSERT(!future.isReady());
- // Simulate a participant voting to commit.
assertPrepareSentAndRespondWithSuccess();
+
+ auto response = future.get();
+ ASSERT(response.vote == txn::PrepareVote::kCommit);
+ ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsCommitDecisionOnRetryableErrorThenOkResponse) {
- Future<PrepareResponse> future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
-
- std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) {
- ASSERT_OK(swResponse.getStatus());
- auto response = swResponse.getValue();
- ASSERT(response.vote == txn::PrepareVote::kCommit);
- ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
- });
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<PrepareResponse> future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
+ ASSERT(!future.isReady());
+
assertPrepareSentAndRespondWithSuccess();
+
+ auto response = future.get();
+ ASSERT(response.vote == txn::PrepareVote::kCommit);
+ ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
}
-TEST_F(
- TransactionCoordinatorDriverTest,
- SendPrepareToShardStopsRetryingAfterRetryableErrorAndReturnsNoneIfCoordinatorStateIsNotPrepare) {
- Future<PrepareResponse> future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareToShardCanBeInterruptedAndReturnsNoDecisionIfNotServiceShutdown) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<PrepareResponse> future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ assertPrepareSentAndRespondWithRetryableError();
+ aws.shutdown({ErrorCodes::TransactionCoordinatorReachedAbortDecision, "Retry interrupted"});
+ advanceClockAndExecuteScheduledTasks();
+
+ auto response = future.get();
+ ASSERT(response.vote == boost::none);
+ ASSERT(response.prepareTimestamp == boost::none);
+}
- auto resultFuture = std::move(future).then([](PrepareResponse response) {
- ASSERT(response.vote == boost::none);
- ASSERT(response.prepareTimestamp == boost::none);
- });
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareToShardCanBeInterruptedAndThrowsExceptionIfServiceShutdown) {
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ Future<PrepareResponse> future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
- _driver->cancel();
assertPrepareSentAndRespondWithRetryableError();
- resultFuture.get();
+ aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Service shutting down"});
+ advanceClockAndExecuteScheduledTasks();
+
+ ASSERT_THROWS_CODE(
+ future.get(), AssertionException, ErrorCodes::TransactionCoordinatorSteppingDown);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) {
- auto future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber))
- .then([&](PrepareResponse response) {
- ASSERT(response.vote == txn::PrepareVote::kAbort);
- ASSERT(response.prepareTimestamp == boost::none);
- return response;
- });
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -275,55 +267,52 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) {
- auto future =
- _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber))
- .then([&](PrepareResponse response) {
- ASSERT(response.vote == txn::PrepareVote::kAbort);
- ASSERT(response.prepareTimestamp == boost::none);
- });
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = _driver->sendPrepareToShard(
+ aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithNoSuchTransaction();
- future.get();
+
+ auto response = future.get();
+ ASSERT(response.vote == txn::PrepareVote::kAbort);
+ ASSERT(response.prepareTimestamp == boost::none);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
- assertPrepareSentAndRespondWithNoSuchTransaction();
- assertPrepareSentAndRespondWithSuccess();
- future.get();
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+ [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
assertPrepareSentAndRespondWithSuccess();
assertPrepareSentAndRespondWithNoSuchTransaction();
- future.get();
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
- assertPrepareSentAndRespondWithNoSuchTransaction();
- assertPrepareSentAndRespondWithNoSuchTransaction();
- future.get();
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+ [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -331,15 +320,14 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
- future.get(); // Should be able to return after the first participant responds.
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -347,15 +335,14 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
- future.get(); // Should be able to return after the first participant responds.
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -363,15 +350,14 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
- .then([&](txn::PrepareVoteConsensus response) {
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
- });
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
- future.get(); // Should be able to return after the first participant responds.
+
+ auto response = future.get();
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
}
@@ -660,13 +646,40 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
using TransactionCoordinatorTest = TransactionCoordinatorTestBase;
-TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitResponses) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
- // Simulate a participant voting to abort.
- assertPrepareSentAndRespondWithNoSuchTransaction();
assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+
+ coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommitResponses) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
+
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+ [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -677,33 +690,40 @@ TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort)
coordinator.onCompletion().get();
}
-TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbortResponses) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
- assertPrepareSentAndRespondWithSuccess();
- assertPrepareSentAndRespondWithSuccess();
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; },
+ [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }});
- assertCommitSentAndRespondWithSuccess();
- assertCommitSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
auto commitDecision = commitDecisionFuture.get();
- ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
coordinator.onCompletion().get();
}
-TEST_F(TransactionCoordinatorTest,
- RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesOneParticipantAborts) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortResponseOnly) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
- // One participant votes abort after retry.
- assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithNoSuchTransaction();
-
- // One participant votes commit.
- assertPrepareSentAndRespondWithSuccess();
+ advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -715,16 +735,48 @@ TEST_F(TransactionCoordinatorTest,
}
TEST_F(TransactionCoordinatorTest,
- RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesBothParticipantsAbort) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+ RunCommitProducesAbortDecisionOnOneCommitResponseAndOneAbortResponseAfterRetry) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
+
+ // One participant votes commit and other encounters retryable error
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; },
+ [&](const executor::RemoteCommandRequest& request) { return kRetryableError; }});
+ advanceClockAndExecuteScheduledTasks(); // Make sure the scheduled retry executes
// One participant votes abort after retry.
- assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithNoSuchTransaction();
- // One participant votes abort.
- assertPrepareSentAndRespondWithNoSuchTransaction();
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
+
+ coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest,
+ RunCommitProducesAbortDecisionOnOneAbortResponseAndOneRetryableAbortResponse) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
+
+ // One participant votes abort and other encounters retryable error
+ onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+ [&](const executor::RemoteCommandRequest& request) { return kRetryableError; }});
+ advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered
assertAbortSentAndRespondWithSuccess();
assertAbortSentAndRespondWithSuccess();
@@ -736,9 +788,15 @@ TEST_F(TransactionCoordinatorTest,
}
TEST_F(TransactionCoordinatorTest,
- RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) {
- TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
- auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+ RunCommitProducesCommitDecisionOnCommitAfterMultipleNetworkRetries) {
+ TransactionCoordinator coordinator(
+ getServiceContext(),
+ _lsid,
+ _txnNumber,
+ std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+ boost::none);
+ coordinator.runCommit(kTwoShardIdList);
+ auto commitDecisionFuture = coordinator.getDecision();
// One participant votes commit after retry.
assertPrepareSentAndRespondWithRetryableError();