summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/transaction_coordinator.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-03-27 18:22:26 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-04-01 07:26:26 -0400
commit907e4760487e081db031232d1f8326c0f8bdef68 (patch)
treeb27bdde6a3294072b5ff15ce07081e51696b2909 /src/mongo/db/s/transaction_coordinator.cpp
parent9dd07662bbe80415d90131fca2be6312166d6f39 (diff)
downloadmongo-907e4760487e081db031232d1f8326c0f8bdef68.tar.gz
SERVER-40297 Make all TransactionCoordinatorDriver methods free functions
The TransactionCoordinatorDriver is not really a "driver", but just a set of functions to perform asynchronous work. There isn't any state to keep, so there is no need for them to be in a class.
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator.cpp')
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp48
1 files changed, 24 insertions, 24 deletions
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 7689ddb2108..65b1231fe66 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -34,8 +34,6 @@
#include "mongo/db/s/transaction_coordinator.h"
#include "mongo/db/logical_clock.h"
-#include "mongo/db/s/transaction_coordinator_document_gen.h"
-#include "mongo/db/s/transaction_coordinator_futures_util.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -43,28 +41,24 @@ namespace {
using CommitDecision = txn::CommitDecision;
using CoordinatorCommitDecision = txn::CoordinatorCommitDecision;
+using PrepareVoteConsensus = txn::PrepareVoteConsensus;
using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
-using PrepareVoteConsensus = TransactionCoordinatorDriver::PrepareVoteConsensus;
-
CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(ServiceContext* service,
const PrepareVoteConsensus& result,
const LogicalSessionId& lsid,
TxnNumber txnNumber) {
- invariant(result.decision);
- CoordinatorCommitDecision decision(*result.decision);
-
- if (result.decision == CommitDecision::kCommit) {
- invariant(result.maxPrepareTimestamp);
-
- decision.setCommitTimestamp(Timestamp(result.maxPrepareTimestamp->getSecs(),
- result.maxPrepareTimestamp->getInc() + 1));
+ auto decision = result.decision();
+ if (decision.getDecision() == CommitDecision::kCommit) {
LOG(3) << "Advancing cluster time to the commit timestamp "
<< *decision.getCommitTimestamp() << " for " << lsid.getId() << ':' << txnNumber;
uassertStatusOK(LogicalClock::get(service)->advanceClusterTime(
- LogicalTime(*result.maxPrepareTimestamp)));
+ LogicalTime(*decision.getCommitTimestamp())));
+
+ decision.setCommitTimestamp(Timestamp(decision.getCommitTimestamp()->getSecs(),
+ decision.getCommitTimestamp()->getInc() + 1));
}
return decision;
@@ -80,8 +74,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
: _serviceContext(serviceContext),
_lsid(lsid),
_txnNumber(txnNumber),
- _scheduler(std::move(scheduler)),
- _driver(serviceContext, *_scheduler) {
+ _scheduler(std::move(scheduler)) {
if (coordinateCommitDeadline) {
_deadlineScheduler = _scheduler->makeChildScheduler();
_deadlineScheduler
@@ -119,7 +112,7 @@ void TransactionCoordinator::runCommit(std::vector<ShardId> participantShards) {
_cancelTimeoutWaitForCommitTask();
- _driver.persistParticipantList(_lsid, _txnNumber, participantShards)
+ txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, participantShards)
.then([this, participantShards]() { return _runPhaseOne(participantShards); })
.then([this, participantShards](CoordinatorCommitDecision decision) {
return _runPhaseTwo(participantShards, decision);
@@ -194,16 +187,18 @@ void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() {
Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
const std::vector<ShardId>& participantShards) {
- return _driver.sendPrepare(participantShards, _lsid, _txnNumber)
+ return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, participantShards)
.then([this, participantShards](PrepareVoteConsensus result) {
invariant(_state == CoordinatorState::kPreparing);
auto decision =
makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber);
- return _driver
- .persistDecision(
- _lsid, _txnNumber, participantShards, decision.getCommitTimestamp())
+ return txn::persistDecision(*_scheduler,
+ _lsid,
+ _txnNumber,
+ participantShards,
+ decision.getCommitTimestamp())
.then([decision] { return decision; });
});
}
@@ -215,7 +210,7 @@ Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& pa
if (MONGO_FAIL_POINT(doNotForgetCoordinator))
return Future<void>::makeReady();
- return _driver.deleteCoordinatorDoc(_lsid, _txnNumber);
+ return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber);
})
.then([this] {
LOG(3) << "Two-phase commit completed for " << _lsid.getId() << ':' << _txnNumber;
@@ -233,11 +228,16 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants(
switch (decision.getDecision()) {
case CommitDecision::kCommit:
_state = CoordinatorState::kCommitting;
- return _driver.sendCommit(
- participantShards, _lsid, _txnNumber, *decision.getCommitTimestamp());
+ return txn::sendCommit(_serviceContext,
+ *_scheduler,
+ _lsid,
+ _txnNumber,
+ participantShards,
+ *decision.getCommitTimestamp());
case CommitDecision::kAbort:
_state = CoordinatorState::kAborting;
- return _driver.sendAbort(participantShards, _lsid, _txnNumber);
+ return txn::sendAbort(
+ _serviceContext, *_scheduler, _lsid, _txnNumber, participantShards);
case CommitDecision::kCanceled:
MONGO_UNREACHABLE;
};