diff options
Diffstat (limited to 'src/mongo/db/transaction_coordinator.cpp')
-rw-r--r-- | src/mongo/db/transaction_coordinator.cpp | 91 |
1 files changed, 62 insertions, 29 deletions
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp index 459a8cc7497..5735554ebbc 100644 --- a/src/mongo/db/transaction_coordinator.cpp +++ b/src/mongo/db/transaction_coordinator.cpp @@ -34,11 +34,39 @@ #include "mongo/db/transaction_coordinator.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/transaction_coordinator_util.h" #include "mongo/util/log.h" namespace mongo { +namespace { + +txn::CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus( + const txn::PrepareVoteConsensus& result, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber) { + invariant(result.decision); + txn::CoordinatorCommitDecision decision{result.decision.get(), boost::none}; + + if (result.decision == TransactionCoordinator::CommitDecision::kCommit) { + invariant(result.maxPrepareTimestamp); + + decision.commitTimestamp = Timestamp(result.maxPrepareTimestamp->getSecs(), + result.maxPrepareTimestamp->getInc() + 1); + + LOG(3) << "Advancing cluster time to commit Timestamp " << decision.commitTimestamp.get() + << " of transaction " << txnNumber << " on session " << lsid.toBSON(); + + uassertStatusOK(LogicalClock::get(getGlobalServiceContext()) + ->advanceClusterTime(LogicalTime(result.maxPrepareTimestamp.get()))); + } + + return decision; +} + +} // namespace + TransactionCoordinator::~TransactionCoordinator() { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_state == TransactionCoordinator::CoordinatorState::kDone); @@ -66,7 +94,12 @@ SharedSemiFuture<TransactionCoordinator::CommitDecision> TransactionCoordinator: } auto coordinator = shared_from_this(); - txn::async(_callbackPool, []() { txn::persistParticipantList(); }) + txn::async(_callbackPool, + [coordinator, participantShards] { + auto opCtx = Client::getCurrent()->makeOperationContext(); + txn::persistParticipantList( + opCtx.get(), coordinator->_lsid, coordinator->_txnNumber, participantShards); + }) .then([coordinator, participantShards]() { return txn::sendPrepare(coordinator, coordinator->_networkExecutor, @@ -75,22 +108,38 @@ SharedSemiFuture<TransactionCoordinator::CommitDecision> TransactionCoordinator: coordinator->_lsid, coordinator->_txnNumber); }) - .then([coordinator](txn::PrepareVoteConsensus response) { - return coordinator->_persistDecision(response); + .then([coordinator, participantShards](txn::PrepareVoteConsensus result) { + invariant(coordinator->_state == CoordinatorState::kPreparing); + return txn::async(coordinator->_callbackPool, [coordinator, result, participantShards] { + auto opCtx = Client::getCurrent()->makeOperationContext(); + const auto decision = makeDecisionFromPrepareVoteConsensus( + result, coordinator->_lsid, coordinator->_txnNumber); + txn::persistDecision(opCtx.get(), + coordinator->_lsid, + coordinator->_txnNumber, + participantShards, + decision.commitTimestamp); + return decision; + }); }) .then([coordinator, participantShards](txn::CoordinatorCommitDecision decision) { - // Send the decision and then propagate it down the continuation chain. return coordinator->_sendDecisionToParticipants(participantShards, decision) .then([decision] { return decision.decision; }); }) .then([coordinator](CommitDecision finalDecision) { stdx::unique_lock<stdx::mutex> lk(coordinator->_mutex); - LOG(3) << "Two-phase commit completed successfully with decision " << finalDecision - << " for session " << coordinator->_lsid.toBSON() << ", transaction number " - << coordinator->_txnNumber; + LOG(3) << "Finished coordinating transaction " << coordinator->_txnNumber + << " on session " << coordinator->_lsid.toBSON() << " with decision " + << finalDecision; coordinator->_transitionToDone(std::move(lk)); }) - .then([coordinator] { return coordinator->_deleteDecision(); }) + .then([coordinator] { + return txn::async(coordinator->_callbackPool, [coordinator] { + auto opCtx = Client::getCurrent()->makeOperationContext(); + return txn::deleteCoordinatorDoc( + opCtx.get(), coordinator->_lsid, coordinator->_txnNumber); + }); + }) .onError([coordinator](Status s) { stdx::unique_lock<stdx::mutex> lk(coordinator->_mutex); LOG(3) << "Two-phase commit failed with error in state " << coordinator->_state @@ -128,33 +177,23 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() { } } -Future<txn::CoordinatorCommitDecision> TransactionCoordinator::_persistDecision( - const txn::PrepareVoteConsensus& prepareResponse) { - invariant(_state == CoordinatorState::kPreparing); - // TODO (SERVER-36853): Implement persistence of decision. - // TODO (SERVER-36853): Handle errors appropriately. - return txn::async(_callbackPool, - [prepareResponse]() { return persistDecision(prepareResponse); }); -} - Future<void> TransactionCoordinator::_sendDecisionToParticipants( - const std::vector<ShardId>& participantShards, - txn::CoordinatorCommitDecision coordinatorDecision) { + const std::vector<ShardId>& participantShards, txn::CoordinatorCommitDecision decision) { invariant(_state == CoordinatorState::kPreparing); - _finalDecisionPromise.emplaceValue(coordinatorDecision.decision); + _finalDecisionPromise.emplaceValue(decision.decision); // Send the decision to all participants. - switch (coordinatorDecision.decision) { + switch (decision.decision) { case CommitDecision::kCommit: _state = CoordinatorState::kCommitting; - invariant(coordinatorDecision.commitTimestamp); + invariant(decision.commitTimestamp); return txn::sendCommit(_networkExecutor, _callbackPool, participantShards, _lsid, _txnNumber, - coordinatorDecision.commitTimestamp.get()); + decision.commitTimestamp.get()); case CommitDecision::kAbort: _state = CoordinatorState::kAborting; return txn::sendAbort( @@ -163,12 +202,6 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants( MONGO_UNREACHABLE; }; -Future<void> TransactionCoordinator::_deleteDecision() { - invariant(_state == CoordinatorState::kDone); - // TODO (SERVER-36853): Implement deletion of decision. - return Future<void>::makeReady(); -} - void TransactionCoordinator::_transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept { _state = CoordinatorState::kDone; |