summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-12-27 13:53:03 -0500
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-01-03 17:02:50 -0500
commit310b84d607506406c78925693b4bf71dd95e35e5 (patch)
tree749a9fad2d4e4b89e4d210d92d0c9d29792eb4df /src/mongo/db/s/txn_two_phase_commit_cmds.cpp
parentf27c375287f7c69067d5ba437533622dbecf41c9 (diff)
downloadmongo-310b84d607506406c78925693b4bf71dd95e35e5.tar.gz
SERVER-38522 Make persistParticipantList/persistDecision/deleteCoordinatorDoc asynchronous
Also splits TransactionCoordinator util into driver and futures components
Diffstat (limited to 'src/mongo/db/s/txn_two_phase_commit_cmds.cpp')
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp35
1 files changed, 18 insertions, 17 deletions
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index c62717e5e34..2d028835dfc 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -43,7 +43,6 @@
#include "mongo/db/transaction_participant.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/grid.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -84,13 +83,12 @@ public:
serverGlobalParams.enableMajorityReadConcern);
// We do not allow preparing a transaction if the replica set has any arbiters.
- auto replCoord =
- repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
uassert(50995,
"'prepareTransaction' is not supported for replica sets with arbiters",
!replCoord->setContainsArbiter());
- auto txnParticipant = TransactionParticipant::get(opCtx);
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
uassert(ErrorCodes::CommandFailed,
"prepareTransaction must be run within a transaction",
txnParticipant);
@@ -112,8 +110,9 @@ public:
if (txnParticipant->transactionIsPrepared()) {
auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
auto prepareOpTime = txnParticipant->getPrepareOpTime();
+
// Set the client optime to be prepareOpTime if it's not already later than
- // prepareOpTime. his ensures that we wait for writeConcern and that prepareOpTime
+ // prepareOpTime. This ensures that we wait for writeConcern and that prepareOpTime
// will be committed.
if (prepareOpTime > replClient.getLastOp()) {
replClient.setLastOp(prepareOpTime);
@@ -155,7 +154,7 @@ public:
void doCheckAuthorization(OperationContext* opCtx) const override {}
};
- virtual bool adminOnly() const {
+ bool adminOnly() const override {
return true;
}
@@ -167,11 +166,13 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
+
} prepareTransactionCmd;
class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> {
public:
using Request = CoordinateCommitTransaction;
+
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
@@ -189,8 +190,9 @@ public:
ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42));
const auto& cmd = request();
+ const auto tcs = TransactionCoordinatorService::get(opCtx);
- boost::optional<Future<TransactionCoordinator::CommitDecision>> commitDecisionFuture;
+ boost::optional<Future<txn::CommitDecision>> commitDecisionFuture;
if (!cmd.getParticipants().empty()) {
// Convert the participant list array into a set, and assert that all participants
@@ -201,7 +203,7 @@ public:
StringBuilder ss;
ss << "[";
for (const auto& participant : cmd.getParticipants()) {
- const auto shardId = participant.getShardId();
+ const auto& shardId = participant.getShardId();
uassert(ErrorCodes::InvalidOptions,
str::stream() << "participant list contained duplicate shardId "
<< shardId,
@@ -216,19 +218,16 @@ public:
<< ss.str() << " for transaction " << opCtx->getTxnNumber() << " on session "
<< opCtx->getLogicalSessionId()->toBSON();
- commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->coordinateCommit(
- opCtx,
- opCtx->getLogicalSessionId().get(),
- opCtx->getTxnNumber().get(),
- participantList);
+ commitDecisionFuture = tcs->coordinateCommit(
+ opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber(), participantList);
} else {
LOG(3) << "Coordinator shard received request to recover commit decision for "
"transaction "
<< opCtx->getTxnNumber() << " on session "
<< opCtx->getLogicalSessionId()->toBSON();
- commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->recoverCommit(
- opCtx, opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get());
+ commitDecisionFuture = tcs->recoverCommit(
+ opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber());
}
// Since the coordinator *will* have written the decision from another OperationContext,
@@ -240,9 +239,9 @@ public:
// The commit coordination is still ongoing. Block waiting for the decision.
auto commitDecision = commitDecisionFuture->get(opCtx);
switch (commitDecision) {
- case TransactionCoordinator::CommitDecision::kAbort:
+ case txn::CommitDecision::kAbort:
uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted");
- case TransactionCoordinator::CommitDecision::kCommit:
+ case txn::CommitDecision::kCommit:
return;
}
}
@@ -255,6 +254,7 @@ public:
"participant for transaction "
<< opCtx->getTxnNumber() << " on session "
<< opCtx->getLogicalSessionId()->toBSON();
+
recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx);
}
@@ -281,6 +281,7 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kNever;
}
+
} coordinateCommitTransactionCmd;
} // namespace