summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_coordinator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_coordinator.cpp')
-rw-r--r--src/mongo/db/transaction_coordinator.cpp91
1 files changed, 62 insertions, 29 deletions
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index 459a8cc7497..5735554ebbc 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -34,11 +34,39 @@
#include "mongo/db/transaction_coordinator.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/transaction_coordinator_util.h"
#include "mongo/util/log.h"
namespace mongo {
+namespace {
+
+txn::CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(
+ const txn::PrepareVoteConsensus& result,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber) {
+ invariant(result.decision);
+ txn::CoordinatorCommitDecision decision{result.decision.get(), boost::none};
+
+ if (result.decision == TransactionCoordinator::CommitDecision::kCommit) {
+ invariant(result.maxPrepareTimestamp);
+
+ decision.commitTimestamp = Timestamp(result.maxPrepareTimestamp->getSecs(),
+ result.maxPrepareTimestamp->getInc() + 1);
+
+ LOG(3) << "Advancing cluster time to commit Timestamp " << decision.commitTimestamp.get()
+ << " of transaction " << txnNumber << " on session " << lsid.toBSON();
+
+ uassertStatusOK(LogicalClock::get(getGlobalServiceContext())
+ ->advanceClusterTime(LogicalTime(result.maxPrepareTimestamp.get())));
+ }
+
+ return decision;
+}
+
+} // namespace
+
TransactionCoordinator::~TransactionCoordinator() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_state == TransactionCoordinator::CoordinatorState::kDone);
@@ -66,7 +94,12 @@ SharedSemiFuture<TransactionCoordinator::CommitDecision> TransactionCoordinator:
}
auto coordinator = shared_from_this();
- txn::async(_callbackPool, []() { txn::persistParticipantList(); })
+ txn::async(_callbackPool,
+ [coordinator, participantShards] {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ txn::persistParticipantList(
+ opCtx.get(), coordinator->_lsid, coordinator->_txnNumber, participantShards);
+ })
.then([coordinator, participantShards]() {
return txn::sendPrepare(coordinator,
coordinator->_networkExecutor,
@@ -75,22 +108,38 @@ SharedSemiFuture<TransactionCoordinator::CommitDecision> TransactionCoordinator:
coordinator->_lsid,
coordinator->_txnNumber);
})
- .then([coordinator](txn::PrepareVoteConsensus response) {
- return coordinator->_persistDecision(response);
+ .then([coordinator, participantShards](txn::PrepareVoteConsensus result) {
+ invariant(coordinator->_state == CoordinatorState::kPreparing);
+ return txn::async(coordinator->_callbackPool, [coordinator, result, participantShards] {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ const auto decision = makeDecisionFromPrepareVoteConsensus(
+ result, coordinator->_lsid, coordinator->_txnNumber);
+ txn::persistDecision(opCtx.get(),
+ coordinator->_lsid,
+ coordinator->_txnNumber,
+ participantShards,
+ decision.commitTimestamp);
+ return decision;
+ });
})
.then([coordinator, participantShards](txn::CoordinatorCommitDecision decision) {
- // Send the decision and then propagate it down the continuation chain.
return coordinator->_sendDecisionToParticipants(participantShards, decision)
.then([decision] { return decision.decision; });
})
.then([coordinator](CommitDecision finalDecision) {
stdx::unique_lock<stdx::mutex> lk(coordinator->_mutex);
- LOG(3) << "Two-phase commit completed successfully with decision " << finalDecision
- << " for session " << coordinator->_lsid.toBSON() << ", transaction number "
- << coordinator->_txnNumber;
+ LOG(3) << "Finished coordinating transaction " << coordinator->_txnNumber
+ << " on session " << coordinator->_lsid.toBSON() << " with decision "
+ << finalDecision;
coordinator->_transitionToDone(std::move(lk));
})
- .then([coordinator] { return coordinator->_deleteDecision(); })
+ .then([coordinator] {
+ return txn::async(coordinator->_callbackPool, [coordinator] {
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ return txn::deleteCoordinatorDoc(
+ opCtx.get(), coordinator->_lsid, coordinator->_txnNumber);
+ });
+ })
.onError([coordinator](Status s) {
stdx::unique_lock<stdx::mutex> lk(coordinator->_mutex);
LOG(3) << "Two-phase commit failed with error in state " << coordinator->_state
@@ -128,33 +177,23 @@ void TransactionCoordinator::cancelIfCommitNotYetStarted() {
}
}
-Future<txn::CoordinatorCommitDecision> TransactionCoordinator::_persistDecision(
- const txn::PrepareVoteConsensus& prepareResponse) {
- invariant(_state == CoordinatorState::kPreparing);
- // TODO (SERVER-36853): Implement persistence of decision.
- // TODO (SERVER-36853): Handle errors appropriately.
- return txn::async(_callbackPool,
- [prepareResponse]() { return persistDecision(prepareResponse); });
-}
-
Future<void> TransactionCoordinator::_sendDecisionToParticipants(
- const std::vector<ShardId>& participantShards,
- txn::CoordinatorCommitDecision coordinatorDecision) {
+ const std::vector<ShardId>& participantShards, txn::CoordinatorCommitDecision decision) {
invariant(_state == CoordinatorState::kPreparing);
- _finalDecisionPromise.emplaceValue(coordinatorDecision.decision);
+ _finalDecisionPromise.emplaceValue(decision.decision);
// Send the decision to all participants.
- switch (coordinatorDecision.decision) {
+ switch (decision.decision) {
case CommitDecision::kCommit:
_state = CoordinatorState::kCommitting;
- invariant(coordinatorDecision.commitTimestamp);
+ invariant(decision.commitTimestamp);
return txn::sendCommit(_networkExecutor,
_callbackPool,
participantShards,
_lsid,
_txnNumber,
- coordinatorDecision.commitTimestamp.get());
+ decision.commitTimestamp.get());
case CommitDecision::kAbort:
_state = CoordinatorState::kAborting;
return txn::sendAbort(
@@ -163,12 +202,6 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants(
MONGO_UNREACHABLE;
};
-Future<void> TransactionCoordinator::_deleteDecision() {
- invariant(_state == CoordinatorState::kDone);
- // TODO (SERVER-36853): Implement deletion of decision.
- return Future<void>::makeReady();
-}
-
void TransactionCoordinator::_transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept {
_state = CoordinatorState::kDone;