diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-03-27 18:22:26 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-04-01 07:26:26 -0400 |
commit | 907e4760487e081db031232d1f8326c0f8bdef68 (patch) | |
tree | b27bdde6a3294072b5ff15ce07081e51696b2909 /src/mongo | |
parent | 9dd07662bbe80415d90131fca2be6312166d6f39 (diff) | |
download | mongo-907e4760487e081db031232d1f8326c0f8bdef68.tar.gz |
SERVER-40297 Make all TransactionCoordinatorDriver methods free functions
The TransactionCoordinatorDriver is not really a "driver", but just a
set of functions to perform asynchronous work. There isn't any state to
keep, so there is no need for them to be in a class.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/txn_cmds.idl | 5 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_driver.h | 218 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_futures_util.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_futures_util.h | 1 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test.cpp | 175 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.cpp (renamed from src/mongo/db/s/transaction_coordinator_driver.cpp) | 232 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.h | 215 |
11 files changed, 454 insertions, 453 deletions
diff --git a/src/mongo/db/commands/txn_cmds.idl b/src/mongo/db/commands/txn_cmds.idl index e5f9e6c236d..76616b84c9c 100644 --- a/src/mongo/db/commands/txn_cmds.idl +++ b/src/mongo/db/commands/txn_cmds.idl @@ -28,7 +28,6 @@ global: cpp_namespace: "mongo" - imports: - "mongo/idl/basic_types.idl" - "mongo/s/sharding_types.idl" @@ -100,3 +99,7 @@ commands: progress on commit by processing using the info in the recoveryToken." optional: true type: TxnRecoveryToken + + abortTransaction: + description: "abortTransaction Command" + namespace: ignored diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index d8edd672d09..0fc1b35ee08 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -122,11 +122,11 @@ env.Library( target='transaction_coordinator', source=[ 'transaction_coordinator_catalog.cpp', - 'transaction_coordinator_driver.cpp', 'transaction_coordinator_factory_mongod.cpp', 'transaction_coordinator_futures_util.cpp', 'transaction_coordinator_service.cpp', 'transaction_coordinator_structures.cpp', + 'transaction_coordinator_util.cpp', 'transaction_coordinator.cpp', env.Idlc('transaction_coordinator_document.idl')[0], ], diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 7689ddb2108..65b1231fe66 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -34,8 +34,6 @@ #include "mongo/db/s/transaction_coordinator.h" #include "mongo/db/logical_clock.h" -#include "mongo/db/s/transaction_coordinator_document_gen.h" -#include "mongo/db/s/transaction_coordinator_futures_util.h" #include "mongo/util/log.h" namespace mongo { @@ -43,28 +41,24 @@ namespace { using CommitDecision = txn::CommitDecision; using CoordinatorCommitDecision = txn::CoordinatorCommitDecision; +using PrepareVoteConsensus = txn::PrepareVoteConsensus; using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; -using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus; - CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(ServiceContext* service, const PrepareVoteConsensus& result, const LogicalSessionId& lsid, TxnNumber txnNumber) { - invariant(result.decision); - CoordinatorCommitDecision decision(*result.decision); - - if (result.decision == CommitDecision::kCommit) { - invariant(result.maxPrepareTimestamp); - - decision.setCommitTimestamp(Timestamp(result.maxPrepareTimestamp->getSecs(), - result.maxPrepareTimestamp->getInc() + 1)); + auto decision = result.decision(); + if (decision.getDecision() == CommitDecision::kCommit) { LOG(3) << "Advancing cluster time to the commit timestamp " << *decision.getCommitTimestamp() << " for " << lsid.getId() << ':' << txnNumber; uassertStatusOK(LogicalClock::get(service)->advanceClusterTime( - LogicalTime(*result.maxPrepareTimestamp))); + LogicalTime(*decision.getCommitTimestamp()))); + + decision.setCommitTimestamp(Timestamp(decision.getCommitTimestamp()->getSecs(), + decision.getCommitTimestamp()->getInc() + 1)); } return decision; @@ -80,8 +74,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, : _serviceContext(serviceContext), _lsid(lsid), _txnNumber(txnNumber), - _scheduler(std::move(scheduler)), - _driver(serviceContext, *_scheduler) { + _scheduler(std::move(scheduler)) { if (coordinateCommitDeadline) { _deadlineScheduler = _scheduler->makeChildScheduler(); _deadlineScheduler @@ -119,7 +112,7 @@ void TransactionCoordinator::runCommit(std::vector<ShardId> participantShards) { _cancelTimeoutWaitForCommitTask(); - _driver.persistParticipantList(_lsid, _txnNumber, participantShards) + txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, participantShards) .then([this, participantShards]() { return _runPhaseOne(participantShards); }) .then([this, participantShards](CoordinatorCommitDecision decision) { return _runPhaseTwo(participantShards, decision); @@ -194,16 +187,18 @@ void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() { Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne( const std::vector<ShardId>& participantShards) { - return _driver.sendPrepare(participantShards, _lsid, _txnNumber) + return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, participantShards) .then([this, participantShards](PrepareVoteConsensus result) { invariant(_state == CoordinatorState::kPreparing); auto decision = makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber); - return _driver - .persistDecision( - _lsid, _txnNumber, participantShards, decision.getCommitTimestamp()) + return txn::persistDecision(*_scheduler, + _lsid, + _txnNumber, + participantShards, + decision.getCommitTimestamp()) .then([decision] { return decision; }); }); } @@ -215,7 +210,7 @@ Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& pa if (MONGO_FAIL_POINT(doNotForgetCoordinator)) return Future<void>::makeReady(); - return _driver.deleteCoordinatorDoc(_lsid, _txnNumber); + return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber); }) .then([this] { LOG(3) << "Two-phase commit completed for " << _lsid.getId() << ':' << _txnNumber; @@ -233,11 +228,16 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants( switch (decision.getDecision()) { case CommitDecision::kCommit: _state = CoordinatorState::kCommitting; - return _driver.sendCommit( - participantShards, _lsid, _txnNumber, *decision.getCommitTimestamp()); + return txn::sendCommit(_serviceContext, + *_scheduler, + _lsid, + _txnNumber, + participantShards, + *decision.getCommitTimestamp()); case CommitDecision::kAbort: _state = CoordinatorState::kAborting; - return _driver.sendAbort(participantShards, _lsid, _txnNumber); + return txn::sendAbort( + _serviceContext, *_scheduler, _lsid, _txnNumber, participantShards); case CommitDecision::kCanceled: MONGO_UNREACHABLE; }; diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index 1e3ec5d820b..4e7a0a80351 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -31,7 +31,7 @@ #include <vector> -#include "mongo/db/s/transaction_coordinator_driver.h" +#include "mongo/db/s/transaction_coordinator_util.h" #include "mongo/logger/logstream_builder.h" #include "mongo/util/fail_point_service.h" @@ -187,10 +187,6 @@ private: // coordinator was not created with an expiration task. std::unique_ptr<txn::AsyncWorkScheduler> _deadlineScheduler; - // Context object used to perform and track the state of asynchronous operations on behalf of - // this coordinator. - TransactionCoordinatorDriver _driver; - // Protects the state below mutable stdx::mutex _mutex; diff --git a/src/mongo/db/s/transaction_coordinator_driver.h b/src/mongo/db/s/transaction_coordinator_driver.h deleted file mode 100644 index 139b14b9ea8..00000000000 --- a/src/mongo/db/s/transaction_coordinator_driver.h +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <vector> - -#include "mongo/db/logical_session_id.h" -#include "mongo/db/s/transaction_coordinator_document_gen.h" -#include "mongo/db/s/transaction_coordinator_futures_util.h" -#include "mongo/executor/task_executor.h" -#include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/future.h" - -namespace mongo { - -/** - * Single instance of this class is owned by each TransactionCoordinator. It abstracts the - * long-running blocking operations into a futurized interface and ensures that all outstanding - * operations are joined at completion. - * - * Methods on this class must be called from a single thread at a time and other than cancellation, - * no other operations are allowed until any returned futures are signalled. - */ -class TransactionCoordinatorDriver { - TransactionCoordinatorDriver(const TransactionCoordinatorDriver&) = delete; - TransactionCoordinatorDriver& operator=(const TransactionCoordinatorDriver&) = delete; - -public: - TransactionCoordinatorDriver(ServiceContext* serviceContext, - txn::AsyncWorkScheduler& scheduler); - - /** - * Upserts a document of the form: - * - * { - * _id: {lsid: <lsid>, txnNumber: <txnNumber>} - * participants: ["shard0000", "shard0001"] - * } - * - * into config.transaction_coordinators and waits for the upsert to be majority-committed. - * - * Throws if the upsert fails or waiting for writeConcern fails. - * - * If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it - * means a document for the (lsid, txnNumber) exists with a different participant list. - */ - Future<void> persistParticipantList(const LogicalSessionId& lsid, - TxnNumber txnNumber, - std::vector<ShardId> participantList); - - /** - * Sends prepare to all participants and returns a future that will be resolved when either: - * a) All participants have responded with a vote to commit, or - * b) At least one participant votes to abort. - * - * If all participants vote to commit, the result will contain the max prepare timestamp of all - * prepare timestamps attached to the participants' responses. Otherwise the result will simply - * contain the decision to abort. - */ - struct PrepareVoteConsensus { - // Optional decision, if any was reached (decision could be empty if no response was - // received) - boost::optional<txn::CommitDecision> decision; - - // Should only be consulted if the decision is commit and contains the maximum prepare - // timestamp across all participants - boost::optional<Timestamp> maxPrepareTimestamp; - }; - Future<PrepareVoteConsensus> sendPrepare(const std::vector<ShardId>& participantShards, - const LogicalSessionId& lsid, - TxnNumber txnNumber); - - /** - * If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators - * for - * - * (lsid, txnNumber) to be: - * - * { - * _id: {lsid: <lsid>, txnNumber: <txnNumber>} - * participants: ["shard0000", "shard0001"] - * decision: "abort" - * } - * - * else updates the document to be: - * - * { - * _id: {lsid: <lsid>, txnNumber: <txnNumber>} - * participants: ["shard0000", "shard0001"] - * decision: "commit" - * commitTimestamp: Timestamp(xxxxxxxx, x), - * } - * - * and waits for the update to be majority-committed. - * - * Throws if the update fails or waiting for writeConcern fails. - * - * If the update succeeds but did not update any document, throws an anonymous error, because it - * means either no document for (lsid, txnNumber) exists, or a document exists but has a - * different participant list, different decision, or different commit Timestamp. - */ - Future<void> persistDecision(const LogicalSessionId& lsid, - TxnNumber txnNumber, - std::vector<ShardId> participantList, - const boost::optional<Timestamp>& commitTimestamp); - - /** - * Sends commit to all shards and returns a future that will be resolved when all participants - * have responded with success. - */ - Future<void> sendCommit(const std::vector<ShardId>& participantShards, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - Timestamp commitTimestamp); - - /** - * Sends abort to all shards and returns a future that will be resolved when all participants - * have responded with success. - */ - Future<void> sendAbort(const std::vector<ShardId>& participantShards, - const LogicalSessionId& lsid, - TxnNumber txnNumber); - - /** - * Deletes the document in config.transaction_coordinators for (lsid, txnNumber). - * - * Does *not* wait for the delete to be majority-committed. - * - * Throws if the update fails. - * - * If the update succeeds but did not update any document, throws an anonymous error, because it - * means either no document for (lsid, txnNumber) exists, or a document exists but without a - * decision. - */ - Future<void> deleteCoordinatorDoc(const LogicalSessionId& lsid, TxnNumber txnNumber); - - /** - * Reads and returns all documents in config.transaction_coordinators. - */ - static std::vector<txn::TransactionCoordinatorDocument> readAllCoordinatorDocs( - OperationContext* opCtx); - - // - // These methods are used internally and are exposed for unit-testing purposes only - // - - /** - * Sends prepare to the given shard and returns a future, which will be set with the vote. - * - * This method will retry until it receives a non-retryable response from the remote node or - * until the scheduler under which it is running is shut down. Because of this it can return - * only the following error code(s): - * - TransactionCoordinatorSteppingDown - * - ShardNotFound - */ - struct PrepareResponse { - // Shard id from which the response was received - ShardId shardId; - - // If set to none, this means the shard did not produce a vote - boost::optional<txn::PrepareVote> vote; - - // Will only be set if the vote was kCommit - boost::optional<Timestamp> prepareTimestamp; - }; - Future<PrepareResponse> sendPrepareToShard(txn::AsyncWorkScheduler& scheduler, - const ShardId& shardId, - const BSONObj& prepareCommandObj); - - /** - * Sends a command corresponding to a commit decision (i.e. commitTransaction or* - * abortTransaction) to the given shard and returns a future, which will be set with the result. - * - * Used for sendCommit and sendAbort. - * - * This method will retry until it receives a response from the remote node which can be - * interpreted as vote abort (e.g. NoSuchTransaction), or until the scheduler under which it is - * running is shut down. Because of this it can return only the following error code(s): - * - TransactionCoordinatorSteppingDown - */ - Future<void> sendDecisionToParticipantShard(txn::AsyncWorkScheduler& scheduler, - const ShardId& shardId, - const BSONObj& commandObj); - -private: - ServiceContext* _serviceContext; - - txn::AsyncWorkScheduler& _scheduler; -}; - -} // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp index fa74d11cbf0..771815d0ab1 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp @@ -33,10 +33,10 @@ #include "mongo/db/s/transaction_coordinator_futures_util.h" -#include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/s/grid.h" #include "mongo/transport/service_entry_point.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h index 7a302d5a2b4..c7361906c2c 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.h +++ b/src/mongo/db/s/transaction_coordinator_futures_util.h @@ -34,7 +34,6 @@ #include "mongo/client/read_preference.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" -#include "mongo/s/grid.h" #include "mongo/s/shard_id.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/future.h" diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index e5b5e15e300..6fe5d5d45c6 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -159,8 +159,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, WriteConcernOptions::kNoTimeout}, &unusedWCResult)); - auto coordinatorDocs = - TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx); + auto coordinatorDocs = txn::readAllCoordinatorDocs(opCtx); LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size() << " transactions"; diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index f3b89afb50d..cc5a94126b9 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -41,7 +41,7 @@ namespace mongo { namespace { -using PrepareResponse = TransactionCoordinatorDriver::PrepareResponse; +using PrepareResponse = txn::PrepareResponse; using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; const StatusWith<BSONObj> kNoSuchTransaction = @@ -109,21 +109,18 @@ protected: void setUp() override { TransactionCoordinatorTestBase::setUp(); _aws.emplace(getServiceContext()); - _driver.emplace(getServiceContext(), *_aws); } void tearDown() override { - _driver.reset(); TransactionCoordinatorTestBase::tearDown(); } boost::optional<txn::AsyncWorkScheduler> _aws; - boost::optional<TransactionCoordinatorDriver> _driver; }; auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnNumber) { PrepareTransaction prepareCmd; - prepareCmd.setDbName("admin"); + prepareCmd.setDbName(NamespaceString::kAdminDb); auto prepareObj = prepareCmd.toBSON( BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false << WriteConcernOptions::kWriteConcernField @@ -135,8 +132,8 @@ auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnN TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = _driver->sendDecisionToParticipantShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = txn::sendDecisionToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithSuccess(); @@ -147,8 +144,8 @@ TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOn TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = _driver->sendDecisionToParticipantShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = txn::sendDecisionToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithRetryableError(); @@ -162,8 +159,8 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = _driver->sendDecisionToParticipantShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = txn::sendDecisionToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithRetryableError(); @@ -176,8 +173,8 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = _driver->sendDecisionToParticipantShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = txn::sendDecisionToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -187,8 +184,8 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardCanBeInterruptedAndReturnsError) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = _driver->sendDecisionToParticipantShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = txn::sendDecisionToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithRetryableError(); aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"}); @@ -200,8 +197,8 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<PrepareResponse> future = _driver->sendPrepareToShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<PrepareResponse> future = txn::sendPrepareToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithSuccess(); @@ -214,8 +211,8 @@ TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecision TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnRetryableErrorThenOkResponse) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<PrepareResponse> future = _driver->sendPrepareToShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<PrepareResponse> future = txn::sendPrepareToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithRetryableError(); @@ -231,8 +228,8 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardCanBeInterruptedAndReturnsNoDecisionIfNotServiceShutdown) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<PrepareResponse> future = _driver->sendPrepareToShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<PrepareResponse> future = txn::sendPrepareToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithRetryableError(); aws.shutdown({ErrorCodes::TransactionCoordinatorReachedAbortDecision, "Retry interrupted"}); @@ -246,8 +243,8 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardCanBeInterruptedAndThrowsExceptionIfServiceShutdown) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<PrepareResponse> future = _driver->sendPrepareToShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<PrepareResponse> future = txn::sendPrepareToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithRetryableError(); aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Service shutting down"}); @@ -260,8 +257,8 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) { txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = _driver->sendPrepareToShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + auto future = txn::sendPrepareToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -273,8 +270,8 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) { txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = _driver->sendPrepareToShard( - aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + auto future = txn::sendPrepareToShard( + getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -286,38 +283,38 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) { - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); + txn::AsyncWorkScheduler aws(getServiceContext()); + auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList); onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }}); - auto response = future.get(); - ASSERT(response.decision == txn::CommitDecision::kAbort); - ASSERT(response.maxPrepareTimestamp == boost::none); + auto decision = future.get().decision(); + ASSERT(decision.getDecision() == txn::CommitDecision::kAbort); } TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) { - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); + txn::AsyncWorkScheduler aws(getServiceContext()); + auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(); assertPrepareSentAndRespondWithNoSuchTransaction(); - auto response = future.get(); - ASSERT(response.decision == txn::CommitDecision::kAbort); - ASSERT(response.maxPrepareTimestamp == boost::none); + auto decision = future.get().decision(); + ASSERT(decision.getDecision() == txn::CommitDecision::kAbort); } TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) { - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); + txn::AsyncWorkScheduler aws(getServiceContext()); + auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList); onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }}); - auto response = future.get(); - ASSERT(response.decision == txn::CommitDecision::kAbort); - ASSERT(response.maxPrepareTimestamp == boost::none); + auto decision = future.get().decision(); + ASSERT(decision.getDecision() == txn::CommitDecision::kAbort); } TEST_F(TransactionCoordinatorDriverTest, @@ -325,14 +322,15 @@ TEST_F(TransactionCoordinatorDriverTest, const auto firstPrepareTimestamp = Timestamp(1, 1); const auto maxPrepareTimestamp = Timestamp(2, 1); - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); + txn::AsyncWorkScheduler aws(getServiceContext()); + auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); - auto response = future.get(); - ASSERT(response.decision == txn::CommitDecision::kCommit); - ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); + auto decision = future.get().decision(); + ASSERT(decision.getDecision() == txn::CommitDecision::kCommit); + ASSERT_EQ(maxPrepareTimestamp, *decision.getCommitTimestamp()); } TEST_F(TransactionCoordinatorDriverTest, @@ -340,14 +338,15 @@ TEST_F(TransactionCoordinatorDriverTest, const auto firstPrepareTimestamp = Timestamp(1, 1); const auto maxPrepareTimestamp = Timestamp(2, 1); - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); + txn::AsyncWorkScheduler aws(getServiceContext()); + auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); - auto response = future.get(); - ASSERT(response.decision == txn::CommitDecision::kCommit); - ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); + auto decision = future.get().decision(); + ASSERT(decision.getDecision() == txn::CommitDecision::kCommit); + ASSERT_EQ(maxPrepareTimestamp, *decision.getCommitTimestamp()); } TEST_F(TransactionCoordinatorDriverTest, @@ -355,19 +354,30 @@ TEST_F(TransactionCoordinatorDriverTest, const auto firstPrepareTimestamp = Timestamp(1, 1); const auto maxPrepareTimestamp = Timestamp(2, 1); - auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber); + txn::AsyncWorkScheduler aws(getServiceContext()); + auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); - auto response = future.get(); - ASSERT(response.decision == txn::CommitDecision::kCommit); - ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp); + auto decision = future.get().decision(); + ASSERT(decision.getDecision() == txn::CommitDecision::kCommit); + ASSERT_EQ(maxPrepareTimestamp, *decision.getCommitTimestamp()); } class TransactionCoordinatorDriverPersistenceTest : public TransactionCoordinatorDriverTest { protected: + void setUp() override { + TransactionCoordinatorDriverTest::setUp(); + _aws.emplace(getServiceContext()); + } + + void tearDown() override { + _aws.reset(); + TransactionCoordinatorDriverTest::tearDown(); + } + static void assertDocumentMatches( TransactionCoordinatorDocument doc, LogicalSessionId expectedLsid, @@ -401,9 +411,9 @@ protected: LogicalSessionId lsid, TxnNumber txnNumber, const std::vector<ShardId>& participants) { - _driver->persistParticipantList(lsid, txnNumber, participants).get(); + txn::persistParticipantsList(*_aws, lsid, txnNumber, participants).get(); - auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx); + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumber, participants); } @@ -413,9 +423,9 @@ protected: TxnNumber txnNumber, const std::vector<ShardId>& participants, const boost::optional<Timestamp>& commitTimestamp) { - _driver->persistDecision(lsid, txnNumber, participants, commitTimestamp).get(); + txn::persistDecision(*_aws, lsid, txnNumber, participants, commitTimestamp).get(); - auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx); + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); if (commitTimestamp) { assertDocumentMatches(allCoordinatorDocs[0], @@ -433,9 +443,9 @@ protected: void deleteCoordinatorDocExpectSuccess(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) { - _driver->deleteCoordinatorDoc(lsid, txnNumber).get(); + txn::deleteCoordinatorDoc(*_aws, lsid, txnNumber).get(); - auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx); + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(0)); } @@ -443,6 +453,8 @@ protected: ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")}; const Timestamp _commitTimestamp{Timestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0)}; + + boost::optional<txn::AsyncWorkScheduler> _aws; }; TEST_F(TransactionCoordinatorDriverPersistenceTest, @@ -464,21 +476,22 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, std::vector<ShardId> smallerParticipantList{ShardId("shard0001"), ShardId("shard0002")}; ASSERT_THROWS_CODE( - _driver->persistParticipantList(_lsid, _txnNumber, smallerParticipantList).get(), + txn::persistParticipantsList(*_aws, _lsid, _txnNumber, smallerParticipantList).get(), AssertionException, 51025); std::vector<ShardId> largerParticipantList{ ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003"), ShardId("shard0004")}; ASSERT_THROWS_CODE( - _driver->persistParticipantList(_lsid, _txnNumber, largerParticipantList).get(), + txn::persistParticipantsList(*_aws, _lsid, _txnNumber, largerParticipantList).get(), AssertionException, 51025); std::vector<ShardId> differentSameSizeParticipantList{ ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0004")}; ASSERT_THROWS_CODE( - _driver->persistParticipantList(_lsid, _txnNumber, differentSameSizeParticipantList).get(), + txn::persistParticipantsList(*_aws, _lsid, _txnNumber, differentSameSizeParticipantList) + .get(), AssertionException, 51025); } @@ -487,10 +500,9 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMultipleTransactionsOnSameSession) { for (int i = 1; i <= 3; i++) { auto txnNumber = TxnNumber{i}; - _driver->persistParticipantList(_lsid, txnNumber, _participants).get(); + txn::persistParticipantsList(*_aws, _lsid, txnNumber, _participants).get(); - auto allCoordinatorDocs = - TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext()); + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i)); } } @@ -498,10 +510,9 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMultipleSessions) { for (int i = 1; i <= 3; i++) { auto lsid = makeLogicalSessionIdForTest(); - _driver->persistParticipantList(lsid, _txnNumber, _participants).get(); + txn::persistParticipantsList(*_aws, lsid, _txnNumber, _participants).get(); - auto allCoordinatorDocs = - TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext()); + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i)); } } @@ -509,7 +520,8 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMul TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistAbortDecisionWhenNoDocumentForTransactionExistsFails) { ASSERT_THROWS_CODE( - _driver->persistDecision(_lsid, _txnNumber, _participants, boost::none /* abort */).get(), + txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, boost::none /* abort */) + .get(), AssertionException, 51026); } @@ -534,7 +546,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistCommitDecisionWhenNoDocumentForTransactionExistsFails) { ASSERT_THROWS_CODE( - _driver->persistDecision(_lsid, _txnNumber, _participants, _commitTimestamp /* commit */) + txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, _commitTimestamp /* commit */) .get(), AssertionException, 51026); @@ -564,9 +576,8 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, const Timestamp differentCommitTimestamp(Date_t::now().toMillisSinceEpoch() / 1000, 1); ASSERT_THROWS_CODE( - _driver - ->persistDecision( - _lsid, _txnNumber, _participants, differentCommitTimestamp /* commit */) + txn::persistDecision( + *_aws, _lsid, _txnNumber, _participants, differentCommitTimestamp /* commit */) .get(), AssertionException, 51026); @@ -579,7 +590,8 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */); ASSERT_THROWS_CODE( - _driver->persistDecision(_lsid, _txnNumber, _participants, boost::none /* abort */).get(), + txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, boost::none /* abort */) + .get(), AssertionException, 51026); } @@ -591,7 +603,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */); ASSERT_THROWS_CODE( - _driver->persistDecision(_lsid, _txnNumber, _participants, _commitTimestamp /* abort */) + txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, _commitTimestamp /* abort */) .get(), AssertionException, 51026); @@ -599,14 +611,14 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenNoDocumentExistsFails) { ASSERT_THROWS_CODE( - _driver->deleteCoordinatorDoc(_lsid, _txnNumber).get(), AssertionException, 51027); + txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumber).get(), AssertionException, 51027); } TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenDocumentExistsWithoutDecisionFails) { persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); ASSERT_THROWS_CODE( - _driver->deleteCoordinatorDoc(_lsid, _txnNumber).get(), AssertionException, 51027); + txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumber).get(), AssertionException, 51027); } TEST_F(TransactionCoordinatorDriverPersistenceTest, @@ -631,19 +643,18 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, const auto txnNumber2 = TxnNumber{5}; // Insert coordinator documents for two transactions. - _driver->persistParticipantList(_lsid, txnNumber1, _participants).get(); - _driver->persistParticipantList(_lsid, txnNumber2, _participants).get(); + txn::persistParticipantsList(*_aws, _lsid, txnNumber1, _participants).get(); + txn::persistParticipantsList(*_aws, _lsid, txnNumber2, _participants).get(); - auto allCoordinatorDocs = - TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext()); + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(2)); // Delete the document for the first transaction and check that only the second transaction's // document still exists. - _driver->persistDecision(_lsid, txnNumber1, _participants, boost::none /* abort */).get(); - _driver->deleteCoordinatorDoc(_lsid, txnNumber1).get(); + txn::persistDecision(*_aws, _lsid, txnNumber1, _participants, boost::none /* abort */).get(); + txn::deleteCoordinatorDoc(*_aws, _lsid, txnNumber1).get(); - allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext()); + allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumber2, _participants); } diff --git a/src/mongo/db/s/transaction_coordinator_driver.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp index c54de6058c1..e116d3a8a02 100644 --- a/src/mongo/db/s/transaction_coordinator_driver.cpp +++ b/src/mongo/db/s/transaction_coordinator_util.cpp @@ -31,7 +31,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/s/transaction_coordinator_driver.h" +#include "mongo/db/s/transaction_coordinator_util.h" #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/commands/txn_cmds_gen.h" @@ -39,14 +39,13 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/transaction_coordinator_document_gen.h" -#include "mongo/db/s/transaction_coordinator_futures_util.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { +namespace txn { namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern); @@ -56,16 +55,8 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList); MONGO_FAIL_POINT_DEFINE(hangBeforeWritingDecision); MONGO_FAIL_POINT_DEFINE(hangBeforeDeletingCoordinatorDoc); -using CommitDecision = txn::CommitDecision; -using PrepareVote = txn::PrepareVote; -using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; - -using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using ResponseStatus = executor::TaskExecutor::ResponseStatus; -using PrepareResponse = TransactionCoordinatorDriver::PrepareResponse; -using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus; - const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern( WriteConcernOptions::kInternalMajorityNoSnapshot, WriteConcernOptions::SyncMode::UNSET, @@ -75,14 +66,6 @@ const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); const ReadPreferenceSetting kPrimaryReadPreference{ReadPreference::PrimaryOnly}; -bool isRetryableError(ErrorCodes::Error code) { - // TODO (SERVER-37880): Consider using RemoteCommandRetryScheduler. - return std::find(RemoteCommandRetryScheduler::kAllRetriableErrors.begin(), - RemoteCommandRetryScheduler::kAllRetriableErrors.end(), - code) != RemoteCommandRetryScheduler::kAllRetriableErrors.end() || - code == ErrorCodes::NetworkInterfaceExceededTimeLimit; -} - BSONArray buildParticipantListMatchesConditions(const std::vector<ShardId>& participantList) { BSONArrayBuilder barr; for (const auto& participant : participantList) { @@ -120,10 +103,6 @@ bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) { } // namespace -TransactionCoordinatorDriver::TransactionCoordinatorDriver(ServiceContext* serviceContext, - txn::AsyncWorkScheduler& scheduler) - : _serviceContext(serviceContext), _scheduler(scheduler) {} - namespace { void persistParticipantListBlocking(OperationContext* opCtx, const LogicalSessionId& lsid, @@ -213,27 +192,53 @@ void persistParticipantListBlocking(OperationContext* opCtx, } } // namespace -Future<void> TransactionCoordinatorDriver::persistParticipantList( - const LogicalSessionId& lsid, TxnNumber txnNumber, std::vector<ShardId> participantList) { - return txn::doWhile(_scheduler, - boost::none /* no need for a backoff */, - [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, - [this, lsid, txnNumber, participantList] { - return _scheduler.scheduleWork( - [lsid, txnNumber, participantList](OperationContext* opCtx) { - persistParticipantListBlocking( - opCtx, lsid, txnNumber, participantList); - }); - }); +Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants) { + return txn::doWhile( + scheduler, + boost::none /* no need for a backoff */, + [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, + [&scheduler, lsid, txnNumber, participants] { + return scheduler.scheduleWork([lsid, txnNumber, participants](OperationContext* opCtx) { + persistParticipantListBlocking(opCtx, lsid, txnNumber, participants); + }); + }); } -Future<PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( - const std::vector<ShardId>& participantShards, - const LogicalSessionId& lsid, - TxnNumber txnNumber) { - PrepareTransaction prepareCmd; - prepareCmd.setDbName("admin"); - auto prepareObj = prepareCmd.toBSON( +void PrepareVoteConsensus::registerVote(const PrepareResponse& vote) { + if (vote.vote == PrepareVote::kCommit) { + ++_numCommitVotes; + _maxPrepareTimestamp = std::max(_maxPrepareTimestamp, *vote.prepareTimestamp); + } else if (vote.vote == PrepareVote::kAbort) { + ++_numAbortVotes; + } else { + ++_numNoVotes; + } +} + +CoordinatorCommitDecision PrepareVoteConsensus::decision() const { + invariant(_numShards == _numCommitVotes + _numAbortVotes + _numNoVotes); + + CoordinatorCommitDecision decision; + if (_numCommitVotes == _numShards) { + decision.setDecision(CommitDecision::kCommit); + decision.setCommitTimestamp(_maxPrepareTimestamp); + } else { + decision.setDecision(CommitDecision::kAbort); + } + return decision; +} + +Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants) { + PrepareTransaction prepareTransaction; + prepareTransaction.setDbName(NamespaceString::kAdminDb); + auto prepareObj = prepareTransaction.toBSON( BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::InternalMajorityNoSnapshot)); @@ -242,10 +247,11 @@ Future<PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( // Send prepare to all participants asynchronously and collect their future responses in a // vector of responses. - auto prepareScheduler = _scheduler.makeChildScheduler(); + auto prepareScheduler = scheduler.makeChildScheduler(); - for (const auto& participant : participantShards) { - responses.push_back(sendPrepareToShard(*prepareScheduler, participant, prepareObj)); + for (const auto& participant : participants) { + responses.emplace_back( + sendPrepareToShard(service, *prepareScheduler, participant, prepareObj)); } // Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp @@ -254,37 +260,19 @@ Future<PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( return txn::collect( std::move(responses), // Initial value - PrepareVoteConsensus{boost::none, boost::none}, + PrepareVoteConsensus{int(participants.size())}, // Aggregates an incoming response (next) with the existing aggregate value (result) [prepareScheduler = std::move(prepareScheduler)](PrepareVoteConsensus & result, const PrepareResponse& next) { - if (!next.vote) { - LOG(3) << "Transaction coordinator did not receive a response from shard " - << next.shardId; - return txn::ShouldStopIteration::kNo; - } + result.registerVote(next); - switch (*next.vote) { - case PrepareVote::kAbort: - result.decision = CommitDecision::kAbort; - result.maxPrepareTimestamp = boost::none; - prepareScheduler->shutdown( - {ErrorCodes::TransactionCoordinatorReachedAbortDecision, - "Received at least one vote abort"}); - return txn::ShouldStopIteration::kYes; - case PrepareVote::kCommit: - result.decision = CommitDecision::kCommit; - result.maxPrepareTimestamp = (result.maxPrepareTimestamp) - ? std::max(result.maxPrepareTimestamp, next.prepareTimestamp) - : next.prepareTimestamp; - return txn::ShouldStopIteration::kNo; - case PrepareVote::kCanceled: - // PrepareVote is just an alias for CommitDecision, so we need to include this - // branch as part of the switch statement, but CommitDecision::kCanceled will - // never be a valid response to prepare so this path is unreachable. - MONGO_UNREACHABLE; + if (next.vote == PrepareVote::kAbort) { + prepareScheduler->shutdown( + {ErrorCodes::TransactionCoordinatorReachedAbortDecision, + str::stream() << "Received abort vote from " << next.shardId}); } - MONGO_UNREACHABLE; + + return txn::ShouldStopIteration::kNo; }); } @@ -400,55 +388,59 @@ void persistDecisionBlocking(OperationContext* opCtx, } } // namespace -Future<void> TransactionCoordinatorDriver::persistDecision( - const LogicalSessionId& lsid, - TxnNumber txnNumber, - std::vector<ShardId> participantList, - const boost::optional<Timestamp>& commitTimestamp) { +Future<void> persistDecision(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants, + const boost::optional<Timestamp>& commitTimestamp) { return txn::doWhile( - _scheduler, + scheduler, boost::none /* no need for a backoff */, [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, - [this, lsid, txnNumber, participantList, commitTimestamp] { - return _scheduler.scheduleWork([lsid, txnNumber, participantList, commitTimestamp]( - OperationContext* opCtx) { - persistDecisionBlocking(opCtx, lsid, txnNumber, participantList, commitTimestamp); - }); + [&scheduler, lsid, txnNumber, participants, commitTimestamp] { + return scheduler.scheduleWork( + [lsid, txnNumber, participants, commitTimestamp](OperationContext* opCtx) { + persistDecisionBlocking(opCtx, lsid, txnNumber, participants, commitTimestamp); + }); }); } -Future<void> TransactionCoordinatorDriver::sendCommit(const std::vector<ShardId>& participantShards, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - Timestamp commitTimestamp) { +Future<void> sendCommit(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants, + Timestamp commitTimestamp) { CommitTransaction commitTransaction; + commitTransaction.setDbName(NamespaceString::kAdminDb); commitTransaction.setCommitTimestamp(commitTimestamp); - commitTransaction.setDbName("admin"); - BSONObj commitObj = commitTransaction.toBSON( + auto commitObj = commitTransaction.toBSON( BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); std::vector<Future<void>> responses; - for (const auto& participant : participantShards) { - responses.push_back(sendDecisionToParticipantShard(_scheduler, participant, commitObj)); + for (const auto& participant : participants) { + responses.push_back(sendDecisionToShard(service, scheduler, participant, commitObj)); } return txn::whenAll(responses); } -Future<void> TransactionCoordinatorDriver::sendAbort(const std::vector<ShardId>& participantShards, - const LogicalSessionId& lsid, - TxnNumber txnNumber) { - BSONObj abortObj = - BSON("abortTransaction" << 1 << "lsid" << lsid.toBSON() << "txnNumber" << txnNumber - << "autocommit" - << false - << WriteConcernOptions::kWriteConcernField - << WriteConcernOptions::Majority); +Future<void> sendAbort(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants) { + AbortTransaction abortTransaction; + abortTransaction.setDbName(NamespaceString::kAdminDb); + auto abortObj = abortTransaction.toBSON( + BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false + << WriteConcernOptions::kWriteConcernField + << WriteConcernOptions::Majority)); std::vector<Future<void>> responses; - for (const auto& participant : participantShards) { - responses.push_back(sendDecisionToParticipantShard(_scheduler, participant, abortObj)); + for (const auto& participant : participants) { + responses.push_back(sendDecisionToShard(service, scheduler, participant, abortObj)); } return txn::whenAll(responses); } @@ -516,21 +508,21 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, } } // namespace -Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSessionId& lsid, - TxnNumber txnNumber) { - return txn::doWhile(_scheduler, +Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber) { + return txn::doWhile(scheduler, boost::none /* no need for a backoff */, [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); }, - [this, lsid, txnNumber] { - return _scheduler.scheduleWork( + [&scheduler, lsid, txnNumber] { + return scheduler.scheduleWork( [lsid, txnNumber](OperationContext* opCtx) { deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber); }); }); } -std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAllCoordinatorDocs( - OperationContext* opCtx) { +std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx) { std::vector<TransactionCoordinatorDocument> allCoordinatorDocs; DBDirectClient client(opCtx); @@ -548,15 +540,16 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl return allCoordinatorDocs; } -Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( - txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) { - const bool isLocalShard = (shardId == txn::getLocalShardId(_serviceContext)); +Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const ShardId& shardId, + const BSONObj& commandObj) { + const bool isLocalShard = (shardId == txn::getLocalShardId(service)); auto f = txn::doWhile( scheduler, kExponentialBackoff, - [ shardId, isLocalShard, commandObj = commandObj.getOwned() ]( - StatusWith<PrepareResponse> swPrepareResponse) { + [](const StatusWith<PrepareResponse>& swPrepareResponse) { // *Always* retry until hearing a conclusive response or being told to stop via a // coordinator-specific code. return !swPrepareResponse.isOK() && @@ -629,21 +622,23 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard( }); } -Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard( - txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) { - const bool isLocalShard = (shardId == txn::getLocalShardId(_serviceContext)); +Future<void> sendDecisionToShard(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const ShardId& shardId, + const BSONObj& commandObj) { + const bool isLocalShard = (shardId == txn::getLocalShardId(service)); return txn::doWhile( scheduler, kExponentialBackoff, - [ shardId, isLocalShard, commandObj = commandObj.getOwned() ](const Status& s) { + [](const Status& s) { // *Always* retry until hearing a conclusive response or being told to stop via a // coordinator-specific code. return !s.isOK() && s != ErrorCodes::TransactionCoordinatorSteppingDown; }, [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned() ] { LOG(3) << "Coordinator going to send command " << commandObj << " to " - << (isLocalShard ? " local " : "") << " shard " << shardId; + << (isLocalShard ? "local" : "") << " shard " << shardId; return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) { @@ -676,4 +671,5 @@ Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard( }); } +} // namespace txn } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h new file mode 100644 index 00000000000..583350a6d0e --- /dev/null +++ b/src/mongo/db/s/transaction_coordinator_util.h @@ -0,0 +1,215 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/db/s/transaction_coordinator_document_gen.h" +#include "mongo/db/s/transaction_coordinator_futures_util.h" + +namespace mongo { +namespace txn { + +/** + * Upserts a document of the form: + * + * { + * _id: {lsid: <lsid>, txnNumber: <txnNumber>} + * participants: ["shard0000", "shard0001"] + * } + * + * into config.transaction_coordinators and waits for the upsert to be majority-committed. + * + * Throws if the upsert fails or waiting for writeConcern fails. + * + * If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it means a + * document for the (lsid, txnNumber) exists with a different participant list. + */ +Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants); + +/** + * Sends prepare to all participants and returns a future that will be resolved when either: + * a) All participants have responded with a vote to commit, or + * b) At least one participant votes to abort. + */ +struct PrepareResponse; +class PrepareVoteConsensus { +public: + PrepareVoteConsensus(int numShards) : _numShards(numShards) {} + + void registerVote(const PrepareResponse& vote); + + /** + * May only be called when all of `numShards` have called `registerVote` above. Contains the + * commit decision of the vote, which would only be kCommit if all shards have responded with a + * 'commit'. + */ + CoordinatorCommitDecision decision() const; + +private: + int _numShards; + + int _numCommitVotes{0}; + int _numAbortVotes{0}; + int _numNoVotes{0}; + + Timestamp _maxPrepareTimestamp; +}; +Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants); + +/** + * If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators + * for + * + * (lsid, txnNumber) to be: + * + * { + * _id: {lsid: <lsid>, txnNumber: <txnNumber>} + * participants: ["shard0000", "shard0001"] + * decision: "abort" + * } + * + * else updates the document to be: + * + * { + * _id: {lsid: <lsid>, txnNumber: <txnNumber>} + * participants: ["shard0000", "shard0001"] + * decision: "commit" + * commitTimestamp: Timestamp(xxxxxxxx, x), + * } + * + * and waits for the update to be majority-committed. + * + * Throws if the update fails or waiting for writeConcern fails. + * + * If the update succeeds but did not update any document, throws an anonymous error, because it + * means either no document for (lsid, txnNumber) exists, or a document exists but has a different + * participant list, different decision, or different commit Timestamp. + */ +Future<void> persistDecision(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants, + const boost::optional<Timestamp>& commitTimestamp); + +/** + * Sends commit to all shards and returns a future that will be resolved when all participants have + * responded with success. + */ +Future<void> sendCommit(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants, + Timestamp commitTimestamp); + +/** + * Sends abort to all shards and returns a future that will be resolved when all participants have + * responded with success. + */ +Future<void> sendAbort(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const txn::ParticipantsList& participants); + +/** + * Deletes the document in config.transaction_coordinators for (lsid, txnNumber). + * + * Does *not* wait for the delete to be majority-committed. + * + * Throws if the update fails. + * + * If the delete succeeds but did not delete any document, throws an anonymous error, because it + * means either no document for (lsid, txnNumber) exists, or a document exists but without a + * decision. + */ +Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber); + +/** + * Reads and returns all documents in config.transaction_coordinators. + */ +std::vector<txn::TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx); + +// +// These methods are used internally and are exposed for unit-testing purposes only +// + +/** + * Sends prepare to the given shard and returns a future, which will be set with the vote. + * + * This method will retry until it receives a non-retryable response from the remote node or until + * the scheduler under which it is running is shut down. Because of this it can return only the + * following error code(s): + * - TransactionCoordinatorSteppingDown + * - ShardNotFound + */ +struct PrepareResponse { + // Shard id from which the response was received + ShardId shardId; + + // If set to none, this means the shard did not produce a vote + boost::optional<txn::PrepareVote> vote; + + // Will only be set if the vote was kCommit + boost::optional<Timestamp> prepareTimestamp; +}; +Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const ShardId& shardId, + const BSONObj& prepareCommandObj); + +/** + * Sends a command corresponding to a commit decision (i.e. commitTransaction or* + * abortTransaction) to the given shard and returns a future, which will be set with the result. + * + * Used for sendCommit and sendAbort. + * + * This method will retry until it receives a response from the remote node which can be + * interpreted as vote abort (e.g. NoSuchTransaction), or until the scheduler under which it is + * running is shut down. Because of this it can return only the following error code(s): + * - TransactionCoordinatorSteppingDown + */ +Future<void> sendDecisionToShard(ServiceContext* service, + txn::AsyncWorkScheduler& scheduler, + const ShardId& shardId, + const BSONObj& commandObj); + +} // namespace txn +} // namespace mongo |