diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-03-27 18:22:26 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-04-01 07:26:26 -0400 |
commit | 907e4760487e081db031232d1f8326c0f8bdef68 (patch) | |
tree | b27bdde6a3294072b5ff15ce07081e51696b2909 /src/mongo/db/s/transaction_coordinator.cpp | |
parent | 9dd07662bbe80415d90131fca2be6312166d6f39 (diff) | |
download | mongo-907e4760487e081db031232d1f8326c0f8bdef68.tar.gz |
SERVER-40297 Make all TransactionCoordinatorDriver methods free functions
The TransactionCoordinatorDriver is not really a "driver", but just a
set of functions to perform asynchronous work. There isn't any state to
keep, so there is no need for them to be in a class.
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator.cpp')
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 48 |
1 files changed, 24 insertions, 24 deletions
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 7689ddb2108..65b1231fe66 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -34,8 +34,6 @@ #include "mongo/db/s/transaction_coordinator.h" #include "mongo/db/logical_clock.h" -#include "mongo/db/s/transaction_coordinator_document_gen.h" -#include "mongo/db/s/transaction_coordinator_futures_util.h" #include "mongo/util/log.h" namespace mongo { @@ -43,28 +41,24 @@ namespace { using CommitDecision = txn::CommitDecision; using CoordinatorCommitDecision = txn::CoordinatorCommitDecision; +using PrepareVoteConsensus = txn::PrepareVoteConsensus; using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument; -using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus; - CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(ServiceContext* service, const PrepareVoteConsensus& result, const LogicalSessionId& lsid, TxnNumber txnNumber) { - invariant(result.decision); - CoordinatorCommitDecision decision(*result.decision); - - if (result.decision == CommitDecision::kCommit) { - invariant(result.maxPrepareTimestamp); - - decision.setCommitTimestamp(Timestamp(result.maxPrepareTimestamp->getSecs(), - result.maxPrepareTimestamp->getInc() + 1)); + auto decision = result.decision(); + if (decision.getDecision() == CommitDecision::kCommit) { LOG(3) << "Advancing cluster time to the commit timestamp " << *decision.getCommitTimestamp() << " for " << lsid.getId() << ':' << txnNumber; uassertStatusOK(LogicalClock::get(service)->advanceClusterTime( - LogicalTime(*result.maxPrepareTimestamp))); + LogicalTime(*decision.getCommitTimestamp()))); + + decision.setCommitTimestamp(Timestamp(decision.getCommitTimestamp()->getSecs(), + decision.getCommitTimestamp()->getInc() + 1)); } return decision; @@ -80,8 +74,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, : _serviceContext(serviceContext), _lsid(lsid), _txnNumber(txnNumber), - _scheduler(std::move(scheduler)), - _driver(serviceContext, *_scheduler) { + _scheduler(std::move(scheduler)) { if (coordinateCommitDeadline) { _deadlineScheduler = _scheduler->makeChildScheduler(); _deadlineScheduler @@ -119,7 +112,7 @@ void TransactionCoordinator::runCommit(std::vector<ShardId> participantShards) { _cancelTimeoutWaitForCommitTask(); - _driver.persistParticipantList(_lsid, _txnNumber, participantShards) + txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, participantShards) .then([this, participantShards]() { return _runPhaseOne(participantShards); }) .then([this, participantShards](CoordinatorCommitDecision decision) { return _runPhaseTwo(participantShards, decision); @@ -194,16 +187,18 @@ void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() { Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne( const std::vector<ShardId>& participantShards) { - return _driver.sendPrepare(participantShards, _lsid, _txnNumber) + return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, participantShards) .then([this, participantShards](PrepareVoteConsensus result) { invariant(_state == CoordinatorState::kPreparing); auto decision = makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber); - return _driver - .persistDecision( - _lsid, _txnNumber, participantShards, decision.getCommitTimestamp()) + return txn::persistDecision(*_scheduler, + _lsid, + _txnNumber, + participantShards, + decision.getCommitTimestamp()) .then([decision] { return decision; }); }); } @@ -215,7 +210,7 @@ Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& pa if (MONGO_FAIL_POINT(doNotForgetCoordinator)) return Future<void>::makeReady(); - return _driver.deleteCoordinatorDoc(_lsid, _txnNumber); + return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber); }) .then([this] { LOG(3) << "Two-phase commit completed for " << _lsid.getId() << ':' << _txnNumber; @@ -233,11 +228,16 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants( switch (decision.getDecision()) { case CommitDecision::kCommit: _state = CoordinatorState::kCommitting; - return _driver.sendCommit( - participantShards, _lsid, _txnNumber, *decision.getCommitTimestamp()); + return txn::sendCommit(_serviceContext, + *_scheduler, + _lsid, + _txnNumber, + participantShards, + *decision.getCommitTimestamp()); case CommitDecision::kAbort: _state = CoordinatorState::kAborting; - return _driver.sendAbort(participantShards, _lsid, _txnNumber); + return txn::sendAbort( + _serviceContext, *_scheduler, _lsid, _txnNumber, participantShards); case CommitDecision::kCanceled: MONGO_UNREACHABLE; }; |