summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/transaction_coordinator_service_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_service_test.cpp')
-rw-r--r--src/mongo/db/s/transaction_coordinator_service_test.cpp729
1 files changed, 729 insertions, 0 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_service_test.cpp b/src/mongo/db/s/transaction_coordinator_service_test.cpp
new file mode 100644
index 00000000000..6f4eb769e2a
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_service_test.cpp
@@ -0,0 +1,729 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/commands/txn_cmds_gen.h"
+#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/s/transaction_coordinator_service.h"
+#include "mongo/db/s/transaction_coordinator_test_fixture.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace {
+
+const Date_t kCommitDeadline = Date_t::max();
+
+const BSONObj kDummyWriteConcernError = BSON("code" << ErrorCodes::WriteConcernFailed << "errmsg"
+ << "dummy");
+
+const StatusWith<BSONObj> kNoSuchTransactionAndWriteConcernError =
+ BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction << "writeConcernError"
+ << kDummyWriteConcernError);
+
+const StatusWith<BSONObj> kNoSuchTransaction =
+ BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction);
+const StatusWith<BSONObj> kOk = BSON("ok" << 1);
+const StatusWith<BSONObj> kOkButWriteConcernError =
+ BSON("ok" << 1 << "writeConcernError" << kDummyWriteConcernError);
+
+const StatusWith<BSONObj> kPrepareOk = BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1));
+const StatusWith<BSONObj> kPrepareOkButWriteConcernError =
+ BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1) << "writeConcernError"
+ << kDummyWriteConcernError);
+
+class TransactionCoordinatorServiceTestFixture : public TransactionCoordinatorTestFixture {
+public:
+ // Prepare responses
+
+ void assertPrepareSentAndRespondWithSuccess() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
+ kPrepareOk,
+ WriteConcernOptions::InternalMajorityNoSnapshot);
+ }
+
+ void assertPrepareSentAndRespondWithSuccessAndWriteConcernError() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
+ kPrepareOkButWriteConcernError,
+ WriteConcernOptions::InternalMajorityNoSnapshot);
+ advanceClockAndExecuteScheduledTasks();
+ }
+
+ void assertPrepareSentAndRespondWithNoSuchTransaction() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
+ kNoSuchTransaction,
+ WriteConcernOptions::InternalMajorityNoSnapshot);
+ }
+
+ void assertPrepareSentAndRespondWithNoSuchTransactionAndWriteConcernError() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName,
+ kNoSuchTransactionAndWriteConcernError,
+ WriteConcernOptions::InternalMajorityNoSnapshot);
+ advanceClockAndExecuteScheduledTasks();
+ }
+
+ // Abort responses
+
+ void assertAbortSentAndRespondWithSuccess() {
+ assertCommandSentAndRespondWith("abortTransaction", kOk, WriteConcernOptions::Majority);
+ }
+
+ void assertAbortSentAndRespondWithSuccessAndWriteConcernError() {
+ assertCommandSentAndRespondWith(
+ "abortTransaction", kOkButWriteConcernError, WriteConcernOptions::Majority);
+ advanceClockAndExecuteScheduledTasks();
+ }
+
+ void assertAbortSentAndRespondWithNoSuchTransaction() {
+ assertCommandSentAndRespondWith(
+ "abortTransaction", kNoSuchTransaction, WriteConcernOptions::Majority);
+ }
+
+ void assertAbortSentAndRespondWithNoSuchTransactionAndWriteConcernError() {
+ assertCommandSentAndRespondWith("abortTransaction",
+ kNoSuchTransactionAndWriteConcernError,
+ WriteConcernOptions::Majority);
+ advanceClockAndExecuteScheduledTasks();
+ }
+
+ // Commit responses
+
+ void assertCommitSentAndRespondWithSuccess() {
+ assertCommandSentAndRespondWith(
+ CommitTransaction::kCommandName, kOk, WriteConcernOptions::Majority);
+ }
+
+ void assertCommitSentAndRespondWithSuccessAndWriteConcernError() {
+ assertCommandSentAndRespondWith(CommitTransaction::kCommandName,
+ kOkButWriteConcernError,
+ WriteConcernOptions::Majority);
+ advanceClockAndExecuteScheduledTasks();
+ }
+
+ void assertCommitSentAndRespondWithRetryableError() {
+ assertCommandSentAndRespondWith(
+ CommitTransaction::kCommandName, kRetryableError, WriteConcernOptions::Majority);
+ advanceClockAndExecuteScheduledTasks();
+ }
+
+ // Other
+
+ void assertNoMessageSent() {
+ executor::NetworkInterfaceMock::InNetworkGuard networkGuard(network());
+ ASSERT_FALSE(network()->hasReadyRequests());
+ }
+
+ // TODO (SERVER-38382): Put all these helpers in one separate file and share with
+ // transaction_coordinator_test.
+
+ /**
+ * Goes through the steps to commit a transaction through the coordinator service for a given
+ * lsid and txnNumber. Useful when not explictly testing the commit protocol.
+ */
+ void commitTransaction(TransactionCoordinatorService& coordinatorService,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ const std::set<ShardId>& transactionParticipantShards) {
+ auto commitDecisionFuture = *coordinatorService.coordinateCommit(
+ operationContext(), lsid, txnNumber, transactionParticipantShards);
+
+ for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
+ assertPrepareSentAndRespondWithSuccess();
+ }
+
+ for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
+ assertCommitSentAndRespondWithSuccess();
+ }
+
+ // Wait for commit to complete.
+ commitDecisionFuture.get();
+ }
+
+ /**
+ * Goes through the steps to abort a transaction through the coordinator service for a given
+ * lsid and txnNumber. Useful when not explictly testing the abort protocol.
+ */
+ void abortTransaction(TransactionCoordinatorService& coordinatorService,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ const std::set<ShardId>& shardIdSet,
+ const ShardId& abortingShard) {
+ auto commitDecisionFuture =
+ *coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet);
+
+ for (size_t i = 0; i < shardIdSet.size(); ++i) {
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ }
+
+ for (size_t i = 0; i < shardIdSet.size(); ++i) {
+ assertAbortSentAndRespondWithSuccess();
+ }
+
+ // Wait for abort to complete.
+ commitDecisionFuture.get();
+ }
+
+ auto* service() const {
+ return TransactionCoordinatorService::get(operationContext());
+ }
+};
+
+
+using TransactionCoordinatorServiceStepUpStepDownTest = TransactionCoordinatorServiceTestFixture;
+
+TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, OperationsFailBeforeStepUpStarts) {
+ ASSERT_THROWS_CODE(service()->createCoordinator(
+ operationContext(), makeLogicalSessionIdForTest(), 0, kCommitDeadline),
+ AssertionException,
+ ErrorCodes::NotMaster);
+
+ ASSERT_THROWS_CODE(service()->coordinateCommit(
+ operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet),
+ AssertionException,
+ ErrorCodes::NotMaster);
+
+ ASSERT_THROWS_CODE(
+ service()->recoverCommit(operationContext(), makeLogicalSessionIdForTest(), 0),
+ AssertionException,
+ ErrorCodes::NotMaster);
+}
+
+TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, OperationsBlockBeforeStepUpCompletes) {
+ service()->onStepUp(operationContext(), Milliseconds(1));
+ auto stepDownGuard = makeGuard([&] { service()->onStepDown(); });
+
+ ASSERT_THROWS_CODE(operationContext()->runWithDeadline(
+ Date_t::now() + Milliseconds{5},
+ ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ [&] {
+ return service()->coordinateCommit(operationContext(),
+ makeLogicalSessionIdForTest(),
+ 0,
+ kTwoShardIdSet);
+ }),
+ AssertionException,
+ ErrorCodes::NetworkInterfaceExceededTimeLimit);
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(network());
+ network()->advanceTime(network()->now() + Milliseconds(1));
+ }
+
+ ASSERT(service()->coordinateCommit(
+ operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet) ==
+ boost::none);
+}
+
+TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, StepUpFailsDueToBadCoordinatorDocument) {
+ DBDirectClient client(operationContext());
+ client.insert(NamespaceString::kTransactionCoordinatorsNamespace.ns(), BSON("IllegalKey" << 1));
+ ASSERT_EQ("", client.getLastError());
+
+ service()->onStepUp(operationContext());
+ auto stepDownGuard = makeGuard([&] { service()->onStepDown(); });
+
+ ASSERT_THROWS_CODE(service()->coordinateCommit(
+ operationContext(), makeLogicalSessionIdForTest(), 0, kTwoShardIdSet),
+ AssertionException,
+ ErrorCodes::TypeMismatch);
+
+ ASSERT_THROWS_CODE(
+ service()->recoverCommit(operationContext(), makeLogicalSessionIdForTest(), 0),
+ AssertionException,
+ ErrorCodes::TypeMismatch);
+}
+
+TEST_F(TransactionCoordinatorServiceStepUpStepDownTest, StepDownBeforeStepUpTaskCompleted) {
+ // Call step-up with 1ms delay (meaning it will not actually execute until time is manually
+ // advanced on the underlying executor)
+ service()->onStepUp(operationContext(), Milliseconds(1));
+
+ // Should cancel all outstanding tasks (including the recovery task started by onStepUp above,
+ // which has not yet run)
+ service()->onStepDown();
+
+ // Do another onStepUp to ensure it runs successfully
+ service()->onStepUp(operationContext());
+
+ // Step-down the service so that the destructor does not complain
+ service()->onStepDown();
+}
+
+
+class TransactionCoordinatorServiceTest : public TransactionCoordinatorServiceTestFixture {
+protected:
+ void setUp() override {
+ TransactionCoordinatorServiceTestFixture::setUp();
+
+ service()->onStepUp(operationContext());
+ }
+
+ void tearDown() override {
+ service()->onStepDown();
+
+ TransactionCoordinatorServiceTestFixture::tearDown();
+ }
+
+ LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
+ TxnNumber _txnNumber{1};
+};
+
+TEST_F(TransactionCoordinatorServiceTest, CreateCoordinatorOnNewSessionSucceeds) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+ commitTransaction(*coordinatorService, _lsid, _txnNumber, kTwoShardIdSet);
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ CreateCoordinatorForExistingSessionWithPreviouslyCommittedTxnSucceeds) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+ commitTransaction(*coordinatorService, _lsid, _txnNumber, kTwoShardIdSet);
+
+ coordinatorService->createCoordinator(
+ operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
+ commitTransaction(*coordinatorService, _lsid, _txnNumber + 1, kTwoShardIdSet);
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ RetryingCreateCoordinatorForSameLsidAndTxnNumberSucceeds) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+ // Retry create. This should succeed but not replace the old coordinator.
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ commitTransaction(*coordinatorService, _lsid, _txnNumber, kTwoShardIdSet);
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ CreateCoordinatorWithHigherTxnNumberThanOngoingCommittingTxnCommitsPreviousTxnAndSucceeds) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ // Progress the transaction up until the point where it has sent commit and is waiting for
+ // commit acks.
+ auto oldTxnCommitDecisionFuture = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ // Simulate all participants acking prepare/voting to commit.
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+
+ // Create a coordinator for a higher transaction number in the same session. This should
+ // "tryAbort" on the old coordinator which should NOT abort it since it's already waiting for
+ // commit acks.
+ coordinatorService->createCoordinator(
+ operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
+ auto newTxnCommitDecisionFuture = coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber + 1, kTwoShardIdSet);
+
+ // Finish committing the old transaction by sending it commit acks from both participants.
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ // The old transaction should now be committed.
+ ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()),
+ static_cast<int>(txn::CommitDecision::kCommit));
+ commitTransaction(*coordinatorService, _lsid, _txnNumber + 1, kTwoShardIdSet);
+}
+
+TEST_F(TransactionCoordinatorServiceTest, CoordinateCommitReturnsNoneIfNoCoordinatorEverExisted) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+ auto commitDecisionFuture =
+ coordinatorService->coordinateCommit(operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+ ASSERT(boost::none == commitDecisionFuture);
+}
+
+TEST_F(TransactionCoordinatorServiceTest, CoordinateCommitReturnsNoneIfCoordinatorWasRemoved) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+ commitTransaction(*coordinatorService, _lsid, _txnNumber, kTwoShardIdSet);
+
+ auto commitDecisionFuture =
+ coordinatorService->coordinateCommit(operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+ ASSERT(boost::none == commitDecisionFuture);
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ CoordinateCommitWithSameParticipantListJoinsOngoingCoordinationThatLeadsToAbort) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ auto commitDecisionFuture1 = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ auto commitDecisionFuture2 = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ CoordinateCommitWithSameParticipantListJoinsOngoingCoordinationThatLeadsToCommit) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ auto commitDecisionFuture1 = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+
+ auto commitDecisionFuture2 = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationThatLeadsToAbort) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ auto commitDecisionFuture1 = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ auto commitDecisionFuture2 =
+ *coordinatorService->recoverCommit(operationContext(), _lsid, _txnNumber);
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationThatLeadsToCommit) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ auto commitDecisionFuture1 = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+
+ auto commitDecisionFuture2 =
+ *coordinatorService->recoverCommit(operationContext(), _lsid, _txnNumber);
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ RecoverCommitWorksIfCommitNeverReceivedAndCoordinationCanceled) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ auto commitDecisionFuture =
+ *coordinatorService->recoverCommit(operationContext(), _lsid, _txnNumber);
+
+ // Cancel previous coordinator by creating a new coordinator at a higher txn number.
+ coordinatorService->createCoordinator(
+ operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
+
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()),
+ static_cast<int>(txn::CommitDecision::kCanceled));
+}
+
+TEST_F(
+ TransactionCoordinatorServiceTest,
+ CreateCoordinatorWithHigherTxnNumberThanExistingButNotYetCommittingTxnCancelsPreviousTxnAndSucceeds) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ // Create a coordinator for a higher transaction number in the same session. This should
+ // cancel commit on the old coordinator.
+ coordinatorService->createCoordinator(
+ operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
+ auto newTxnCommitDecisionFuture = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber + 1, kTwoShardIdSet);
+
+ // Since this transaction has already been canceled, this should return boost::none.
+ auto oldTxnCommitDecisionFuture =
+ coordinatorService->coordinateCommit(operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ // The old transaction should now be committed.
+ ASSERT(oldTxnCommitDecisionFuture == boost::none);
+
+ // Make sure the newly created one works fine too.
+ commitTransaction(*coordinatorService, _lsid, _txnNumber + 1, kTwoShardIdSet);
+}
+
+TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorToPrepare) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ // Coordinator sends prepare.
+ auto commitDecisionFuture = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ // One participant responds with writeConcern error.
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccessAndWriteConcernError();
+
+ // Coordinator retries prepare against participant that responded with writeConcern error until
+ // participant responds without writeConcern error.
+ assertPrepareSentAndRespondWithSuccessAndWriteConcernError();
+ assertPrepareSentAndRespondWithSuccessAndWriteConcernError();
+ assertPrepareSentAndRespondWithSuccessAndWriteConcernError();
+ assertPrepareSentAndRespondWithNoSuchTransactionAndWriteConcernError();
+ assertPrepareSentAndRespondWithNoSuchTransactionAndWriteConcernError();
+ assertPrepareSentAndRespondWithSuccessAndWriteConcernError();
+ assertPrepareSentAndRespondWithSuccess();
+
+ // Coordinator sends commit.
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ // The transaction should now be committed.
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()),
+ static_cast<int>(txn::CommitDecision::kCommit));
+}
+
+TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorToAbort) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ // Coordinator sends prepare.
+ auto commitDecisionFuture = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ // One participant votes to abort.
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ // One participant responds to abort with success.
+ assertAbortSentAndRespondWithSuccess();
+
+ // Coordinator retries abort against other participant until other participant responds without
+ // writeConcern error.
+ assertAbortSentAndRespondWithSuccessAndWriteConcernError();
+ assertAbortSentAndRespondWithSuccessAndWriteConcernError();
+ assertAbortSentAndRespondWithSuccessAndWriteConcernError();
+ assertAbortSentAndRespondWithSuccessAndWriteConcernError();
+ assertAbortSentAndRespondWithNoSuchTransactionAndWriteConcernError();
+ assertAbortSentAndRespondWithNoSuchTransactionAndWriteConcernError();
+ assertAbortSentAndRespondWithNoSuchTransaction();
+
+ // The transaction should now be aborted.
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()),
+ static_cast<int>(txn::CommitDecision::kAbort));
+}
+
+TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorToCommit) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ // Coordinator sends prepare.
+ auto commitDecisionFuture = *coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ // Both participants vote to commit.
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+
+ // One participant responds to commit with success.
+ assertCommitSentAndRespondWithSuccess();
+
+ // Coordinator retries commit against other participant until other participant responds without
+ // writeConcern error.
+ assertCommitSentAndRespondWithSuccessAndWriteConcernError();
+ assertCommitSentAndRespondWithSuccessAndWriteConcernError();
+ assertCommitSentAndRespondWithSuccessAndWriteConcernError();
+ assertCommitSentAndRespondWithSuccessAndWriteConcernError();
+ assertCommitSentAndRespondWithSuccessAndWriteConcernError();
+ assertCommitSentAndRespondWithSuccess();
+
+ // The transaction should now be committed.
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()),
+ static_cast<int>(txn::CommitDecision::kCommit));
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ CoordinatorIsCanceledIfDeadlinePassesAndHasNotReceivedParticipantList) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+ const auto deadline = executor()->now() + Milliseconds(1000 * 60 * 10 /* 10 hours */);
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, deadline);
+
+ // Reach the deadline.
+ network()->enterNetwork();
+ network()->advanceTime(deadline);
+ network()->exitNetwork();
+
+ // The coordinator should no longer exist.
+ ASSERT(boost::none ==
+ coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet));
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ CoordinatorIsNotCanceledIfDeadlinePassesButHasReceivedParticipantList) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+ const auto deadline = executor()->now() + Milliseconds(1000 * 60 * 10 /* 10 hours */);
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, deadline);
+
+ // Deliver the participant list before the deadline.
+ ASSERT(boost::none !=
+ coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet));
+
+ // Reach the deadline.
+ network()->enterNetwork();
+ network()->advanceTime(deadline);
+ network()->exitNetwork();
+
+ // The coordinator should still exist.
+ ASSERT(boost::none !=
+ coordinatorService->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet));
+}
+
+
+/**
+ * Fixture that during setUp automatically creates a coordinator for a default lsid/txnNumber pair.
+ */
+class TransactionCoordinatorServiceTestSingleTxn : public TransactionCoordinatorServiceTest {
+public:
+ void setUp() final {
+ TransactionCoordinatorServiceTest::setUp();
+ TransactionCoordinatorService::get(operationContext())
+ ->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+ }
+
+ TransactionCoordinatorService* coordinatorService() {
+ return TransactionCoordinatorService::get(operationContext());
+ }
+};
+
+TEST_F(TransactionCoordinatorServiceTestSingleTxn,
+ CoordinateCommitReturnsCorrectCommitDecisionOnAbort) {
+
+ auto commitDecisionFuture = *coordinatorService()->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ // Simulate a participant voting to abort.
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ assertPrepareSentAndRespondWithSuccess();
+
+ assertAbortSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
+}
+
+TEST_F(TransactionCoordinatorServiceTestSingleTxn,
+ CoordinateCommitWithNoVotesReturnsNotReadyFuture) {
+
+ auto commitDecisionFuture = *coordinatorService()->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ ASSERT_FALSE(commitDecisionFuture.isReady());
+ // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
+ abortTransaction(*coordinatorService(), _lsid, _txnNumber, kTwoShardIdSet, kTwoShardIdList[0]);
+}
+
+TEST_F(TransactionCoordinatorServiceTestSingleTxn,
+ CoordinateCommitReturnsCorrectCommitDecisionOnCommit) {
+
+ auto commitDecisionFuture = *coordinatorService()->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+}
+
+TEST_F(TransactionCoordinatorServiceTestSingleTxn,
+ ConcurrentCallsToCoordinateCommitReturnSameDecisionOnCommit) {
+
+ auto commitDecisionFuture1 = *coordinatorService()->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+ auto commitDecisionFuture2 = *coordinatorService()->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ commitTransaction(*coordinatorService(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+TEST_F(TransactionCoordinatorServiceTestSingleTxn,
+ ConcurrentCallsToCoordinateCommitReturnSameDecisionOnAbort) {
+
+ auto commitDecisionFuture1 = *coordinatorService()->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+ auto commitDecisionFuture2 = *coordinatorService()->coordinateCommit(
+ operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
+
+ abortTransaction(*coordinatorService(), _lsid, _txnNumber, kTwoShardIdSet, kTwoShardIdList[0]);
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+} // namespace
+} // namespace mongo