summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-03-27 18:22:26 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-04-01 07:26:26 -0400
commit907e4760487e081db031232d1f8326c0f8bdef68 (patch)
treeb27bdde6a3294072b5ff15ce07081e51696b2909 /src/mongo
parent9dd07662bbe80415d90131fca2be6312166d6f39 (diff)
downloadmongo-907e4760487e081db031232d1f8326c0f8bdef68.tar.gz
SERVER-40297 Make all TransactionCoordinatorDriver methods free functions
The TransactionCoordinatorDriver is not really a "driver", but just a set of functions to perform asynchronous work. There isn't any state to keep, so there is no need for them to be in a class.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/txn_cmds.idl5
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp48
-rw-r--r--src/mongo/db/s/transaction_coordinator.h6
-rw-r--r--src/mongo/db/s/transaction_coordinator_driver.h218
-rw-r--r--src/mongo/db/s/transaction_coordinator_futures_util.cpp2
-rw-r--r--src/mongo/db/s/transaction_coordinator_futures_util.h1
-rw-r--r--src/mongo/db/s/transaction_coordinator_service.cpp3
-rw-r--r--src/mongo/db/s/transaction_coordinator_test.cpp175
-rw-r--r--src/mongo/db/s/transaction_coordinator_util.cpp (renamed from src/mongo/db/s/transaction_coordinator_driver.cpp)232
-rw-r--r--src/mongo/db/s/transaction_coordinator_util.h215
11 files changed, 454 insertions, 453 deletions
diff --git a/src/mongo/db/commands/txn_cmds.idl b/src/mongo/db/commands/txn_cmds.idl
index e5f9e6c236d..76616b84c9c 100644
--- a/src/mongo/db/commands/txn_cmds.idl
+++ b/src/mongo/db/commands/txn_cmds.idl
@@ -28,7 +28,6 @@
global:
cpp_namespace: "mongo"
-
imports:
- "mongo/idl/basic_types.idl"
- "mongo/s/sharding_types.idl"
@@ -100,3 +99,7 @@ commands:
progress on commit by processing using the info in the recoveryToken."
optional: true
type: TxnRecoveryToken
+
+ abortTransaction:
+ description: "abortTransaction Command"
+ namespace: ignored
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d8edd672d09..0fc1b35ee08 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -122,11 +122,11 @@ env.Library(
target='transaction_coordinator',
source=[
'transaction_coordinator_catalog.cpp',
- 'transaction_coordinator_driver.cpp',
'transaction_coordinator_factory_mongod.cpp',
'transaction_coordinator_futures_util.cpp',
'transaction_coordinator_service.cpp',
'transaction_coordinator_structures.cpp',
+ 'transaction_coordinator_util.cpp',
'transaction_coordinator.cpp',
env.Idlc('transaction_coordinator_document.idl')[0],
],
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 7689ddb2108..65b1231fe66 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -34,8 +34,6 @@
#include "mongo/db/s/transaction_coordinator.h"
#include "mongo/db/logical_clock.h"
-#include "mongo/db/s/transaction_coordinator_document_gen.h"
-#include "mongo/db/s/transaction_coordinator_futures_util.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -43,28 +41,24 @@ namespace {
using CommitDecision = txn::CommitDecision;
using CoordinatorCommitDecision = txn::CoordinatorCommitDecision;
+using PrepareVoteConsensus = txn::PrepareVoteConsensus;
using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
-using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus;
-
CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(ServiceContext* service,
const PrepareVoteConsensus& result,
const LogicalSessionId& lsid,
TxnNumber txnNumber) {
- invariant(result.decision);
- CoordinatorCommitDecision decision(*result.decision);
-
- if (result.decision == CommitDecision::kCommit) {
- invariant(result.maxPrepareTimestamp);
-
- decision.setCommitTimestamp(Timestamp(result.maxPrepareTimestamp->getSecs(),
- result.maxPrepareTimestamp->getInc() + 1));
+ auto decision = result.decision();
+ if (decision.getDecision() == CommitDecision::kCommit) {
LOG(3) << "Advancing cluster time to the commit timestamp "
<< *decision.getCommitTimestamp() << " for " << lsid.getId() << ':' << txnNumber;
uassertStatusOK(LogicalClock::get(service)->advanceClusterTime(
- LogicalTime(*result.maxPrepareTimestamp)));
+ LogicalTime(*decision.getCommitTimestamp())));
+
+ decision.setCommitTimestamp(Timestamp(decision.getCommitTimestamp()->getSecs(),
+ decision.getCommitTimestamp()->getInc() + 1));
}
return decision;
@@ -80,8 +74,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
: _serviceContext(serviceContext),
_lsid(lsid),
_txnNumber(txnNumber),
- _scheduler(std::move(scheduler)),
- _driver(serviceContext, *_scheduler) {
+ _scheduler(std::move(scheduler)) {
if (coordinateCommitDeadline) {
_deadlineScheduler = _scheduler->makeChildScheduler();
_deadlineScheduler
@@ -119,7 +112,7 @@ void TransactionCoordinator::runCommit(std::vector<ShardId> participantShards) {
_cancelTimeoutWaitForCommitTask();
- _driver.persistParticipantList(_lsid, _txnNumber, participantShards)
+ txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, participantShards)
.then([this, participantShards]() { return _runPhaseOne(participantShards); })
.then([this, participantShards](CoordinatorCommitDecision decision) {
return _runPhaseTwo(participantShards, decision);
@@ -194,16 +187,18 @@ void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() {
Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
const std::vector<ShardId>& participantShards) {
- return _driver.sendPrepare(participantShards, _lsid, _txnNumber)
+ return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, participantShards)
.then([this, participantShards](PrepareVoteConsensus result) {
invariant(_state == CoordinatorState::kPreparing);
auto decision =
makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber);
- return _driver
- .persistDecision(
- _lsid, _txnNumber, participantShards, decision.getCommitTimestamp())
+ return txn::persistDecision(*_scheduler,
+ _lsid,
+ _txnNumber,
+ participantShards,
+ decision.getCommitTimestamp())
.then([decision] { return decision; });
});
}
@@ -215,7 +210,7 @@ Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& pa
if (MONGO_FAIL_POINT(doNotForgetCoordinator))
return Future<void>::makeReady();
- return _driver.deleteCoordinatorDoc(_lsid, _txnNumber);
+ return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber);
})
.then([this] {
LOG(3) << "Two-phase commit completed for " << _lsid.getId() << ':' << _txnNumber;
@@ -233,11 +228,16 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants(
switch (decision.getDecision()) {
case CommitDecision::kCommit:
_state = CoordinatorState::kCommitting;
- return _driver.sendCommit(
- participantShards, _lsid, _txnNumber, *decision.getCommitTimestamp());
+ return txn::sendCommit(_serviceContext,
+ *_scheduler,
+ _lsid,
+ _txnNumber,
+ participantShards,
+ *decision.getCommitTimestamp());
case CommitDecision::kAbort:
_state = CoordinatorState::kAborting;
- return _driver.sendAbort(participantShards, _lsid, _txnNumber);
+ return txn::sendAbort(
+ _serviceContext, *_scheduler, _lsid, _txnNumber, participantShards);
case CommitDecision::kCanceled:
MONGO_UNREACHABLE;
};
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index 1e3ec5d820b..4e7a0a80351 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -31,7 +31,7 @@
#include <vector>
-#include "mongo/db/s/transaction_coordinator_driver.h"
+#include "mongo/db/s/transaction_coordinator_util.h"
#include "mongo/logger/logstream_builder.h"
#include "mongo/util/fail_point_service.h"
@@ -187,10 +187,6 @@ private:
// coordinator was not created with an expiration task.
std::unique_ptr<txn::AsyncWorkScheduler> _deadlineScheduler;
- // Context object used to perform and track the state of asynchronous operations on behalf of
- // this coordinator.
- TransactionCoordinatorDriver _driver;
-
// Protects the state below
mutable stdx::mutex _mutex;
diff --git a/src/mongo/db/s/transaction_coordinator_driver.h b/src/mongo/db/s/transaction_coordinator_driver.h
deleted file mode 100644
index 139b14b9ea8..00000000000
--- a/src/mongo/db/s/transaction_coordinator_driver.h
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <vector>
-
-#include "mongo/db/logical_session_id.h"
-#include "mongo/db/s/transaction_coordinator_document_gen.h"
-#include "mongo/db/s/transaction_coordinator_futures_util.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/util/concurrency/thread_pool.h"
-#include "mongo/util/future.h"
-
-namespace mongo {
-
-/**
- * Single instance of this class is owned by each TransactionCoordinator. It abstracts the
- * long-running blocking operations into a futurized interface and ensures that all outstanding
- * operations are joined at completion.
- *
- * Methods on this class must be called from a single thread at a time and other than cancellation,
- * no other operations are allowed until any returned futures are signalled.
- */
-class TransactionCoordinatorDriver {
- TransactionCoordinatorDriver(const TransactionCoordinatorDriver&) = delete;
- TransactionCoordinatorDriver& operator=(const TransactionCoordinatorDriver&) = delete;
-
-public:
- TransactionCoordinatorDriver(ServiceContext* serviceContext,
- txn::AsyncWorkScheduler& scheduler);
-
- /**
- * Upserts a document of the form:
- *
- * {
- * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
- * participants: ["shard0000", "shard0001"]
- * }
- *
- * into config.transaction_coordinators and waits for the upsert to be majority-committed.
- *
- * Throws if the upsert fails or waiting for writeConcern fails.
- *
- * If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it
- * means a document for the (lsid, txnNumber) exists with a different participant list.
- */
- Future<void> persistParticipantList(const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- std::vector<ShardId> participantList);
-
- /**
- * Sends prepare to all participants and returns a future that will be resolved when either:
- * a) All participants have responded with a vote to commit, or
- * b) At least one participant votes to abort.
- *
- * If all participants vote to commit, the result will contain the max prepare timestamp of all
- * prepare timestamps attached to the participants' responses. Otherwise the result will simply
- * contain the decision to abort.
- */
- struct PrepareVoteConsensus {
- // Optional decision, if any was reached (decision could be empty if no response was
- // received)
- boost::optional<txn::CommitDecision> decision;
-
- // Should only be consulted if the decision is commit and contains the maximum prepare
- // timestamp across all participants
- boost::optional<Timestamp> maxPrepareTimestamp;
- };
- Future<PrepareVoteConsensus> sendPrepare(const std::vector<ShardId>& participantShards,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber);
-
- /**
- * If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators
- * for
- *
- * (lsid, txnNumber) to be:
- *
- * {
- * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
- * participants: ["shard0000", "shard0001"]
- * decision: "abort"
- * }
- *
- * else updates the document to be:
- *
- * {
- * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
- * participants: ["shard0000", "shard0001"]
- * decision: "commit"
- * commitTimestamp: Timestamp(xxxxxxxx, x),
- * }
- *
- * and waits for the update to be majority-committed.
- *
- * Throws if the update fails or waiting for writeConcern fails.
- *
- * If the update succeeds but did not update any document, throws an anonymous error, because it
- * means either no document for (lsid, txnNumber) exists, or a document exists but has a
- * different participant list, different decision, or different commit Timestamp.
- */
- Future<void> persistDecision(const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- std::vector<ShardId> participantList,
- const boost::optional<Timestamp>& commitTimestamp);
-
- /**
- * Sends commit to all shards and returns a future that will be resolved when all participants
- * have responded with success.
- */
- Future<void> sendCommit(const std::vector<ShardId>& participantShards,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- Timestamp commitTimestamp);
-
- /**
- * Sends abort to all shards and returns a future that will be resolved when all participants
- * have responded with success.
- */
- Future<void> sendAbort(const std::vector<ShardId>& participantShards,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber);
-
- /**
- * Deletes the document in config.transaction_coordinators for (lsid, txnNumber).
- *
- * Does *not* wait for the delete to be majority-committed.
- *
- * Throws if the update fails.
- *
- * If the update succeeds but did not update any document, throws an anonymous error, because it
- * means either no document for (lsid, txnNumber) exists, or a document exists but without a
- * decision.
- */
- Future<void> deleteCoordinatorDoc(const LogicalSessionId& lsid, TxnNumber txnNumber);
-
- /**
- * Reads and returns all documents in config.transaction_coordinators.
- */
- static std::vector<txn::TransactionCoordinatorDocument> readAllCoordinatorDocs(
- OperationContext* opCtx);
-
- //
- // These methods are used internally and are exposed for unit-testing purposes only
- //
-
- /**
- * Sends prepare to the given shard and returns a future, which will be set with the vote.
- *
- * This method will retry until it receives a non-retryable response from the remote node or
- * until the scheduler under which it is running is shut down. Because of this it can return
- * only the following error code(s):
- * - TransactionCoordinatorSteppingDown
- * - ShardNotFound
- */
- struct PrepareResponse {
- // Shard id from which the response was received
- ShardId shardId;
-
- // If set to none, this means the shard did not produce a vote
- boost::optional<txn::PrepareVote> vote;
-
- // Will only be set if the vote was kCommit
- boost::optional<Timestamp> prepareTimestamp;
- };
- Future<PrepareResponse> sendPrepareToShard(txn::AsyncWorkScheduler& scheduler,
- const ShardId& shardId,
- const BSONObj& prepareCommandObj);
-
- /**
- * Sends a command corresponding to a commit decision (i.e. commitTransaction or*
- * abortTransaction) to the given shard and returns a future, which will be set with the result.
- *
- * Used for sendCommit and sendAbort.
- *
- * This method will retry until it receives a response from the remote node which can be
- * interpreted as vote abort (e.g. NoSuchTransaction), or until the scheduler under which it is
- * running is shut down. Because of this it can return only the following error code(s):
- * - TransactionCoordinatorSteppingDown
- */
- Future<void> sendDecisionToParticipantShard(txn::AsyncWorkScheduler& scheduler,
- const ShardId& shardId,
- const BSONObj& commandObj);
-
-private:
- ServiceContext* _serviceContext;
-
- txn::AsyncWorkScheduler& _scheduler;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
index fa74d11cbf0..771815d0ab1 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
@@ -33,10 +33,10 @@
#include "mongo/db/s/transaction_coordinator_futures_util.h"
-#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/grid.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h
index 7a302d5a2b4..c7361906c2c 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.h
@@ -34,7 +34,6 @@
#include "mongo/client/read_preference.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/grid.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/future.h"
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index e5b5e15e300..6fe5d5d45c6 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -159,8 +159,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
WriteConcernOptions::kNoTimeout},
&unusedWCResult));
- auto coordinatorDocs =
- TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+ auto coordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size()
<< " transactions";
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index f3b89afb50d..cc5a94126b9 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -41,7 +41,7 @@
namespace mongo {
namespace {
-using PrepareResponse = TransactionCoordinatorDriver::PrepareResponse;
+using PrepareResponse = txn::PrepareResponse;
using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
const StatusWith<BSONObj> kNoSuchTransaction =
@@ -109,21 +109,18 @@ protected:
void setUp() override {
TransactionCoordinatorTestBase::setUp();
_aws.emplace(getServiceContext());
- _driver.emplace(getServiceContext(), *_aws);
}
void tearDown() override {
- _driver.reset();
TransactionCoordinatorTestBase::tearDown();
}
boost::optional<txn::AsyncWorkScheduler> _aws;
- boost::optional<TransactionCoordinatorDriver> _driver;
};
auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnNumber) {
PrepareTransaction prepareCmd;
- prepareCmd.setDbName("admin");
+ prepareCmd.setDbName(NamespaceString::kAdminDb);
auto prepareObj = prepareCmd.toBSON(
BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
<< WriteConcernOptions::kWriteConcernField
@@ -135,8 +132,8 @@ auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnN
TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = _driver->sendDecisionToParticipantShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future = txn::sendDecisionToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithSuccess();
@@ -147,8 +144,8 @@ TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOn
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = _driver->sendDecisionToParticipantShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future = txn::sendDecisionToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
@@ -162,8 +159,8 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = _driver->sendDecisionToParticipantShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future = txn::sendDecisionToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithRetryableError();
@@ -176,8 +173,8 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = _driver->sendDecisionToParticipantShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future = txn::sendDecisionToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -187,8 +184,8 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendDecisionToParticipantShardCanBeInterruptedAndReturnsError) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<void> future = _driver->sendDecisionToParticipantShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<void> future = txn::sendDecisionToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"});
@@ -200,8 +197,8 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<PrepareResponse> future = _driver->sendPrepareToShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<PrepareResponse> future = txn::sendPrepareToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithSuccess();
@@ -214,8 +211,8 @@ TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecision
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsCommitDecisionOnRetryableErrorThenOkResponse) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<PrepareResponse> future = _driver->sendPrepareToShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<PrepareResponse> future = txn::sendPrepareToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
ASSERT(!future.isReady());
assertPrepareSentAndRespondWithRetryableError();
@@ -231,8 +228,8 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardCanBeInterruptedAndReturnsNoDecisionIfNotServiceShutdown) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<PrepareResponse> future = _driver->sendPrepareToShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<PrepareResponse> future = txn::sendPrepareToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
aws.shutdown({ErrorCodes::TransactionCoordinatorReachedAbortDecision, "Retry interrupted"});
@@ -246,8 +243,8 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardCanBeInterruptedAndThrowsExceptionIfServiceShutdown) {
txn::AsyncWorkScheduler aws(getServiceContext());
- Future<PrepareResponse> future = _driver->sendPrepareToShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ Future<PrepareResponse> future = txn::sendPrepareToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Service shutting down"});
@@ -260,8 +257,8 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) {
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = _driver->sendPrepareToShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ auto future = txn::sendPrepareToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -273,8 +270,8 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) {
txn::AsyncWorkScheduler aws(getServiceContext());
- auto future = _driver->sendPrepareToShard(
- aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+ auto future = txn::sendPrepareToShard(
+ getServiceContext(), aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
assertPrepareSentAndRespondWithRetryableError();
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -286,38 +283,38 @@ TEST_F(TransactionCoordinatorDriverTest,
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList);
onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
- auto response = future.get();
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
+ auto decision = future.get().decision();
+ ASSERT(decision.getDecision() == txn::CommitDecision::kAbort);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess();
assertPrepareSentAndRespondWithNoSuchTransaction();
- auto response = future.get();
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
+ auto decision = future.get().decision();
+ ASSERT(decision.getDecision() == txn::CommitDecision::kAbort);
}
TEST_F(TransactionCoordinatorDriverTest,
SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) {
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList);
onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
- auto response = future.get();
- ASSERT(response.decision == txn::CommitDecision::kAbort);
- ASSERT(response.maxPrepareTimestamp == boost::none);
+ auto decision = future.get().decision();
+ ASSERT(decision.getDecision() == txn::CommitDecision::kAbort);
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -325,14 +322,15 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
- auto response = future.get();
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
+ auto decision = future.get().decision();
+ ASSERT(decision.getDecision() == txn::CommitDecision::kCommit);
+ ASSERT_EQ(maxPrepareTimestamp, *decision.getCommitTimestamp());
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -340,14 +338,15 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
- auto response = future.get();
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
+ auto decision = future.get().decision();
+ ASSERT(decision.getDecision() == txn::CommitDecision::kCommit);
+ ASSERT_EQ(maxPrepareTimestamp, *decision.getCommitTimestamp());
}
TEST_F(TransactionCoordinatorDriverTest,
@@ -355,19 +354,30 @@ TEST_F(TransactionCoordinatorDriverTest,
const auto firstPrepareTimestamp = Timestamp(1, 1);
const auto maxPrepareTimestamp = Timestamp(2, 1);
- auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);
+ txn::AsyncWorkScheduler aws(getServiceContext());
+ auto future = txn::sendPrepare(getServiceContext(), aws, _lsid, _txnNumber, kTwoShardIdList);
assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
- auto response = future.get();
- ASSERT(response.decision == txn::CommitDecision::kCommit);
- ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
+ auto decision = future.get().decision();
+ ASSERT(decision.getDecision() == txn::CommitDecision::kCommit);
+ ASSERT_EQ(maxPrepareTimestamp, *decision.getCommitTimestamp());
}
class TransactionCoordinatorDriverPersistenceTest : public TransactionCoordinatorDriverTest {
protected:
+ void setUp() override {
+ TransactionCoordinatorDriverTest::setUp();
+ _aws.emplace(getServiceContext());
+ }
+
+ void tearDown() override {
+ _aws.reset();
+ TransactionCoordinatorDriverTest::tearDown();
+ }
+
static void assertDocumentMatches(
TransactionCoordinatorDocument doc,
LogicalSessionId expectedLsid,
@@ -401,9 +411,9 @@ protected:
LogicalSessionId lsid,
TxnNumber txnNumber,
const std::vector<ShardId>& participants) {
- _driver->persistParticipantList(lsid, txnNumber, participants).get();
+ txn::persistParticipantsList(*_aws, lsid, txnNumber, participants).get();
- auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+ auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumber, participants);
}
@@ -413,9 +423,9 @@ protected:
TxnNumber txnNumber,
const std::vector<ShardId>& participants,
const boost::optional<Timestamp>& commitTimestamp) {
- _driver->persistDecision(lsid, txnNumber, participants, commitTimestamp).get();
+ txn::persistDecision(*_aws, lsid, txnNumber, participants, commitTimestamp).get();
- auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+ auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
if (commitTimestamp) {
assertDocumentMatches(allCoordinatorDocs[0],
@@ -433,9 +443,9 @@ protected:
void deleteCoordinatorDocExpectSuccess(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber) {
- _driver->deleteCoordinatorDoc(lsid, txnNumber).get();
+ txn::deleteCoordinatorDoc(*_aws, lsid, txnNumber).get();
- auto allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+ auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx);
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(0));
}
@@ -443,6 +453,8 @@ protected:
ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")};
const Timestamp _commitTimestamp{Timestamp(Date_t::now().toMillisSinceEpoch() / 1000, 0)};
+
+ boost::optional<txn::AsyncWorkScheduler> _aws;
};
TEST_F(TransactionCoordinatorDriverPersistenceTest,
@@ -464,21 +476,22 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
std::vector<ShardId> smallerParticipantList{ShardId("shard0001"), ShardId("shard0002")};
ASSERT_THROWS_CODE(
- _driver->persistParticipantList(_lsid, _txnNumber, smallerParticipantList).get(),
+ txn::persistParticipantsList(*_aws, _lsid, _txnNumber, smallerParticipantList).get(),
AssertionException,
51025);
std::vector<ShardId> largerParticipantList{
ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003"), ShardId("shard0004")};
ASSERT_THROWS_CODE(
- _driver->persistParticipantList(_lsid, _txnNumber, largerParticipantList).get(),
+ txn::persistParticipantsList(*_aws, _lsid, _txnNumber, largerParticipantList).get(),
AssertionException,
51025);
std::vector<ShardId> differentSameSizeParticipantList{
ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0004")};
ASSERT_THROWS_CODE(
- _driver->persistParticipantList(_lsid, _txnNumber, differentSameSizeParticipantList).get(),
+ txn::persistParticipantsList(*_aws, _lsid, _txnNumber, differentSameSizeParticipantList)
+ .get(),
AssertionException,
51025);
}
@@ -487,10 +500,9 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistParticipantListForMultipleTransactionsOnSameSession) {
for (int i = 1; i <= 3; i++) {
auto txnNumber = TxnNumber{i};
- _driver->persistParticipantList(_lsid, txnNumber, _participants).get();
+ txn::persistParticipantsList(*_aws, _lsid, txnNumber, _participants).get();
- auto allCoordinatorDocs =
- TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext());
+ auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i));
}
}
@@ -498,10 +510,9 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMultipleSessions) {
for (int i = 1; i <= 3; i++) {
auto lsid = makeLogicalSessionIdForTest();
- _driver->persistParticipantList(lsid, _txnNumber, _participants).get();
+ txn::persistParticipantsList(*_aws, lsid, _txnNumber, _participants).get();
- auto allCoordinatorDocs =
- TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext());
+ auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i));
}
}
@@ -509,7 +520,8 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMul
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistAbortDecisionWhenNoDocumentForTransactionExistsFails) {
ASSERT_THROWS_CODE(
- _driver->persistDecision(_lsid, _txnNumber, _participants, boost::none /* abort */).get(),
+ txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, boost::none /* abort */)
+ .get(),
AssertionException,
51026);
}
@@ -534,7 +546,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
TEST_F(TransactionCoordinatorDriverPersistenceTest,
PersistCommitDecisionWhenNoDocumentForTransactionExistsFails) {
ASSERT_THROWS_CODE(
- _driver->persistDecision(_lsid, _txnNumber, _participants, _commitTimestamp /* commit */)
+ txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, _commitTimestamp /* commit */)
.get(),
AssertionException,
51026);
@@ -564,9 +576,8 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
const Timestamp differentCommitTimestamp(Date_t::now().toMillisSinceEpoch() / 1000, 1);
ASSERT_THROWS_CODE(
- _driver
- ->persistDecision(
- _lsid, _txnNumber, _participants, differentCommitTimestamp /* commit */)
+ txn::persistDecision(
+ *_aws, _lsid, _txnNumber, _participants, differentCommitTimestamp /* commit */)
.get(),
AssertionException,
51026);
@@ -579,7 +590,8 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */);
ASSERT_THROWS_CODE(
- _driver->persistDecision(_lsid, _txnNumber, _participants, boost::none /* abort */).get(),
+ txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, boost::none /* abort */)
+ .get(),
AssertionException,
51026);
}
@@ -591,7 +603,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */);
ASSERT_THROWS_CODE(
- _driver->persistDecision(_lsid, _txnNumber, _participants, _commitTimestamp /* abort */)
+ txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, _commitTimestamp /* abort */)
.get(),
AssertionException,
51026);
@@ -599,14 +611,14 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenNoDocumentExistsFails) {
ASSERT_THROWS_CODE(
- _driver->deleteCoordinatorDoc(_lsid, _txnNumber).get(), AssertionException, 51027);
+ txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumber).get(), AssertionException, 51027);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
DeleteCoordinatorDocWhenDocumentExistsWithoutDecisionFails) {
persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants);
ASSERT_THROWS_CODE(
- _driver->deleteCoordinatorDoc(_lsid, _txnNumber).get(), AssertionException, 51027);
+ txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumber).get(), AssertionException, 51027);
}
TEST_F(TransactionCoordinatorDriverPersistenceTest,
@@ -631,19 +643,18 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,
const auto txnNumber2 = TxnNumber{5};
// Insert coordinator documents for two transactions.
- _driver->persistParticipantList(_lsid, txnNumber1, _participants).get();
- _driver->persistParticipantList(_lsid, txnNumber2, _participants).get();
+ txn::persistParticipantsList(*_aws, _lsid, txnNumber1, _participants).get();
+ txn::persistParticipantsList(*_aws, _lsid, txnNumber2, _participants).get();
- auto allCoordinatorDocs =
- TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext());
+ auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(2));
// Delete the document for the first transaction and check that only the second transaction's
// document still exists.
- _driver->persistDecision(_lsid, txnNumber1, _participants, boost::none /* abort */).get();
- _driver->deleteCoordinatorDoc(_lsid, txnNumber1).get();
+ txn::persistDecision(*_aws, _lsid, txnNumber1, _participants, boost::none /* abort */).get();
+ txn::deleteCoordinatorDoc(*_aws, _lsid, txnNumber1).get();
- allCoordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(operationContext());
+ allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext());
ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1));
assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumber2, _participants);
}
diff --git a/src/mongo/db/s/transaction_coordinator_driver.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp
index c54de6058c1..e116d3a8a02 100644
--- a/src/mongo/db/s/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/s/transaction_coordinator_util.cpp
@@ -31,7 +31,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/s/transaction_coordinator_driver.h"
+#include "mongo/db/s/transaction_coordinator_util.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/commands/txn_cmds_gen.h"
@@ -39,14 +39,13 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/s/transaction_coordinator_document_gen.h"
-#include "mongo/db/s/transaction_coordinator_futures_util.h"
#include "mongo/db/write_concern.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
+namespace txn {
namespace {
MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern);
@@ -56,16 +55,8 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeWritingParticipantList);
MONGO_FAIL_POINT_DEFINE(hangBeforeWritingDecision);
MONGO_FAIL_POINT_DEFINE(hangBeforeDeletingCoordinatorDoc);
-using CommitDecision = txn::CommitDecision;
-using PrepareVote = txn::PrepareVote;
-using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
-
-using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
using ResponseStatus = executor::TaskExecutor::ResponseStatus;
-using PrepareResponse = TransactionCoordinatorDriver::PrepareResponse;
-using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus;
-
const WriteConcernOptions kInternalMajorityNoSnapshotWriteConcern(
WriteConcernOptions::kInternalMajorityNoSnapshot,
WriteConcernOptions::SyncMode::UNSET,
@@ -75,14 +66,6 @@ const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
const ReadPreferenceSetting kPrimaryReadPreference{ReadPreference::PrimaryOnly};
-bool isRetryableError(ErrorCodes::Error code) {
- // TODO (SERVER-37880): Consider using RemoteCommandRetryScheduler.
- return std::find(RemoteCommandRetryScheduler::kAllRetriableErrors.begin(),
- RemoteCommandRetryScheduler::kAllRetriableErrors.end(),
- code) != RemoteCommandRetryScheduler::kAllRetriableErrors.end() ||
- code == ErrorCodes::NetworkInterfaceExceededTimeLimit;
-}
-
BSONArray buildParticipantListMatchesConditions(const std::vector<ShardId>& participantList) {
BSONArrayBuilder barr;
for (const auto& participant : participantList) {
@@ -120,10 +103,6 @@ bool shouldRetryPersistingCoordinatorState(const Status& responseStatus) {
} // namespace
-TransactionCoordinatorDriver::TransactionCoordinatorDriver(ServiceContext* serviceContext,
- txn::AsyncWorkScheduler& scheduler)
- : _serviceContext(serviceContext), _scheduler(scheduler) {}
-
namespace {
void persistParticipantListBlocking(OperationContext* opCtx,
const LogicalSessionId& lsid,
@@ -213,27 +192,53 @@ void persistParticipantListBlocking(OperationContext* opCtx,
}
} // namespace
-Future<void> TransactionCoordinatorDriver::persistParticipantList(
- const LogicalSessionId& lsid, TxnNumber txnNumber, std::vector<ShardId> participantList) {
- return txn::doWhile(_scheduler,
- boost::none /* no need for a backoff */,
- [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
- [this, lsid, txnNumber, participantList] {
- return _scheduler.scheduleWork(
- [lsid, txnNumber, participantList](OperationContext* opCtx) {
- persistParticipantListBlocking(
- opCtx, lsid, txnNumber, participantList);
- });
- });
+Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants) {
+ return txn::doWhile(
+ scheduler,
+ boost::none /* no need for a backoff */,
+ [](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
+ [&scheduler, lsid, txnNumber, participants] {
+ return scheduler.scheduleWork([lsid, txnNumber, participants](OperationContext* opCtx) {
+ persistParticipantListBlocking(opCtx, lsid, txnNumber, participants);
+ });
+ });
}
-Future<PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
- const std::vector<ShardId>& participantShards,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
- PrepareTransaction prepareCmd;
- prepareCmd.setDbName("admin");
- auto prepareObj = prepareCmd.toBSON(
+void PrepareVoteConsensus::registerVote(const PrepareResponse& vote) {
+ if (vote.vote == PrepareVote::kCommit) {
+ ++_numCommitVotes;
+ _maxPrepareTimestamp = std::max(_maxPrepareTimestamp, *vote.prepareTimestamp);
+ } else if (vote.vote == PrepareVote::kAbort) {
+ ++_numAbortVotes;
+ } else {
+ ++_numNoVotes;
+ }
+}
+
+CoordinatorCommitDecision PrepareVoteConsensus::decision() const {
+ invariant(_numShards == _numCommitVotes + _numAbortVotes + _numNoVotes);
+
+ CoordinatorCommitDecision decision;
+ if (_numCommitVotes == _numShards) {
+ decision.setDecision(CommitDecision::kCommit);
+ decision.setCommitTimestamp(_maxPrepareTimestamp);
+ } else {
+ decision.setDecision(CommitDecision::kAbort);
+ }
+ return decision;
+}
+
+Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants) {
+ PrepareTransaction prepareTransaction;
+ prepareTransaction.setDbName(NamespaceString::kAdminDb);
+ auto prepareObj = prepareTransaction.toBSON(
BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
<< WriteConcernOptions::kWriteConcernField
<< WriteConcernOptions::InternalMajorityNoSnapshot));
@@ -242,10 +247,11 @@ Future<PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
// Send prepare to all participants asynchronously and collect their future responses in a
// vector of responses.
- auto prepareScheduler = _scheduler.makeChildScheduler();
+ auto prepareScheduler = scheduler.makeChildScheduler();
- for (const auto& participant : participantShards) {
- responses.push_back(sendPrepareToShard(*prepareScheduler, participant, prepareObj));
+ for (const auto& participant : participants) {
+ responses.emplace_back(
+ sendPrepareToShard(service, *prepareScheduler, participant, prepareObj));
}
// Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp
@@ -254,37 +260,19 @@ Future<PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
return txn::collect(
std::move(responses),
// Initial value
- PrepareVoteConsensus{boost::none, boost::none},
+ PrepareVoteConsensus{int(participants.size())},
// Aggregates an incoming response (next) with the existing aggregate value (result)
[prepareScheduler = std::move(prepareScheduler)](PrepareVoteConsensus & result,
const PrepareResponse& next) {
- if (!next.vote) {
- LOG(3) << "Transaction coordinator did not receive a response from shard "
- << next.shardId;
- return txn::ShouldStopIteration::kNo;
- }
+ result.registerVote(next);
- switch (*next.vote) {
- case PrepareVote::kAbort:
- result.decision = CommitDecision::kAbort;
- result.maxPrepareTimestamp = boost::none;
- prepareScheduler->shutdown(
- {ErrorCodes::TransactionCoordinatorReachedAbortDecision,
- "Received at least one vote abort"});
- return txn::ShouldStopIteration::kYes;
- case PrepareVote::kCommit:
- result.decision = CommitDecision::kCommit;
- result.maxPrepareTimestamp = (result.maxPrepareTimestamp)
- ? std::max(result.maxPrepareTimestamp, next.prepareTimestamp)
- : next.prepareTimestamp;
- return txn::ShouldStopIteration::kNo;
- case PrepareVote::kCanceled:
- // PrepareVote is just an alias for CommitDecision, so we need to include this
- // branch as part of the switch statement, but CommitDecision::kCanceled will
- // never be a valid response to prepare so this path is unreachable.
- MONGO_UNREACHABLE;
+ if (next.vote == PrepareVote::kAbort) {
+ prepareScheduler->shutdown(
+ {ErrorCodes::TransactionCoordinatorReachedAbortDecision,
+ str::stream() << "Received abort vote from " << next.shardId});
}
- MONGO_UNREACHABLE;
+
+ return txn::ShouldStopIteration::kNo;
});
}
@@ -400,55 +388,59 @@ void persistDecisionBlocking(OperationContext* opCtx,
}
} // namespace
-Future<void> TransactionCoordinatorDriver::persistDecision(
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- std::vector<ShardId> participantList,
- const boost::optional<Timestamp>& commitTimestamp) {
+Future<void> persistDecision(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants,
+ const boost::optional<Timestamp>& commitTimestamp) {
return txn::doWhile(
- _scheduler,
+ scheduler,
boost::none /* no need for a backoff */,
[](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
- [this, lsid, txnNumber, participantList, commitTimestamp] {
- return _scheduler.scheduleWork([lsid, txnNumber, participantList, commitTimestamp](
- OperationContext* opCtx) {
- persistDecisionBlocking(opCtx, lsid, txnNumber, participantList, commitTimestamp);
- });
+ [&scheduler, lsid, txnNumber, participants, commitTimestamp] {
+ return scheduler.scheduleWork(
+ [lsid, txnNumber, participants, commitTimestamp](OperationContext* opCtx) {
+ persistDecisionBlocking(opCtx, lsid, txnNumber, participants, commitTimestamp);
+ });
});
}
-Future<void> TransactionCoordinatorDriver::sendCommit(const std::vector<ShardId>& participantShards,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- Timestamp commitTimestamp) {
+Future<void> sendCommit(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants,
+ Timestamp commitTimestamp) {
CommitTransaction commitTransaction;
+ commitTransaction.setDbName(NamespaceString::kAdminDb);
commitTransaction.setCommitTimestamp(commitTimestamp);
- commitTransaction.setDbName("admin");
- BSONObj commitObj = commitTransaction.toBSON(
+ auto commitObj = commitTransaction.toBSON(
BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
<< WriteConcernOptions::kWriteConcernField
<< WriteConcernOptions::Majority));
std::vector<Future<void>> responses;
- for (const auto& participant : participantShards) {
- responses.push_back(sendDecisionToParticipantShard(_scheduler, participant, commitObj));
+ for (const auto& participant : participants) {
+ responses.push_back(sendDecisionToShard(service, scheduler, participant, commitObj));
}
return txn::whenAll(responses);
}
-Future<void> TransactionCoordinatorDriver::sendAbort(const std::vector<ShardId>& participantShards,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
- BSONObj abortObj =
- BSON("abortTransaction" << 1 << "lsid" << lsid.toBSON() << "txnNumber" << txnNumber
- << "autocommit"
- << false
- << WriteConcernOptions::kWriteConcernField
- << WriteConcernOptions::Majority);
+Future<void> sendAbort(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants) {
+ AbortTransaction abortTransaction;
+ abortTransaction.setDbName(NamespaceString::kAdminDb);
+ auto abortObj = abortTransaction.toBSON(
+ BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
+ << WriteConcernOptions::kWriteConcernField
+ << WriteConcernOptions::Majority));
std::vector<Future<void>> responses;
- for (const auto& participant : participantShards) {
- responses.push_back(sendDecisionToParticipantShard(_scheduler, participant, abortObj));
+ for (const auto& participant : participants) {
+ responses.push_back(sendDecisionToShard(service, scheduler, participant, abortObj));
}
return txn::whenAll(responses);
}
@@ -516,21 +508,21 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
}
} // namespace
-Future<void> TransactionCoordinatorDriver::deleteCoordinatorDoc(const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
- return txn::doWhile(_scheduler,
+Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) {
+ return txn::doWhile(scheduler,
boost::none /* no need for a backoff */,
[](const Status& s) { return shouldRetryPersistingCoordinatorState(s); },
- [this, lsid, txnNumber] {
- return _scheduler.scheduleWork(
+ [&scheduler, lsid, txnNumber] {
+ return scheduler.scheduleWork(
[lsid, txnNumber](OperationContext* opCtx) {
deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber);
});
});
}
-std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAllCoordinatorDocs(
- OperationContext* opCtx) {
+std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx) {
std::vector<TransactionCoordinatorDocument> allCoordinatorDocs;
DBDirectClient client(opCtx);
@@ -548,15 +540,16 @@ std::vector<TransactionCoordinatorDocument> TransactionCoordinatorDriver::readAl
return allCoordinatorDocs;
}
-Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
- txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) {
- const bool isLocalShard = (shardId == txn::getLocalShardId(_serviceContext));
+Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const ShardId& shardId,
+ const BSONObj& commandObj) {
+ const bool isLocalShard = (shardId == txn::getLocalShardId(service));
auto f = txn::doWhile(
scheduler,
kExponentialBackoff,
- [ shardId, isLocalShard, commandObj = commandObj.getOwned() ](
- StatusWith<PrepareResponse> swPrepareResponse) {
+ [](const StatusWith<PrepareResponse>& swPrepareResponse) {
// *Always* retry until hearing a conclusive response or being told to stop via a
// coordinator-specific code.
return !swPrepareResponse.isOK() &&
@@ -629,21 +622,23 @@ Future<PrepareResponse> TransactionCoordinatorDriver::sendPrepareToShard(
});
}
-Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard(
- txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) {
- const bool isLocalShard = (shardId == txn::getLocalShardId(_serviceContext));
+Future<void> sendDecisionToShard(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const ShardId& shardId,
+ const BSONObj& commandObj) {
+ const bool isLocalShard = (shardId == txn::getLocalShardId(service));
return txn::doWhile(
scheduler,
kExponentialBackoff,
- [ shardId, isLocalShard, commandObj = commandObj.getOwned() ](const Status& s) {
+ [](const Status& s) {
// *Always* retry until hearing a conclusive response or being told to stop via a
// coordinator-specific code.
return !s.isOK() && s != ErrorCodes::TransactionCoordinatorSteppingDown;
},
[&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned() ] {
LOG(3) << "Coordinator going to send command " << commandObj << " to "
- << (isLocalShard ? " local " : "") << " shard " << shardId;
+ << (isLocalShard ? "local" : "") << " shard " << shardId;
return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj)
.then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) {
@@ -676,4 +671,5 @@ Future<void> TransactionCoordinatorDriver::sendDecisionToParticipantShard(
});
}
+} // namespace txn
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h
new file mode 100644
index 00000000000..583350a6d0e
--- /dev/null
+++ b/src/mongo/db/s/transaction_coordinator_util.h
@@ -0,0 +1,215 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/db/s/transaction_coordinator_document_gen.h"
+#include "mongo/db/s/transaction_coordinator_futures_util.h"
+
+namespace mongo {
+namespace txn {
+
+/**
+ * Upserts a document of the form:
+ *
+ * {
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * participants: ["shard0000", "shard0001"]
+ * }
+ *
+ * into config.transaction_coordinators and waits for the upsert to be majority-committed.
+ *
+ * Throws if the upsert fails or waiting for writeConcern fails.
+ *
+ * If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it means a
+ * document for the (lsid, txnNumber) exists with a different participant list.
+ */
+Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants);
+
+/**
+ * Sends prepare to all participants and returns a future that will be resolved when either:
+ * a) All participants have responded with a vote to commit, or
+ * b) At least one participant votes to abort.
+ */
+struct PrepareResponse;
+class PrepareVoteConsensus {
+public:
+ PrepareVoteConsensus(int numShards) : _numShards(numShards) {}
+
+ void registerVote(const PrepareResponse& vote);
+
+ /**
+ * May only be called when all of `numShards` have called `registerVote` above. Contains the
+ * commit decision of the vote, which would only be kCommit if all shards have responded with a
+ * 'commit'.
+ */
+ CoordinatorCommitDecision decision() const;
+
+private:
+ int _numShards;
+
+ int _numCommitVotes{0};
+ int _numAbortVotes{0};
+ int _numNoVotes{0};
+
+ Timestamp _maxPrepareTimestamp;
+};
+Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants);
+
+/**
+ * If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators
+ * for (lsid, txnNumber) to be:
+ *
+ * {
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * participants: ["shard0000", "shard0001"]
+ * decision: "abort"
+ * }
+ *
+ * else updates the document to be:
+ *
+ * {
+ * _id: {lsid: <lsid>, txnNumber: <txnNumber>}
+ * participants: ["shard0000", "shard0001"]
+ * decision: "commit"
+ * commitTimestamp: Timestamp(xxxxxxxx, x),
+ * }
+ *
+ * and waits for the update to be majority-committed.
+ *
+ * Throws if the update fails or waiting for writeConcern fails.
+ *
+ * If the update succeeds but did not update any document, throws an anonymous error, because it
+ * means either no document for (lsid, txnNumber) exists, or a document exists but has a different
+ * participant list, different decision, or different commit Timestamp.
+ */
+Future<void> persistDecision(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants,
+ const boost::optional<Timestamp>& commitTimestamp);
+
+/**
+ * Sends commit to all shards and returns a future that will be resolved when all participants have
+ * responded with success.
+ */
+Future<void> sendCommit(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants,
+ Timestamp commitTimestamp);
+
+/**
+ * Sends abort to all shards and returns a future that will be resolved when all participants have
+ * responded with success.
+ */
+Future<void> sendAbort(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const txn::ParticipantsList& participants);
+
+/**
+ * Deletes the document in config.transaction_coordinators for (lsid, txnNumber).
+ *
+ * Does *not* wait for the delete to be majority-committed.
+ *
+ * Throws if the update fails.
+ *
+ * If the delete succeeds but did not delete any document, throws an anonymous error, because it
+ * means either no document for (lsid, txnNumber) exists, or a document exists but without a
+ * decision.
+ */
+Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber);
+
+/**
+ * Reads and returns all documents in config.transaction_coordinators.
+ */
+std::vector<txn::TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx);
+
+//
+// These methods are used internally and are exposed for unit-testing purposes only
+//
+
+/**
+ * Sends prepare to the given shard and returns a future, which will be set with the vote.
+ *
+ * This method will retry until it receives a non-retryable response from the remote node or until
+ * the scheduler under which it is running is shut down. Because of this it can return only the
+ * following error code(s):
+ * - TransactionCoordinatorSteppingDown
+ * - ShardNotFound
+ */
+struct PrepareResponse {
+ // Shard id from which the response was received
+ ShardId shardId;
+
+ // If set to none, this means the shard did not produce a vote
+ boost::optional<txn::PrepareVote> vote;
+
+ // Will only be set if the vote was kCommit
+ boost::optional<Timestamp> prepareTimestamp;
+};
+Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const ShardId& shardId,
+ const BSONObj& prepareCommandObj);
+
+/**
+ * Sends a command corresponding to a commit decision (i.e. commitTransaction or
+ * abortTransaction) to the given shard and returns a future, which will be set with the result.
+ *
+ * Used for sendCommit and sendAbort.
+ *
+ * This method will retry until it receives a response from the remote node which can be
+ * interpreted as vote abort (e.g. NoSuchTransaction), or until the scheduler under which it is
+ * running is shut down. Because of this it can return only the following error code(s):
+ * - TransactionCoordinatorSteppingDown
+ */
+Future<void> sendDecisionToShard(ServiceContext* service,
+ txn::AsyncWorkScheduler& scheduler,
+ const ShardId& shardId,
+ const BSONObj& commandObj);
+
+} // namespace txn
+} // namespace mongo