diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-12-27 13:53:03 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-01-03 17:02:50 -0500 |
commit | 310b84d607506406c78925693b4bf71dd95e35e5 (patch) | |
tree | 749a9fad2d4e4b89e4d210d92d0c9d29792eb4df /src/mongo/db/s/txn_two_phase_commit_cmds.cpp | |
parent | f27c375287f7c69067d5ba437533622dbecf41c9 (diff) | |
download | mongo-310b84d607506406c78925693b4bf71dd95e35e5.tar.gz |
SERVER-38522 Make persistParticipantList/persistDecision/deleteCoordinatorDoc asynchronous
Also splits TransactionCoordinator util into driver and futures components
Diffstat (limited to 'src/mongo/db/s/txn_two_phase_commit_cmds.cpp')
-rw-r--r-- | src/mongo/db/s/txn_two_phase_commit_cmds.cpp | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index c62717e5e34..2d028835dfc 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -43,7 +43,6 @@ #include "mongo/db/transaction_participant.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" -#include "mongo/s/grid.h" #include "mongo/util/log.h" namespace mongo { @@ -84,13 +83,12 @@ public: serverGlobalParams.enableMajorityReadConcern); // We do not allow preparing a transaction if the replica set has any arbiters. - auto replCoord = - repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext()); + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); uassert(50995, "'prepareTransaction' is not supported for replica sets with arbiters", !replCoord->setContainsArbiter()); - auto txnParticipant = TransactionParticipant::get(opCtx); + const auto txnParticipant = TransactionParticipant::get(opCtx); uassert(ErrorCodes::CommandFailed, "prepareTransaction must be run within a transaction", txnParticipant); @@ -112,8 +110,9 @@ public: if (txnParticipant->transactionIsPrepared()) { auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient()); auto prepareOpTime = txnParticipant->getPrepareOpTime(); + // Set the client optime to be prepareOpTime if it's not already later than - // prepareOpTime. his ensures that we wait for writeConcern and that prepareOpTime + // prepareOpTime. This ensures that we wait for writeConcern and that prepareOpTime // will be committed. if (prepareOpTime > replClient.getLastOp()) { replClient.setLastOp(prepareOpTime); @@ -155,7 +154,7 @@ public: void doCheckAuthorization(OperationContext* opCtx) const override {} }; - virtual bool adminOnly() const { + bool adminOnly() const override { return true; } @@ -167,11 +166,13 @@ public: AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kNever; } + } prepareTransactionCmd; class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> { public: using Request = CoordinateCommitTransaction; + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; @@ -189,8 +190,9 @@ public: ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42)); const auto& cmd = request(); + const auto tcs = TransactionCoordinatorService::get(opCtx); - boost::optional<Future<TransactionCoordinator::CommitDecision>> commitDecisionFuture; + boost::optional<Future<txn::CommitDecision>> commitDecisionFuture; if (!cmd.getParticipants().empty()) { // Convert the participant list array into a set, and assert that all participants @@ -201,7 +203,7 @@ public: StringBuilder ss; ss << "["; for (const auto& participant : cmd.getParticipants()) { - const auto shardId = participant.getShardId(); + const auto& shardId = participant.getShardId(); uassert(ErrorCodes::InvalidOptions, str::stream() << "participant list contained duplicate shardId " << shardId, @@ -216,19 +218,16 @@ public: << ss.str() << " for transaction " << opCtx->getTxnNumber() << " on session " << opCtx->getLogicalSessionId()->toBSON(); - commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->coordinateCommit( - opCtx, - opCtx->getLogicalSessionId().get(), - opCtx->getTxnNumber().get(), - participantList); + commitDecisionFuture = tcs->coordinateCommit( + opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber(), participantList); } else { LOG(3) << "Coordinator shard received request to recover commit decision for " "transaction " << opCtx->getTxnNumber() << " on session " << opCtx->getLogicalSessionId()->toBSON(); - commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->recoverCommit( - opCtx, opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get()); + commitDecisionFuture = tcs->recoverCommit( + opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber()); } // Since the coordinator *will* have written the decision from another OperationContext, @@ -240,9 +239,9 @@ public: // The commit coordination is still ongoing. Block waiting for the decision. auto commitDecision = commitDecisionFuture->get(opCtx); switch (commitDecision) { - case TransactionCoordinator::CommitDecision::kAbort: + case txn::CommitDecision::kAbort: uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted"); - case TransactionCoordinator::CommitDecision::kCommit: + case txn::CommitDecision::kCommit: return; } } @@ -255,6 +254,7 @@ public: "participant for transaction " << opCtx->getTxnNumber() << " on session " << opCtx->getLogicalSessionId()->toBSON(); + recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx); } @@ -281,6 +281,7 @@ public: AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kNever; } + } coordinateCommitTransactionCmd; } // namespace |