summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-04-01 17:22:06 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-04-03 16:48:29 -0400
commit7575dac4abe7d5c6164b54487a81771cd3aad644 (patch)
treee74ba01038c58c5c94e6b686e482e9a4fdc88463 /src/mongo
parentc4d16e522c18e96114fd3e9b1b71ba835d11c8d7 (diff)
downloadmongo-7575dac4abe7d5c6164b54487a81771cd3aad644.tar.gz
SERVER-40297 Make the TransactionCoordinator entirely futures-based
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp398
-rw-r--r--src/mongo/db/s/transaction_coordinator.h104
-rw-r--r--src/mongo/db/s/transaction_coordinator_catalog.cpp2
-rw-r--r--src/mongo/db/s/transaction_coordinator_catalog_test.cpp4
-rw-r--r--src/mongo/db/s/transaction_coordinator_futures_util.cpp13
-rw-r--r--src/mongo/db/s/transaction_coordinator_futures_util.h5
-rw-r--r--src/mongo/db/s/transaction_coordinator_futures_util_test.cpp21
-rw-r--r--src/mongo/db/s/transaction_coordinator_service.cpp14
-rw-r--r--src/mongo/db/s/transaction_coordinator_service_test.cpp9
-rw-r--r--src/mongo/db/s/transaction_coordinator_structures.cpp2
-rw-r--r--src/mongo/db/s/transaction_coordinator_structures.h1
-rw-r--r--src/mongo/db/s/transaction_coordinator_test.cpp23
-rw-r--r--src/mongo/db/s/transaction_coordinator_util.h11
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp108
14 files changed, 333 insertions, 382 deletions
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 98df3191e08..01c96be89f2 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -50,247 +50,251 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
const LogicalSessionId& lsid,
TxnNumber txnNumber,
std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
- boost::optional<Date_t> coordinateCommitDeadline)
+ Date_t deadline)
: _serviceContext(serviceContext),
_lsid(lsid),
_txnNumber(txnNumber),
_scheduler(std::move(scheduler)) {
- if (coordinateCommitDeadline) {
- _deadlineScheduler = _scheduler->makeChildScheduler();
- _deadlineScheduler
- ->scheduleWorkAt(*coordinateCommitDeadline, [](OperationContext* opCtx) {})
- .getAsync([this](const Status& s) {
- if (s == ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled)
- return;
- cancelIfCommitNotYetStarted();
+
+ auto kickOffCommitPF = makePromiseFuture<void>();
+ _kickOffCommitPromise = std::move(kickOffCommitPF.promise);
+
+ // Task, which will fire when the transaction's total deadline has been reached. If the 2PC
+ // sequence has not yet started, it will be abandoned altogether.
+ auto deadlineFuture =
+ _scheduler
+ ->scheduleWorkAt(deadline, [this](OperationContext*) { cancelIfCommitNotYetStarted(); })
+ .tapError([this](Status s) {
+ if (_reserveKickOffCommitPromise()) {
+ _kickOffCommitPromise.setError(std::move(s));
+ }
});
- }
-}
-TransactionCoordinator::~TransactionCoordinator() {
- cancelIfCommitNotYetStarted();
+ // Two-phase commit phases chain. Once this chain executes, the 2PC sequence has completed
+ // either with success or error and the scheduled deadline task above has been joined.
+ std::move(kickOffCommitPF.future)
+ .then([this] {
+ // Persist the participants, unless they have been made durable already (which would
+ // only be the case if this coordinator was created as part of step-up recovery).
+ // Input: _participants
+ // _participantsDurable (optional)
+ // Output: _participantsDurable = true
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(_participants);
+ if (_participantsDurable)
+ return Future<void>::makeReady();
+ }
- // Wait for all scheduled asynchronous activity to complete
- if (_deadlineScheduler)
- _deadlineScheduler->join();
+ return txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, *_participants)
+ .then([this] {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _participantsDurable = true;
+ });
+ })
+ .then([this] {
+ // Send prepare to the participants, unless this has already been done (which would only
+ // be the case if this coordinator was created as part of step-up recovery and the
+ // recovery document contained a decision).
+ // Input: _participants, _participantsDurable
+ // _decision (optional)
+ // Output: _decision is set
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(_participantsDurable);
+ if (_decision)
+ return Future<void>::makeReady();
+ }
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_state == TransactionCoordinator::CoordinatorState::kDone);
+ return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, *_participants)
+ .then([this](PrepareVoteConsensus consensus) mutable {
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _decision = consensus.decision();
+ }
+
+ if (_decision->getDecision() == CommitDecision::kCommit) {
+ LOG(3) << "Advancing cluster time to the commit timestamp "
+ << *_decision->getCommitTimestamp() << " for " << _lsid.getId()
+ << ':' << _txnNumber;
+
+ uassertStatusOK(LogicalClock::get(_serviceContext)
+ ->advanceClusterTime(
+ LogicalTime(*_decision->getCommitTimestamp())));
+ }
+ });
+ })
+ .then([this] {
+ // Persist the commit decision, unless this has already been done (which would only be
+ // the case if this coordinator was created as part of step-up recovery and the recovery
+ // document contained a decision).
+ // Input: _decision
+ // _decisionDurable (optional)
+ // Output: _decisionDurable = true
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(_decision);
+ if (_decisionDurable)
+ return Future<void>::makeReady();
+ }
- // Make sure no callers of functions on the coordinator are waiting for a decision to be
- // signaled or the commit process to complete.
- invariant(_completionPromises.empty());
-}
+ return txn::persistDecision(*_scheduler,
+ _lsid,
+ _txnNumber,
+ *_participants,
+ _decision->getCommitTimestamp())
+ .then([this] {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _decisionDurable = true;
+ });
+ })
+ .then([this] {
+ // Send the commit/abort decision to the participants.
+ // Input: _decisionDurable
+ // Output: (none)
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(_decisionDurable);
+ }
-void TransactionCoordinator::runCommit(std::vector<ShardId> participantShards) {
- {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- if (_state != CoordinatorState::kInit)
- return;
- _state = CoordinatorState::kPreparing;
- }
+ _decisionPromise.emplaceValue(_decision->getDecision());
+
+ switch (_decision->getDecision()) {
+ case CommitDecision::kCommit:
+ return txn::sendCommit(_serviceContext,
+ *_scheduler,
+ _lsid,
+ _txnNumber,
+ *_participants,
+ *_decision->getCommitTimestamp());
+ case CommitDecision::kAbort:
+ return txn::sendAbort(
+ _serviceContext, *_scheduler, _lsid, _txnNumber, *_participants);
+ default:
+ MONGO_UNREACHABLE;
+ };
+ })
+ .onCompletion([this](Status s) {
+ // Do a best-effort attempt to delete the coordinator document from disk, regardless of
+ // the success of the commit sequence.
+ LOG(3) << "Two-phase commit completed for " << _lsid.getId() << ':' << _txnNumber;
+
+ if (MONGO_FAIL_POINT(doNotForgetCoordinator))
+ return Future<void>::makeReady(s);
- _cancelTimeoutWaitForCommitTask();
+ return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber)
+ .onCompletion([ this, chainStatus = std::move(s) ](Status deleteDocStatus) {
+ if (_participantsDurable) {
+ LOG(0) << redact(deleteDocStatus);
+ }
- txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, participantShards)
- .then([this, participantShards]() { return _runPhaseOne(participantShards); })
- .then([this, participantShards](CoordinatorCommitDecision decision) {
- return _runPhaseTwo(participantShards, decision);
+ return chainStatus;
+ });
})
- .getAsync([this](Status s) { _handleCompletionError(s); });
-}
-
-void TransactionCoordinator::continueCommit(const TransactionCoordinatorDocument& doc) {
- {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- invariant(_state == CoordinatorState::kInit);
- invariant(!_deadlineScheduler);
- _state = CoordinatorState::kPreparing;
- }
+ .onCompletion([ this, deadlineFuture = std::move(deadlineFuture) ](Status s) mutable {
+ // Interrupt this coordinator's scheduler hierarchy and join the deadline task's future
+ // in order to guarantee that there are no more threads running within the coordinator.
+ _scheduler->shutdown(
+ {ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled, "Coordinator completed"});
- const auto& participantShards = doc.getParticipants();
-
- // Helper lambda to get the decision either from the document passed in or from the participants
- // (by performing 'phase one' of two-phase commit).
- auto getDecision = [&]() -> Future<CoordinatorCommitDecision> {
- const auto& decision = doc.getDecision();
- if (!decision) {
- return _runPhaseOne(participantShards);
- } else {
- return *decision;
- }
- };
-
- getDecision()
- .then([this, participantShards](CoordinatorCommitDecision decision) {
- return _runPhaseTwo(participantShards, decision);
+ return std::move(deadlineFuture).onCompletion([s = std::move(s)](Status) { return s; });
})
- .getAsync([this](Status s) { _handleCompletionError(s); });
+ .getAsync([this](Status s) {
+ // Notify all the listeners which are interested in the coordinator's lifecycle. After
+ // this call, the coordinator object could potentially get destroyed by its lifetime
+ // controller, so there shouldn't be any accesses to `this` after this call.
+ _done(s);
+ });
}
-Future<void> TransactionCoordinator::onCompletion() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_state == CoordinatorState::kDone) {
- return Future<void>::makeReady();
- }
-
- auto completionPromiseFuture = makePromiseFuture<void>();
- _completionPromises.emplace_back(std::move(completionPromiseFuture.promise));
-
- return std::move(completionPromiseFuture.future)
- .onError<ErrorCodes::TransactionCoordinatorSteppingDown>(
- [](const Status& s) { uasserted(ErrorCodes::InterruptedDueToStepDown, s.reason()); });
+TransactionCoordinator::~TransactionCoordinator() {
+ invariant(_completionPromises.empty());
}
-SharedSemiFuture<CommitDecision> TransactionCoordinator::getDecision() {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- return _decisionPromise.getFuture();
+void TransactionCoordinator::runCommit(std::vector<ShardId> participants) {
+ if (!_reserveKickOffCommitPromise())
+ return;
+
+ _participants = std::move(participants);
+ _kickOffCommitPromise.emplaceValue();
}
-void TransactionCoordinator::cancelIfCommitNotYetStarted() {
- _cancelTimeoutWaitForCommitTask();
+void TransactionCoordinator::continueCommit(const TransactionCoordinatorDocument& doc) {
+ if (!_reserveKickOffCommitPromise())
+ return;
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_state == CoordinatorState::kInit) {
- invariant(!_decisionPromise.getFuture().isReady());
- _decisionPromise.emplaceValue(CommitDecision::kCanceled);
- _transitionToDone(std::move(lk));
+ _participants = std::move(doc.getParticipants());
+ if (doc.getDecision()) {
+ _participantsDurable = true;
+ _decision = std::move(doc.getDecision());
}
-}
-void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() {
- if (_deadlineScheduler) {
- _deadlineScheduler->shutdown({ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled,
- "Interrupting the commit received deadline task"});
- }
+ _kickOffCommitPromise.emplaceValue();
}
-Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
- const std::vector<ShardId>& participantShards) {
- return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, participantShards)
- .then([this, participantShards](PrepareVoteConsensus result) {
- invariant(_state == CoordinatorState::kPreparing);
-
- auto decision = result.decision();
- if (decision.getDecision() == CommitDecision::kCommit) {
- LOG(3) << "Advancing cluster time to the commit timestamp "
- << *decision.getCommitTimestamp() << " for " << _lsid.getId() << ':'
- << _txnNumber;
-
- uassertStatusOK(
- LogicalClock::get(_serviceContext)
- ->advanceClusterTime(LogicalTime(*decision.getCommitTimestamp())));
- }
-
- return txn::persistDecision(*_scheduler,
- _lsid,
- _txnNumber,
- participantShards,
- decision.getCommitTimestamp())
- .then([decision] { return decision; });
- });
+SharedSemiFuture<CommitDecision> TransactionCoordinator::getDecision() {
+ return _decisionPromise.getFuture();
}
-Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& participantShards,
- const CoordinatorCommitDecision& decision) {
- return _sendDecisionToParticipants(participantShards, decision)
- .then([this] {
- if (MONGO_FAIL_POINT(doNotForgetCoordinator))
- return Future<void>::makeReady();
+Future<void> TransactionCoordinator::onCompletion() {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ if (_completionPromisesFired)
+ return Future<void>::makeReady();
- return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber);
- })
- .then([this] {
- LOG(3) << "Two-phase commit completed for " << _lsid.getId() << ':' << _txnNumber;
+ auto completionPF = makePromiseFuture<void>();
+ _completionPromises.emplace_back(std::move(completionPF.promise));
- stdx::unique_lock<stdx::mutex> ul(_mutex);
- _transitionToDone(std::move(ul));
- });
+ return std::move(completionPF.future);
}
-Future<void> TransactionCoordinator::_sendDecisionToParticipants(
- const std::vector<ShardId>& participantShards, CoordinatorCommitDecision decision) {
- invariant(_state == CoordinatorState::kPreparing);
- _decisionPromise.emplaceValue(decision.getDecision());
-
- switch (decision.getDecision()) {
- case CommitDecision::kCommit:
- _state = CoordinatorState::kCommitting;
- return txn::sendCommit(_serviceContext,
- *_scheduler,
- _lsid,
- _txnNumber,
- participantShards,
- *decision.getCommitTimestamp());
- case CommitDecision::kAbort:
- _state = CoordinatorState::kAborting;
- return txn::sendAbort(
- _serviceContext, *_scheduler, _lsid, _txnNumber, participantShards);
- case CommitDecision::kCanceled:
- MONGO_UNREACHABLE;
- };
- MONGO_UNREACHABLE;
-};
-
-void TransactionCoordinator::_handleCompletionError(Status s) {
- if (s.isOK()) {
+void TransactionCoordinator::cancelIfCommitNotYetStarted() {
+ if (!_reserveKickOffCommitPromise())
return;
- }
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- LOG(3) << "Two-phase commit failed with error in state " << _state << " for " << _lsid.getId()
- << ':' << _txnNumber << causedBy(s);
-
- // If an error occurred prior to making a decision, set an error on the decision promise to
- // propagate it to callers of runCommit
- if (!_decisionPromise.getFuture().isReady()) {
- invariant(_state == CoordinatorState::kPreparing);
-
- // TransactionCoordinatorSteppingDown indicates the *sending* node (that is, *this* node) is
- // stepping down. Active coordinator tasks are interrupted with this code instead of
- // InterruptedDueToStepDown, because InterruptedDueToStepDown indicates the *receiving*
- // node was stepping down.
- if (s == ErrorCodes::TransactionCoordinatorSteppingDown) {
- s = Status(ErrorCodes::InterruptedDueToStepDown,
- str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber
- << " stopping due to: "
- << s.reason());
- }
+ _kickOffCommitPromise.setError({ErrorCodes::NoSuchTransaction,
+ "Transaction exceeded deadline or newer transaction started"});
+}
- _decisionPromise.setError(s);
- }
+bool TransactionCoordinator::_reserveKickOffCommitPromise() {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ if (_kickOffCommitPromiseSet)
+ return false;
- _transitionToDone(std::move(lk));
+ _kickOffCommitPromiseSet = true;
+ return true;
}
-void TransactionCoordinator::_transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept {
- _state = CoordinatorState::kDone;
+void TransactionCoordinator::_done(Status status) {
+ // TransactionCoordinatorSteppingDown indicates the *sending* node (that is, *this* node) is
+ // stepping down. Active coordinator tasks are interrupted with this code instead of
+ // InterruptedDueToStepDown, because InterruptedDueToStepDown indicates the *receiving* node was
+ // stepping down.
+ if (status == ErrorCodes::TransactionCoordinatorSteppingDown)
+ status = Status(ErrorCodes::InterruptedDueToStepDown,
+ str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber
+ << " stopped due to: "
+ << status.reason());
+
+ LOG(3) << "Two-phase commit for " << _lsid.getId() << ':' << _txnNumber << " completed with "
+ << redact(status);
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ _completionPromisesFired = true;
+
+ if (!_decisionDurable) {
+ ul.unlock();
+ _decisionPromise.setError(status);
+ ul.lock();
+ }
+ // Trigger the onCompletion promises outside of a lock, because the future handlers indicate to
+ // the potential lifetime controller that the object can be destroyed
auto promisesToTrigger = std::move(_completionPromises);
- lk.unlock();
+ ul.unlock();
- // No fields from 'this' are allowed to be accessed after the for loop below runs, because the
- // future handlers indicate to the potential lifetime controller that the object can be
- // destroyed
for (auto&& promise : promisesToTrigger) {
promise.emplaceValue();
}
}
-logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
- const TransactionCoordinator::CoordinatorState& state) {
- using State = TransactionCoordinator::CoordinatorState;
- // clang-format off
- switch (state) {
- case State::kInit: stream.stream() << "kInit"; break;
- case State::kPreparing: stream.stream() << "kPreparing"; break;
- case State::kAborting: stream.stream() << "kAborting"; break;
- case State::kCommitting: stream.stream() << "kCommitting"; break;
- case State::kDone: stream.stream() << "kDone"; break;
- };
- // clang-format on
- return stream;
-}
-
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h
index 4e7a0a80351..db5c29502cd 100644
--- a/src/mongo/db/s/transaction_coordinator.h
+++ b/src/mongo/db/s/transaction_coordinator.h
@@ -32,7 +32,6 @@
#include <vector>
#include "mongo/db/s/transaction_coordinator_util.h"
-#include "mongo/logger/logstream_builder.h"
#include "mongo/util/fail_point_service.h"
namespace mongo {
@@ -62,27 +61,11 @@ public:
const LogicalSessionId& lsid,
TxnNumber txnNumber,
std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
- boost::optional<Date_t> coordinateCommitDeadline);
+ Date_t deadline);
~TransactionCoordinator();
/**
- * The state of the coordinator.
- */
- enum class CoordinatorState {
- // The initial state prior to receiving the participant list from the router.
- kInit,
- // The coordinator is sending prepare and processing responses.
- kPreparing,
- // The coordinator is sending commit messages and waiting for responses.
- kCommitting,
- // The coordinator is sending abort messages and waiting for responses.
- kAborting,
- // The coordinator has received commit/abort acknowledgements from all participants.
- kDone,
- };
-
- /**
* The first time this is called, it asynchronously begins the two-phase commit process for the
* transaction that this coordinator is responsible for.
*
@@ -121,55 +104,12 @@ public:
void cancelIfCommitNotYetStarted();
private:
- /**
- * Called when the timeout task scheduled by the constructor is no longer necessary (i.e.
- * coordinateCommit was received or the coordinator was cancelled intentionally).
- */
- void _cancelTimeoutWaitForCommitTask();
-
- /**
- * Expects the participant list to already be majority-committed.
- *
- * 1. Sends prepare and collect the votes (i.e., responses), retrying requests as needed.
- * 2. Based on the votes, makes a commit or abort decision.
- * 3. If the decision is to commit, calculates the commit Timestamp.
- * 4. Writes the decision and waits for the decision to become majority-committed.
- */
- Future<txn::CoordinatorCommitDecision> _runPhaseOne(
- const std::vector<ShardId>& participantShards);
-
- /**
- * Expects the decision to already be majority-committed.
- *
- * 1. Send the decision (commit or abort) until receiving all acks (i.e., responses),
- * retrying requests as needed.
- * 2. Delete the coordinator's durable state without waiting for the delete to become
- * majority-committed.
- */
- Future<void> _runPhaseTwo(const std::vector<ShardId>& participantShards,
- const txn::CoordinatorCommitDecision& decision);
-
- /**
- * Asynchronously sends the commit decision to all participants (commit or abort), resolving the
- * returned future when all participants have acknowledged the decision.
- */
- Future<void> _sendDecisionToParticipants(const std::vector<ShardId>& participantShards,
- txn::CoordinatorCommitDecision coordinatorDecision);
+ bool _reserveKickOffCommitPromise();
/**
* Helper for handling errors that occur during either phase of commit coordination.
*/
- void _handleCompletionError(Status s);
-
- /**
- * Notifies all callers of onCompletion that the commit process has completed by fulfilling
- * their promises, and transitions the state to done.
- *
- * NOTE: Unlocks the lock passed in in order to fulfill the promises.
- *
- * TODO (SERVER-38346): Used SharedSemiFuture to simplify this implementation.
- */
- void _transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept;
+ void _done(Status s);
// Shortcut to the service context under which this coordinator runs
ServiceContext* const _serviceContext;
@@ -178,31 +118,40 @@ private:
const LogicalSessionId _lsid;
const TxnNumber _txnNumber;
- // Scheduler to use for all asynchronous activity which needs to be performed by this
- // coordinator
+ // Scheduler and context wrapping all asynchronous work dispatched by this coordinator
std::unique_ptr<txn::AsyncWorkScheduler> _scheduler;
- // If not nullptr, references the scheduler containing a task which willl trigger
- // 'cancelIfCommitNotYetStarted' if runCommit has not been invoked. If nullptr, then this
- // coordinator was not created with an expiration task.
- std::unique_ptr<txn::AsyncWorkScheduler> _deadlineScheduler;
-
// Protects the state below
mutable stdx::mutex _mutex;
- // Stores the current state of the coordinator in the commit process.
- CoordinatorState _state{CoordinatorState::kInit};
+ // Promise/future pair which will be signaled when the coordinator has completed
+ bool _kickOffCommitPromiseSet{false};
+ Promise<void> _kickOffCommitPromise;
+
+ // The state below gets populated sequentially as the coordinator advances through the 2 phase
+ // commit stages. Each of these fields is set only once for the lifetime of a coordinator and
+ // after that never changes.
+ //
+ // If the coordinator is canceled before commit is requested, none of these fiends will be set
+
+ // Set when the coordinator has been asked to coordinate commit
+ boost::optional<txn::ParticipantsList> _participants;
+ bool _participantsDurable{false};
+
+ // Set when the coordinator has heard back from all the participants and reached a decision, but
+ // hasn't yet persisted it
+ boost::optional<txn::CoordinatorCommitDecision> _decision;
- // Promise which will contain the final decision made by the coordinator (whether to commit or
- // abort). This is only known once all responses to prepare have been received from all
- // participants, and the collective decision has been majority persisted to
- // config.transactionCommitDecisions.
+ // Set when the coordinator has durably persisted `_decision` to the `config.coordinators`
+ // collection
+ bool _decisionDurable{false};
SharedPromise<txn::CommitDecision> _decisionPromise;
// A list of all promises corresponding to futures that were returned to callers of
// onCompletion.
//
// TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations.
+ bool _completionPromisesFired{false};
std::vector<Promise<void>> _completionPromises;
};
@@ -210,7 +159,4 @@ private:
// a local participant
MONGO_FAIL_POINT_DECLARE(doNotForgetCoordinator);
-logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
- const TransactionCoordinator::CoordinatorState& state);
-
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp
index 069fe02fb47..5ff64bfe41d 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp
@@ -86,7 +86,7 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
LOG(3) << "Inserting coordinator " << lsid.getId() << ':' << txnNumber
<< " into in-memory catalog";
- auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(); });
+ ON_BLOCK_EXIT([&] { _cleanupCompletedCoordinators(); });
stdx::unique_lock<stdx::mutex> ul(_mutex);
if (!forStepUp) {
diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
index db11b3aff10..c9a516eabe9 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp
@@ -61,7 +61,7 @@ protected:
lsid,
txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- boost::none);
+ Date_t::max());
_coordinatorCatalog->insert(opCtx, lsid, txnNumber, newCoordinator);
}
@@ -140,7 +140,7 @@ TEST_F(TransactionCoordinatorCatalogTest, CoordinatorsRemoveThemselvesFromCatalo
auto coordinator = _coordinatorCatalog->get(operationContext(), lsid, txnNumber);
coordinator->cancelIfCommitNotYetStarted();
- ASSERT(coordinator->onCompletion().isReady());
+ coordinator->onCompletion().wait();
auto latestTxnNumAndCoordinator =
_coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
index 771815d0ab1..a616ee7b8cd 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp
@@ -58,7 +58,10 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext)
_executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {}
AsyncWorkScheduler::~AsyncWorkScheduler() {
- join();
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(_quiesced(lg));
+ }
if (!_parent)
return;
@@ -218,8 +221,12 @@ Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId,
});
}
-void AsyncWorkScheduler::_notifyAllTasksComplete(WithLock) {
- if (_activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty())
+bool AsyncWorkScheduler::_quiesced(WithLock) const {
+ return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty();
+}
+
+void AsyncWorkScheduler::_notifyAllTasksComplete(WithLock wl) {
+ if (_quiesced(wl))
_allListsEmptyCV.notify_all();
}
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h
index 1b403d4704d..c378801ea4c 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util.h
+++ b/src/mongo/db/s/transaction_coordinator_futures_util.h
@@ -164,6 +164,11 @@ private:
const ReadPreferenceSetting& readPref);
/**
+ * Returns true when all the registered child schedulers, op contexts and handles have joined.
+ */
+ bool _quiesced(WithLock) const;
+
+ /**
* Invoked every time a registered op context, handle or child scheduler is getting
* unregistered. Used to notify the 'all lists empty' condition variable when there is no more
* activity on a scheduler which has been shut down.
diff --git a/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp
index 2800b333b71..4948c57d260 100644
--- a/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp
@@ -618,27 +618,6 @@ TEST_F(AsyncWorkSchedulerTest, DestroyingSchedulerCapturedInFutureCallback) {
future.get();
}
-TEST_F(AsyncWorkSchedulerTest, DestroyingSchedulerWithoutCallingShutdown) {
- boost::optional<AsyncWorkScheduler> async;
- async.emplace(getServiceContext());
-
- // This test is not 100% deterministic in that it would exercise the condition, but the idea is
- // to occasionally test a race in the destructor of AsyncWorkScheduler where shutdown has not
- // been called, but the scheduler is destroyed before the last scheduled task has been
- // unregistered, therefore causing a wait on the 'all work completed' condition variable.
- Barrier barrier(3);
- async->scheduleWork([&barrier](OperationContext* opCtx) { barrier.countDownAndWait(); })
- .getAsync([](Status) {});
-
- auto f = stdx::async(stdx::launch::async, [&async, &barrier] {
- barrier.countDownAndWait();
- async.reset();
- });
-
- barrier.countDownAndWait();
- f.get();
-}
-
using DoWhileTest = AsyncWorkSchedulerTest;
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index 36790242e7c..04d30f6a2d1 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/transaction_coordinator_document_gen.h"
+#include "mongo/db/transaction_participant_gen.h"
#include "mongo/db/write_concern.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
@@ -164,6 +165,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size()
<< " transactions";
+ auto clockSource = opCtx->getServiceContext()->getFastClockSource();
auto& catalog = catalogAndScheduler->catalog;
auto& scheduler = catalogAndScheduler->scheduler;
@@ -173,12 +175,12 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
const auto lsid = *doc.getId().getSessionId();
const auto txnNumber = *doc.getId().getTxnNumber();
- auto coordinator =
- std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(),
- lsid,
- txnNumber,
- scheduler.makeChildScheduler(),
- boost::none /* No deadline */);
+ auto coordinator = std::make_shared<TransactionCoordinator>(
+ opCtx->getServiceContext(),
+ lsid,
+ txnNumber,
+ scheduler.makeChildScheduler(),
+ clockSource->now() + Seconds(gTransactionLifetimeLimitSeconds.load()));
catalog.insert(opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
coordinator->continueCommit(doc);
diff --git a/src/mongo/db/s/transaction_coordinator_service_test.cpp b/src/mongo/db/s/transaction_coordinator_service_test.cpp
index 753a7d7b7c6..7206cfb6be9 100644
--- a/src/mongo/db/s/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service_test.cpp
@@ -471,8 +471,8 @@ TEST_F(TransactionCoordinatorServiceTest,
coordinatorService->createCoordinator(
operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
- ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()),
- static_cast<int>(txn::CommitDecision::kCanceled));
+ ASSERT_THROWS_CODE(
+ commitDecisionFuture.get(), AssertionException, ErrorCodes::NoSuchTransaction);
}
TEST_F(
@@ -493,7 +493,10 @@ TEST_F(
coordinatorService->coordinateCommit(operationContext(), _lsid, _txnNumber, kTwoShardIdSet);
// The old transaction should now be committed.
- ASSERT(oldTxnCommitDecisionFuture == boost::none);
+ if (oldTxnCommitDecisionFuture) {
+ ASSERT_THROWS_CODE(
+ oldTxnCommitDecisionFuture->get(), AssertionException, ErrorCodes::NoSuchTransaction);
+ }
// Make sure the newly created one works fine too.
commitTransaction(*coordinatorService, _lsid, _txnNumber + 1, kTwoShardIdSet);
diff --git a/src/mongo/db/s/transaction_coordinator_structures.cpp b/src/mongo/db/s/transaction_coordinator_structures.cpp
index 08be1a0b7aa..2088ff6d5c7 100644
--- a/src/mongo/db/s/transaction_coordinator_structures.cpp
+++ b/src/mongo/db/s/transaction_coordinator_structures.cpp
@@ -59,7 +59,6 @@ StringData writeCommitDecisionEnumProperty(CommitDecision decision) {
switch (decision) {
case CommitDecision::kCommit: return kCommitDecision;
case CommitDecision::kAbort: return kAbortDecision;
- case CommitDecision::kCanceled: break;
};
// clang-format on
MONGO_UNREACHABLE;
@@ -71,7 +70,6 @@ logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
switch (decision) {
case txn::CommitDecision::kCommit: return stream << "kCommit";
case txn::CommitDecision::kAbort: return stream << "kAbort";
- case txn::CommitDecision::kCanceled: return stream << "kCanceled";
};
// clang-format on
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/s/transaction_coordinator_structures.h b/src/mongo/db/s/transaction_coordinator_structures.h
index bf2c52e2985..76f7890990d 100644
--- a/src/mongo/db/s/transaction_coordinator_structures.h
+++ b/src/mongo/db/s/transaction_coordinator_structures.h
@@ -44,7 +44,6 @@ using ParticipantsList = std::vector<ShardId>;
enum class PrepareVote {
kCommit,
kAbort,
- kCanceled,
};
using CommitDecision = PrepareVote;
diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp
index 9cdd0869045..e1d8c9367c3 100644
--- a/src/mongo/db/s/transaction_coordinator_test.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test.cpp
@@ -665,7 +665,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitRes
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- boost::none);
+ Date_t::max());
coordinator.runCommit(kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
@@ -687,7 +687,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommi
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- boost::none);
+ Date_t::max());
coordinator.runCommit(kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
@@ -709,7 +709,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbor
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- boost::none);
+ Date_t::max());
coordinator.runCommit(kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
@@ -731,7 +731,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortRe
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- boost::none);
+ Date_t::max());
coordinator.runCommit(kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
@@ -754,7 +754,7 @@ TEST_F(TransactionCoordinatorTest,
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- boost::none);
+ Date_t::max());
coordinator.runCommit(kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
@@ -782,7 +782,7 @@ TEST_F(TransactionCoordinatorTest,
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- boost::none);
+ Date_t::max());
coordinator.runCommit(kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
@@ -807,7 +807,7 @@ TEST_F(TransactionCoordinatorTest,
_lsid,
_txnNumber,
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- boost::none);
+ Date_t::max());
coordinator.runCommit(kTwoShardIdList);
auto commitDecisionFuture = coordinator.getDecision();
@@ -832,14 +832,5 @@ TEST_F(TransactionCoordinatorTest,
coordinator.onCompletion().get();
}
-TEST_F(TransactionCoordinatorTest, AbandonNewlyCreatedCoordinator) {
- TransactionCoordinator coordinator(
- getServiceContext(),
- _lsid,
- _txnNumber,
- std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
- network()->now() + Seconds{30});
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h
index 583350a6d0e..3eb4f6a3ed1 100644
--- a/src/mongo/db/s/transaction_coordinator_util.h
+++ b/src/mongo/db/s/transaction_coordinator_util.h
@@ -57,11 +57,6 @@ Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler,
TxnNumber txnNumber,
const txn::ParticipantsList& participants);
-/**
- * Sends prepare to all participants and returns a future that will be resolved when either:
- * a) All participants have responded with a vote to commit, or
- * b) At least one participant votes to abort.
- */
struct PrepareResponse;
class PrepareVoteConsensus {
public:
@@ -85,6 +80,12 @@ private:
Timestamp _maxPrepareTimestamp;
};
+
+/**
+ * Sends prepare to all participants and returns a future that will be resolved when either:
+ * a) All participants have responded with a vote to commit, or
+ * b) At least one participant votes to abort.
+ */
Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
const LogicalSessionId& lsid,
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index 764704b0744..5408fc76516 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -171,6 +171,30 @@ public:
} prepareTransactionCmd;
+std::set<ShardId> validateParticipants(OperationContext* opCtx,
+ const std::vector<mongo::CommitParticipant>& participants) {
+ StringBuilder ss;
+ std::set<ShardId> participantsSet;
+
+ ss << '[';
+ for (const auto& participant : participants) {
+ const auto& shardId = participant.getShardId();
+ const bool inserted = participantsSet.emplace(shardId).second;
+ uassert(51162,
+ str::stream() << "Participant list contains duplicate shard " << shardId,
+ inserted);
+ ss << shardId << ", ";
+ }
+ ss << ']';
+
+ LOG(3) << "Coordinator shard received request to coordinate commit with "
+ "participant list "
+ << ss.str() << " for " << opCtx->getLogicalSessionId()->getId() << ':'
+ << opCtx->getTxnNumber();
+
+ return participantsSet;
+}
+
class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> {
public:
using Request = CoordinateCommitTransaction;
@@ -197,31 +221,11 @@ public:
boost::optional<Future<txn::CommitDecision>> commitDecisionFuture;
if (!cmd.getParticipants().empty()) {
- // Convert the participant list array into a set, and assert that all participants
- // in the list are unique.
- // TODO (PM-564): Propagate the 'readOnly' flag down into the
- // TransactionCoordinator.
- std::set<ShardId> participantList;
- StringBuilder ss;
- ss << "[";
- for (const auto& participant : cmd.getParticipants()) {
- const auto& shardId = participant.getShardId();
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "Participant list contained duplicate shardId "
- << shardId,
- std::find(participantList.begin(), participantList.end(), shardId) ==
- participantList.end());
- participantList.insert(shardId);
- ss << shardId << " ";
- }
- ss << "]";
- LOG(3) << "Coordinator shard received request to coordinate commit with "
- "participant list "
- << ss.str() << " for " << opCtx->getLogicalSessionId()->getId() << ':'
- << opCtx->getTxnNumber();
-
- commitDecisionFuture = tcs->coordinateCommit(
- opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber(), participantList);
+ commitDecisionFuture =
+ tcs->coordinateCommit(opCtx,
+ *opCtx->getLogicalSessionId(),
+ *opCtx->getTxnNumber(),
+ validateParticipants(opCtx, cmd.getParticipants()));
} else {
LOG(3) << "Coordinator shard received request to recover commit decision for "
<< opCtx->getLogicalSessionId()->getId() << ':' << opCtx->getTxnNumber();
@@ -239,16 +243,26 @@ public:
});
if (commitDecisionFuture) {
- // The commit coordination is still ongoing. Block waiting for the decision.
- auto commitDecision = commitDecisionFuture->get(opCtx);
- switch (commitDecision) {
- case txn::CommitDecision::kCanceled:
- // Continue on to recover the commit decision from disk.
- break;
- case txn::CommitDecision::kAbort:
- uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted");
- case txn::CommitDecision::kCommit:
- return;
+ auto swCommitDecision = commitDecisionFuture->getNoThrow(opCtx);
+ // The coordinator can only return NoSuchTransaction if cancelIfCommitNotYetStarted
+ // was called, which can happen in one of 3 cases:
+ // 1) The deadline to receive coordinateCommit passed
+ // 2) Transaction with a newer txnNumber started on the session before
+ // coordinateCommit was received
+ // 3) This is a sharded transaction, which used the optimized commit path and
+ // didn't require 2PC
+ //
+ // Even though only (3) requires recovering the commit decision from the local
+ // participant, since these cases cannot be differentiated currently, we always
+ // recover from the local participant.
+ if (swCommitDecision != ErrorCodes::NoSuchTransaction) {
+ auto commitDecision = uassertStatusOK(std::move(swCommitDecision));
+ switch (commitDecision) {
+ case txn::CommitDecision::kCommit:
+ return;
+ case txn::CommitDecision::kAbort:
+ uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted");
+ }
}
}
@@ -267,22 +281,24 @@ public:
// NoSuchTransaction and the client sent a non-default writeConcern, the
// coordinateCommitTransaction command's post-amble will do a no-op write and wait for
// the client's writeConcern.
- BSONObj abortRequestObj =
- BSON("abortTransaction" << 1 << "lsid" << opCtx->getLogicalSessionId()->toBSON()
- << "txnNumber"
- << *opCtx->getTxnNumber()
- << "autocommit"
- << false);
+ AbortTransaction abortTransaction;
+ abortTransaction.setDbName(NamespaceString::kAdminDb);
+ auto abortObj = abortTransaction.toBSON(
+ BSON("lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber"
+ << *opCtx->getTxnNumber()
+ << "autocommit"
+ << false));
const auto abortStatus = [&] {
txn::AsyncWorkScheduler aws(opCtx->getServiceContext());
- auto awsShutdownGuard = makeGuard([&aws] {
- aws.shutdown({ErrorCodes::Interrupted, "Request interrupted due to timeout"});
- });
auto future =
aws.scheduleRemoteCommand(txn::getLocalShardId(opCtx->getServiceContext()),
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- abortRequestObj);
+ abortObj);
+ ON_BLOCK_EXIT([&] {
+ aws.shutdown({ErrorCodes::Interrupted, "Request interrupted due to timeout"});
+ future.wait();
+ });
const auto& responseStatus = future.get(opCtx);
uassertStatusOK(responseStatus.status);
@@ -290,7 +306,7 @@ public:
}();
LOG(3) << "coordinateCommitTransaction got response " << abortStatus << " for "
- << abortRequestObj << " used to recover decision from local participant";
+ << abortObj << " used to recover decision from local participant";
// If the abortTransaction succeeded, return that the transaction aborted.
if (abortStatus.isOK())