summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/transaction_coordinator_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_test.cpp')
-rw-r--r--src/mongo/db/s/transaction_coordinator_test.cpp766
1 files changed, 766 insertions, 0 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
new file mode 100644
index 00000000000..c804abff8ae
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -0,0 +1,766 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/commands/txn_cmds_gen.h"
+#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/s/transaction_coordinator_document_gen.h"
+#include "mongo/db/s/transaction_coordinator_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+const Timestamp kDummyTimestamp = Timestamp::min();
+const Date_t kCommitDeadline = Date_t::max();
+const StatusWith<BSONObj> kNoSuchTransaction =
+ BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction);
+const StatusWith<BSONObj> kOk = BSON("ok" << 1);
+const Timestamp kDummyPrepareTimestamp = Timestamp(1, 1);
+
+StatusWith<BSONObj> makePrepareOkResponse(const Timestamp& timestamp) {
+ return BSON("ok" << 1 << "prepareTimestamp" << timestamp);
+}
+
+const StatusWith<BSONObj> kPrepareOk = makePrepareOkResponse(kDummyPrepareTimestamp);
+
+class TransactionCoordinatorTestBase : public TransactionCoordinatorTestFixture {
+protected:
+ void assertPrepareSentAndRespondWithSuccess() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
+ kPrepareOk,
+ WriteConcernOptions::InternalMajorityNoSnapshot);
+ }
+
+ void assertPrepareSentAndRespondWithSuccess(const Timestamp& timestamp) {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
+ makePrepareOkResponse(timestamp),
+ WriteConcernOptions::InternalMajorityNoSnapshot);
+ }
+
+ void assertPrepareSentAndRespondWithNoSuchTransaction() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
+ kNoSuchTransaction,
+ WriteConcernOptions::InternalMajorityNoSnapshot);
+ }
+
+ void assertPrepareSentAndRespondWithRetryableError() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
+ kRetryableError,
+ WriteConcernOptions::InternalMajorityNoSnapshot);
+ advanceClockAndExecuteScheduledTasks();
+ }
+
+ void assertAbortSentAndRespondWithSuccess() {
+ assertCommandSentAndRespondWith("abortTransaction", kOk, boost::none);
+ }
+
+ void assertCommitSentAndRespondWithSuccess() {
+ assertCommandSentAndRespondWith(CommitTransaction::kCommandName, kOk, boost::none);
+ }
+
+ void assertCommitSentAndRespondWithRetryableError() {
+ assertCommandSentAndRespondWith(
+ CommitTransaction::kCommandName, kRetryableError, boost::none);
+ }
+
+ void assertNoMessageSent() {
+ executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network());
+ ASSERT_FALSE(network()->hasReadyRequests());
+ }
+
+ /**
+ * Goes through the steps to commit a transaction through the coordinator service for a given
+ * lsid and txnNumber. Useful when not explictly testing the commit protocol.
+ */
+ void commitTransaction(const std::set<ShardId>& transactionParticipantShards) {
+ for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
+ assertPrepareSentAndRespondWithSuccess(kDummyPrepareTimestamp);
+ }
+
+ for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
+ assertCommitSentAndRespondWithSuccess();
+ }
+ }
+
+ /**
+ * Goes through the steps to abort a transaction through the coordinator service for a given
+ * lsid and txnNumber. Useful when not explictly testing the abort protocol.
+ */
+ void abortTransaction(const std::set<ShardId>& shardIdSet, const ShardId& abortingShard) {
+ // auto commitDecisionFuture =
+ // coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber,
+ // shardIdSet);
+
+ for (size_t i = 0; i < shardIdSet.size(); ++i) {
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ }
+
+ // Abort gets sent to the second participant as soon as the first participant
+ // receives a not-okay response to prepare.
+ assertAbortSentAndRespondWithSuccess();
+
+ // Wait for abort to complete.
+ // commitDecisionFuture.get();
+ }
+
+ LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
+ TxnNumber _txnNumber{1};
+};
+
+class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase {
+protected:
+ void setUp() override {
+ TransactionCoordinatorTestBase::setUp();
+ _driver.emplace(getServiceContext());
+ }
+
+ void tearDown() override {
+ _driver.reset();
+ TransactionCoordinatorTestBase::tearDown();
+ }
+
+ boost::optional<TransactionCoordinatorDriver> _driver;
+};
+
+auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnNumber) {
+ PrepareTransaction prepareCmd;
+ prepareCmd.setDbName("admin");
+ auto prepareObj = prepareCmd.toBSON(
+ BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
+ << WriteConcernOptions::kWriteConcernField
+ << WriteConcernOptions::InternalMajorityNoSnapshot));
+
+
+ return prepareObj;
+}
+
+TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) {
+ Future<void> future = _driver->sendDecisionToParticipantShard(
+ kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ assertPrepareSentAndRespondWithSuccess();
+ future.get(operationContext());
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) {
+ Future<void> future = _driver->sendDecisionToParticipantShard(
+ kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ ASSERT(!future.isReady());
+
+ assertPrepareSentAndRespondWithRetryableError();
+ ASSERT(!future.isReady());
+
+ assertPrepareSentAndRespondWithSuccess();
+ future.get(operationContext());
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) {
+ Future<void> future = _driver->sendDecisionToParticipantShard(
+ kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); });
+
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithSuccess();
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) {
+ Future<void> future = _driver->sendDecisionToParticipantShard(
+ kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); });
+
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+}
+
+TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) {
+ Future<PrepareResponse> future =
+ _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) {
+ ASSERT_OK(swResponse.getStatus());
+ auto response = swResponse.getValue();
+ ASSERT(response.vote == txn::PrepareVote::kCommit);
+ ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
+ });
+
+ // Simulate a participant voting to commit.
+ assertPrepareSentAndRespondWithSuccess();
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareToShardReturnsCommitDecisionOnRetryableErrorThenOkResponse) {
+ Future<PrepareResponse> future =
+ _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) {
+ ASSERT_OK(swResponse.getStatus());
+ auto response = swResponse.getValue();
+ ASSERT(response.vote == txn::PrepareVote::kCommit);
+ ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
+ });
+
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithSuccess();
+}
+
+TEST_F(
+ TransactionCoordinatorDriverTest,
+ SendPrepareToShardStopsRetryingAfterRetryableErrorAndReturnsNoneIfCoordinatorStateIsNotPrepare) {
+ Future<PrepareResponse> future =
+ _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+ auto resultFuture = std::move(future).then([](PrepareResponse response) {
+ ASSERT(response.vote == boost::none);
+ ASSERT(response.prepareTimestamp == boost::none);
+ });
+
+ _driver->cancel();
+ assertPrepareSentAndRespondWithRetryableError();
+ resultFuture.get();
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) {
+ auto future =
+ _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber))
+ .then([&](PrepareResponse response) {
+ ASSERT(response.vote == txn::PrepareVote::kAbort);
+ ASSERT(response.prepareTimestamp == boost::none);
+ return response;
+ });
+
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ auto response = future.get();
+ ASSERT(response.vote == txn::PrepareVote::kAbort);
+ ASSERT(response.prepareTimestamp == boost::none);
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) {
+ auto future =
+ _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber))
+ .then([&](PrepareResponse response) {
+ ASSERT(response.vote == txn::PrepareVote::kAbort);
+ ASSERT(response.prepareTimestamp == boost::none);
+ });
+
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ future.get();
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) {
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
+ .then([&](txn::PrepareVoteConsensus response) {
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
+ });
+
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ assertPrepareSentAndRespondWithSuccess();
+ future.get();
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) {
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
+ .then([&](txn::PrepareVoteConsensus response) {
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
+ });
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ future.get();
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) {
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
+ .then([&](txn::PrepareVoteConsensus response) {
+ ASSERT(response.decision == txn::CommitDecision::kAbort);
+ ASSERT(response.maxPrepareTimestamp == boost::none);
+ });
+
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ future.get();
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareReturnsCommitDecisionWhenBothParticipantsVoteCommit) {
+ const auto firstPrepareTimestamp = Timestamp(1, 1);
+ const auto maxPrepareTimestamp = Timestamp(2, 1);
+
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
+ .then([&](txn::PrepareVoteConsensus response) {
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
+ });
+
+ assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
+ assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
+ future.get(); // Should be able to return after the first participant responds.
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareReturnsCommitDecisionWithMaxPrepareTimestampWhenFirstParticipantHasMax) {
+ const auto firstPrepareTimestamp = Timestamp(1, 1);
+ const auto maxPrepareTimestamp = Timestamp(2, 1);
+
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
+ .then([&](txn::PrepareVoteConsensus response) {
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
+ });
+
+ assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
+ assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
+ future.get(); // Should be able to return after the first participant responds.
+}
+
+TEST_F(TransactionCoordinatorDriverTest,
+ SendPrepareReturnsCommitDecisionWithMaxPrepareTimestampWhenLastParticipantHasMax) {
+ const auto firstPrepareTimestamp = Timestamp(1, 1);
+ const auto maxPrepareTimestamp = Timestamp(2, 1);
+
+ auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
+ .then([&](txn::PrepareVoteConsensus response) {
+ ASSERT(response.decision == txn::CommitDecision::kCommit);
+ ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
+ });
+
+ assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
+ assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
+ future.get(); // Should be able to return after the first participant responds.
+}
+
+
+class TransactionCoordinatorDriverPersistenceTest : public TransactionCoordinatorDriverTest {
+protected:
+ static void assertDocumentMatches(
+ TransactionCoordinatorDocument doc,
+ LogicalSessionId expectedLsid,
+ TxnNumber expectedTxnNum,
+ std::vector<ShardId> expectedParticipants,
+ boost::optional<txn::CommitDecision> expectedDecision = boost::none,
+ boost::optional<Timestamp> expectedCommitTimestamp = boost::none) {
+ ASSERT(doc.getId().getSessionId());
+ ASSERT_EQUALS(*doc.getId().getSessionId(), expectedLsid);
+ ASSERT(doc.getId().getTxnNumber());
+ ASSERT_EQUALS(*doc.getId().getTxnNumber(), expectedTxnNum);
+
+ ASSERT(doc.getParticipants() == expectedParticipants);
+
+ auto decision = doc.getDecision();
+ if (expectedDecision) {
+ ASSERT(*expectedDecision == decision->decision);
+ } else {
+ ASSERT(!decision);
+ }
+
+ if (expectedCommitTimestamp) {
+ ASSERT(decision->commitTimestamp);
+ ASSERT_EQUALS(*expectedCommitTimestamp, *decision->commitTimestamp);
+ } else if (decision) {
+ ASSERT(!decision->commitTimestamp);
+ }
+ }
+
+ void persistParticipantListExpectSuccess(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::vector<ShardId>& participants) {
+ _driver->persistParticipantList(lsid, txnNumber, participants).get();
+
+ auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
+ assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumber, participants);
+ }
+
+ void persistDecisionExpectSuccess(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::vector<ShardId>& participants,
+ const boost::optional<Timestamp>& commitTimestamp) {
+ _driver->persistDecision(lsid, txnNumber, participants, commitTimestamp).get();
+
+ auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
+ if (commitTimestamp) {
+ assertDocumentMatches(allCoordinatorDocs[0],
+ lsid,
+ txnNumber,
+ participants,
+ txn::CommitDecision::kCommit,
+ *commitTimestamp);
+ } else {
+ assertDocumentMatches(
+ allCoordinatorDocs[0], lsid, txnNumber, participants, txn::CommitDecision::kAbort);
+ }
+ }
+
+ void deleteCoordinatorDocExpectSuccess(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ _driver->deleteCoordinatorDoc(lsid, txnNumber).get();
+
+ auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(0));
+ }
+
+ const std::vector<ShardId> _participants{
+ ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")};
+
+ const Timestamp _commitTimestamp{Timestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0)};
+};
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistParticipantListWhenNoDocumentForTransactionExistsSucceeds) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistParticipantListWhenMatchingDocumentForTransactionExistsSucceeds) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistParticipantListWhenDocumentWithConflictingParticipantListExistsFails) {
+ std::vector<ShardId> participants{
+ ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")};
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, participants);
+
+ std::vector<ShardId> smallerParticipantList{ShardId("shard0001"), ShardId("shard0002")};
+ ASSERT_THROWS_CODE(
+ _driver->persistParticipantList(_lsid, _txnNumber, smallerParticipantList).get(),
+ AssertionException,
+ 51025);
+
+ std::vector<ShardId> largerParticipantList{
+ ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003"), ShardId("shard0004")};
+ ASSERT_THROWS_CODE(
+ _driver->persistParticipantList(_lsid, _txnNumber, largerParticipantList).get(),
+ AssertionException,
+ 51025);
+
+ std::vector<ShardId> differentSameSizeParticipantList{
+ ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0004")};
+ ASSERT_THROWS_CODE(
+ _driver->persistParticipantList(_lsid, _txnNumber, differentSameSizeParticipantList).get(),
+ AssertionException,
+ 51025);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistParticipantListForMultipleTransactionsOnSameSession) {
+ for (int i = 1; i <= 3; i++) {
+ auto txnNumber = TxnNumber{i};
+ _driver->persistParticipantList(_lsid, txnNumber, _participants).get();
+
+ auto allCoordinatorDocs =
+ TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext());
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i));
+ }
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMultipleSessions) {
+ for (int i = 1; i <= 3; i++) {
+ auto lsid = makeLogicalSessionIdForTest();
+ _driver->persistParticipantList(lsid, _txnNumber, _participants).get();
+
+ auto allCoordinatorDocs =
+ TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext());
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i));
+ }
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistAbortDecisionWhenNoDocumentForTransactionExistsFails) {
+ ASSERT_THROWS_CODE(
+ _driver->persistDecision(_lsid, _txnNumber, _participants, boost::none /* abort */).get(),
+ AssertionException,
+ 51026);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistAbortDecisionWhenDocumentExistsWithoutDecisionSucceeds) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistAbortDecisionWhenDocumentExistsWithSameDecisionSucceeds) {
+
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistCommitDecisionWhenNoDocumentForTransactionExistsFails) {
+ ASSERT_THROWS_CODE(
+ _driver->persistDecision(_lsid, _txnNumber, _participants, _commitTimestamp /* commit */)
+ .get(),
+ AssertionException,
+ 51026);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistCommitDecisionWhenDocumentExistsWithoutDecisionSucceeds) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistCommitDecisionWhenDocumentExistsWithSameDecisionSucceeds) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistCommitDecisionWhenDocumentExistsWithDifferentCommitTimestampFails) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
+
+ const Timestamp differentCommitTimestamp(Date_t::now().toMillisSinceEpoch() / 1000, 1);
+ ASSERT_THROWS_CODE(
+ _driver
+ ->persistDecision(
+ _lsid, _txnNumber, _participants, differentCommitTimestamp /* commit */)
+ .get(),
+ AssertionException,
+ 51026);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistAbortDecisionWhenDocumentExistsWithDifferentDecisionFails) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
+
+ ASSERT_THROWS_CODE(
+ _driver->persistDecision(_lsid, _txnNumber, _participants, boost::none /* abort */).get(),
+ AssertionException,
+ 51026);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ PersistCommitDecisionWhenDocumentExistsWithDifferentDecisionFails) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
+
+ ASSERT_THROWS_CODE(
+ _driver->persistDecision(_lsid, _txnNumber, _participants, _commitTimestamp /* abort */)
+ .get(),
+ AssertionException,
+ 51026);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenNoDocumentExistsFails) {
+ ASSERT_THROWS_CODE(
+ _driver->deleteCoordinatorDoc(_lsid, _txnNumber).get(), AssertionException, 51027);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ DeleteCoordinatorDocWhenDocumentExistsWithoutDecisionFails) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ ASSERT_THROWS_CODE(
+ _driver->deleteCoordinatorDoc(_lsid, _txnNumber).get(), AssertionException, 51027);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ DeleteCoordinatorDocWhenDocumentExistsWithAbortDecisionSucceeds) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
+ deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumber);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ DeleteCoordinatorDocWhenDocumentExistsWithCommitDecisionSucceeds) {
+ persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
+ persistDecisionExpectSuccess(
+ operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
+ deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumber);
+}
+
+TEST_F(TransactionCoordinatorDriverPersistenceTest,
+ MultipleCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) {
+ const auto txnNumber1 = TxnNumber{4};
+ const auto txnNumber2 = TxnNumber{5};
+
+ // Insert coordinator documents for two transactions.
+ _driver->persistParticipantList(_lsid, txnNumber1, _participants).get();
+ _driver->persistParticipantList(_lsid, txnNumber2, _participants).get();
+
+ auto allCoordinatorDocs =
+ TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext());
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(2));
+
+ // Delete the document for the first transaction and check that only the second transaction's
+ // document still exists.
+ _driver->persistDecision(_lsid, txnNumber1, _participants, boost::none /* abort */).get();
+ _driver->deleteCoordinatorDoc(_lsid, txnNumber1).get();
+
+ allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext());
+ ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
+ assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumber2, _participants);
+}
+
+
+using TransactionCoordinatorTest = TransactionCoordinatorTestBase;
+
+TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) {
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
+ auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+
+ // Simulate a participant voting to abort.
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ assertPrepareSentAndRespondWithSuccess();
+
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
+
+ coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit) {
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
+ auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+
+ coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest,
+ RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesOneParticipantAborts) {
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
+ auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+
+ // One participant votes abort after retry.
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ // One participant votes commit.
+ assertPrepareSentAndRespondWithSuccess();
+
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
+
+ coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest,
+ RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesBothParticipantsAbort) {
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
+ auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+
+ // One participant votes abort after retry.
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ // One participant votes abort.
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
+
+ coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest,
+ RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) {
+ TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
+ auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+
+ // One participant votes commit after retry.
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithSuccess();
+
+ // One participant votes commit after retry.
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithRetryableError();
+ assertPrepareSentAndRespondWithSuccess();
+
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+
+ coordinator.onCompletion().get();
+}
+
+} // namespace
+} // namespace mongo