author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2019-04-01 17:22:06 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2019-04-03 16:48:29 -0400
commit     7575dac4abe7d5c6164b54487a81771cd3aad644 (patch)
tree       e74ba01038c58c5c94e6b686e482e9a4fdc88463 /src/mongo
parent     c4d16e522c18e96114fd3e9b1b71ba835d11c8d7 (diff)
download   mongo-7575dac4abe7d5c6164b54487a81771cd3aad644.tar.gz
SERVER-40297 Make the TransactionCoordinator entirely futures-based
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp              | 398
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.h                | 104
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_catalog.cpp      |   2
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_catalog_test.cpp |   4
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_futures_util.cpp |  13
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_futures_util.h   |   5
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_futures_util_test.cpp | 21
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.cpp      |  14
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service_test.cpp |   9
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_structures.cpp   |   2
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_structures.h     |   1
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test.cpp         |  23
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.h           |  11
-rw-r--r-- | src/mongo/db/s/txn_two_phase_commit_cmds.cpp            | 108
14 files changed, 333 insertions, 382 deletions
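For orientation before reading the patch itself: the change removes the explicit `CoordinatorState` state machine and instead wires the whole two-phase commit sequence as a single future chain in the `TransactionCoordinator` constructor, kicked off by `_kickOffCommitPromise` once `runCommit` or `continueCommit` supplies the participant list. The sketch below is a heavily simplified illustration of that chaining pattern, not the actual patch; it reuses the member names and `txn::` helpers that appear in the diff, but omits the step-up recovery short-circuits, locking, deadline-task joining, and error translation that the real constructor performs.

```cpp
// Simplified sketch of the futures-based constructor introduced by this commit.
// Names mirror the diff; error handling, mutexes and recovery checks are elided.
TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
                                               const LogicalSessionId& lsid,
                                               TxnNumber txnNumber,
                                               std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
                                               Date_t deadline)
    : _serviceContext(serviceContext),
      _lsid(lsid),
      _txnNumber(txnNumber),
      _scheduler(std::move(scheduler)) {
    auto kickOffCommitPF = makePromiseFuture<void>();
    _kickOffCommitPromise = std::move(kickOffCommitPF.promise);

    // Deadline task: abandon the coordinator if commit is never kicked off in time
    auto deadlineFuture = _scheduler->scheduleWorkAt(
        deadline, [this](OperationContext*) { cancelIfCommitNotYetStarted(); });

    std::move(kickOffCommitPF.future)
        .then([this] {  // 1. Make the participant list durable
            return txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, *_participants);
        })
        .then([this] {  // 2. Send prepare and collect the votes into a decision
            return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, *_participants)
                .then([this](PrepareVoteConsensus consensus) { _decision = consensus.decision(); });
        })
        .then([this] {  // 3. Make the decision durable
            return txn::persistDecision(
                *_scheduler, _lsid, _txnNumber, *_participants, _decision->getCommitTimestamp());
        })
        .then([this] {  // 4. Deliver commit or abort to the participants
            _decisionPromise.emplaceValue(_decision->getDecision());
            return _decision->getDecision() == CommitDecision::kCommit
                ? txn::sendCommit(_serviceContext, *_scheduler, _lsid, _txnNumber,
                                  *_participants, *_decision->getCommitTimestamp())
                : txn::sendAbort(_serviceContext, *_scheduler, _lsid, _txnNumber, *_participants);
        })
        .onCompletion([this](Status s) {  // 5. Best-effort cleanup of the coordinator document
            return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber)
                .onCompletion([s](Status) { return s; });
        })
        .getAsync([this](Status s) { _done(s); });  // 6. Notify onCompletion() listeners
}
```

Each stage only runs if the previous one resolved successfully, while the trailing `onCompletion`/`getAsync` run regardless of the chain's outcome; that is how the coordinator guarantees `_done` always fires and the `onCompletion()` listeners are notified, replacing the old `_handleCompletionError`/`_transitionToDone` paths.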
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 98df3191e08..01c96be89f2 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -50,247 +50,251 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext, const LogicalSessionId& lsid, TxnNumber txnNumber, std::unique_ptr<txn::AsyncWorkScheduler> scheduler, - boost::optional<Date_t> coordinateCommitDeadline) + Date_t deadline) : _serviceContext(serviceContext), _lsid(lsid), _txnNumber(txnNumber), _scheduler(std::move(scheduler)) { - if (coordinateCommitDeadline) { - _deadlineScheduler = _scheduler->makeChildScheduler(); - _deadlineScheduler - ->scheduleWorkAt(*coordinateCommitDeadline, [](OperationContext* opCtx) {}) - .getAsync([this](const Status& s) { - if (s == ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled) - return; - cancelIfCommitNotYetStarted(); + + auto kickOffCommitPF = makePromiseFuture<void>(); + _kickOffCommitPromise = std::move(kickOffCommitPF.promise); + + // Task, which will fire when the transaction's total deadline has been reached. If the 2PC + // sequence has not yet started, it will be abandoned altogether. + auto deadlineFuture = + _scheduler + ->scheduleWorkAt(deadline, [this](OperationContext*) { cancelIfCommitNotYetStarted(); }) + .tapError([this](Status s) { + if (_reserveKickOffCommitPromise()) { + _kickOffCommitPromise.setError(std::move(s)); + } }); - } -} -TransactionCoordinator::~TransactionCoordinator() { - cancelIfCommitNotYetStarted(); + // Two-phase commit phases chain. Once this chain executes, the 2PC sequence has completed + // either with success or error and the scheduled deadline task above has been joined. + std::move(kickOffCommitPF.future) + .then([this] { + // Persist the participants, unless they have been made durable already (which would + // only be the case if this coordinator was created as part of step-up recovery). + // Input: _participants + // _participantsDurable (optional) + // Output: _participantsDurable = true + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + invariant(_participants); + if (_participantsDurable) + return Future<void>::makeReady(); + } - // Wait for all scheduled asynchronous activity to complete - if (_deadlineScheduler) - _deadlineScheduler->join(); + return txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, *_participants) + .then([this] { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _participantsDurable = true; + }); + }) + .then([this] { + // Send prepare to the participants, unless this has already been done (which would only + // be the case if this coordinator was created as part of step-up recovery and the + // recovery document contained a decision). 
+ // Input: _participants, _participantsDurable + // _decision (optional) + // Output: _decision is set + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + invariant(_participantsDurable); + if (_decision) + return Future<void>::makeReady(); + } - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_state == TransactionCoordinator::CoordinatorState::kDone); + return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, *_participants) + .then([this](PrepareVoteConsensus consensus) mutable { + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _decision = consensus.decision(); + } + + if (_decision->getDecision() == CommitDecision::kCommit) { + LOG(3) << "Advancing cluster time to the commit timestamp " + << *_decision->getCommitTimestamp() << " for " << _lsid.getId() + << ':' << _txnNumber; + + uassertStatusOK(LogicalClock::get(_serviceContext) + ->advanceClusterTime( + LogicalTime(*_decision->getCommitTimestamp()))); + } + }); + }) + .then([this] { + // Persist the commit decision, unless this has already been done (which would only be + // the case if this coordinator was created as part of step-up recovery and the recovery + // document contained a decision). + // Input: _decision + // _decisionDurable (optional) + // Output: _decisionDurable = true + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + invariant(_decision); + if (_decisionDurable) + return Future<void>::makeReady(); + } - // Make sure no callers of functions on the coordinator are waiting for a decision to be - // signaled or the commit process to complete. - invariant(_completionPromises.empty()); -} + return txn::persistDecision(*_scheduler, + _lsid, + _txnNumber, + *_participants, + _decision->getCommitTimestamp()) + .then([this] { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _decisionDurable = true; + }); + }) + .then([this] { + // Send the commit/abort decision to the participants. + // Input: _decisionDurable + // Output: (none) + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + invariant(_decisionDurable); + } -void TransactionCoordinator::runCommit(std::vector<ShardId> participantShards) { - { - stdx::lock_guard<stdx::mutex> lg(_mutex); - if (_state != CoordinatorState::kInit) - return; - _state = CoordinatorState::kPreparing; - } + _decisionPromise.emplaceValue(_decision->getDecision()); + + switch (_decision->getDecision()) { + case CommitDecision::kCommit: + return txn::sendCommit(_serviceContext, + *_scheduler, + _lsid, + _txnNumber, + *_participants, + *_decision->getCommitTimestamp()); + case CommitDecision::kAbort: + return txn::sendAbort( + _serviceContext, *_scheduler, _lsid, _txnNumber, *_participants); + default: + MONGO_UNREACHABLE; + }; + }) + .onCompletion([this](Status s) { + // Do a best-effort attempt to delete the coordinator document from disk, regardless of + // the success of the commit sequence. 
+ LOG(3) << "Two-phase commit completed for " << _lsid.getId() << ':' << _txnNumber; + + if (MONGO_FAIL_POINT(doNotForgetCoordinator)) + return Future<void>::makeReady(s); - _cancelTimeoutWaitForCommitTask(); + return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber) + .onCompletion([ this, chainStatus = std::move(s) ](Status deleteDocStatus) { + if (_participantsDurable) { + LOG(0) << redact(deleteDocStatus); + } - txn::persistParticipantsList(*_scheduler, _lsid, _txnNumber, participantShards) - .then([this, participantShards]() { return _runPhaseOne(participantShards); }) - .then([this, participantShards](CoordinatorCommitDecision decision) { - return _runPhaseTwo(participantShards, decision); + return chainStatus; + }); }) - .getAsync([this](Status s) { _handleCompletionError(s); }); -} - -void TransactionCoordinator::continueCommit(const TransactionCoordinatorDocument& doc) { - { - stdx::lock_guard<stdx::mutex> lg(_mutex); - invariant(_state == CoordinatorState::kInit); - invariant(!_deadlineScheduler); - _state = CoordinatorState::kPreparing; - } + .onCompletion([ this, deadlineFuture = std::move(deadlineFuture) ](Status s) mutable { + // Interrupt this coordinator's scheduler hierarchy and join the deadline task's future + // in order to guarantee that there are no more threads running within the coordinator. + _scheduler->shutdown( + {ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled, "Coordinator completed"}); - const auto& participantShards = doc.getParticipants(); - - // Helper lambda to get the decision either from the document passed in or from the participants - // (by performing 'phase one' of two-phase commit). - auto getDecision = [&]() -> Future<CoordinatorCommitDecision> { - const auto& decision = doc.getDecision(); - if (!decision) { - return _runPhaseOne(participantShards); - } else { - return *decision; - } - }; - - getDecision() - .then([this, participantShards](CoordinatorCommitDecision decision) { - return _runPhaseTwo(participantShards, decision); + return std::move(deadlineFuture).onCompletion([s = std::move(s)](Status) { return s; }); }) - .getAsync([this](Status s) { _handleCompletionError(s); }); + .getAsync([this](Status s) { + // Notify all the listeners which are interested in the coordinator's lifecycle. After + // this call, the coordinator object could potentially get destroyed by its lifetime + // controller, so there shouldn't be any accesses to `this` after this call. 
+ _done(s); + }); } -Future<void> TransactionCoordinator::onCompletion() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_state == CoordinatorState::kDone) { - return Future<void>::makeReady(); - } - - auto completionPromiseFuture = makePromiseFuture<void>(); - _completionPromises.emplace_back(std::move(completionPromiseFuture.promise)); - - return std::move(completionPromiseFuture.future) - .onError<ErrorCodes::TransactionCoordinatorSteppingDown>( - [](const Status& s) { uasserted(ErrorCodes::InterruptedDueToStepDown, s.reason()); }); +TransactionCoordinator::~TransactionCoordinator() { + invariant(_completionPromises.empty()); } -SharedSemiFuture<CommitDecision> TransactionCoordinator::getDecision() { - stdx::lock_guard<stdx::mutex> lg(_mutex); - return _decisionPromise.getFuture(); +void TransactionCoordinator::runCommit(std::vector<ShardId> participants) { + if (!_reserveKickOffCommitPromise()) + return; + + _participants = std::move(participants); + _kickOffCommitPromise.emplaceValue(); } -void TransactionCoordinator::cancelIfCommitNotYetStarted() { - _cancelTimeoutWaitForCommitTask(); +void TransactionCoordinator::continueCommit(const TransactionCoordinatorDocument& doc) { + if (!_reserveKickOffCommitPromise()) + return; - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_state == CoordinatorState::kInit) { - invariant(!_decisionPromise.getFuture().isReady()); - _decisionPromise.emplaceValue(CommitDecision::kCanceled); - _transitionToDone(std::move(lk)); + _participants = std::move(doc.getParticipants()); + if (doc.getDecision()) { + _participantsDurable = true; + _decision = std::move(doc.getDecision()); } -} -void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() { - if (_deadlineScheduler) { - _deadlineScheduler->shutdown({ErrorCodes::TransactionCoordinatorDeadlineTaskCanceled, - "Interrupting the commit received deadline task"}); - } + _kickOffCommitPromise.emplaceValue(); } -Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne( - const std::vector<ShardId>& participantShards) { - return txn::sendPrepare(_serviceContext, *_scheduler, _lsid, _txnNumber, participantShards) - .then([this, participantShards](PrepareVoteConsensus result) { - invariant(_state == CoordinatorState::kPreparing); - - auto decision = result.decision(); - if (decision.getDecision() == CommitDecision::kCommit) { - LOG(3) << "Advancing cluster time to the commit timestamp " - << *decision.getCommitTimestamp() << " for " << _lsid.getId() << ':' - << _txnNumber; - - uassertStatusOK( - LogicalClock::get(_serviceContext) - ->advanceClusterTime(LogicalTime(*decision.getCommitTimestamp()))); - } - - return txn::persistDecision(*_scheduler, - _lsid, - _txnNumber, - participantShards, - decision.getCommitTimestamp()) - .then([decision] { return decision; }); - }); +SharedSemiFuture<CommitDecision> TransactionCoordinator::getDecision() { + return _decisionPromise.getFuture(); } -Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& participantShards, - const CoordinatorCommitDecision& decision) { - return _sendDecisionToParticipants(participantShards, decision) - .then([this] { - if (MONGO_FAIL_POINT(doNotForgetCoordinator)) - return Future<void>::makeReady(); +Future<void> TransactionCoordinator::onCompletion() { + stdx::lock_guard<stdx::mutex> lg(_mutex); + if (_completionPromisesFired) + return Future<void>::makeReady(); - return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber); - }) - .then([this] { - LOG(3) << "Two-phase commit completed for " << 
_lsid.getId() << ':' << _txnNumber; + auto completionPF = makePromiseFuture<void>(); + _completionPromises.emplace_back(std::move(completionPF.promise)); - stdx::unique_lock<stdx::mutex> ul(_mutex); - _transitionToDone(std::move(ul)); - }); + return std::move(completionPF.future); } -Future<void> TransactionCoordinator::_sendDecisionToParticipants( - const std::vector<ShardId>& participantShards, CoordinatorCommitDecision decision) { - invariant(_state == CoordinatorState::kPreparing); - _decisionPromise.emplaceValue(decision.getDecision()); - - switch (decision.getDecision()) { - case CommitDecision::kCommit: - _state = CoordinatorState::kCommitting; - return txn::sendCommit(_serviceContext, - *_scheduler, - _lsid, - _txnNumber, - participantShards, - *decision.getCommitTimestamp()); - case CommitDecision::kAbort: - _state = CoordinatorState::kAborting; - return txn::sendAbort( - _serviceContext, *_scheduler, _lsid, _txnNumber, participantShards); - case CommitDecision::kCanceled: - MONGO_UNREACHABLE; - }; - MONGO_UNREACHABLE; -}; - -void TransactionCoordinator::_handleCompletionError(Status s) { - if (s.isOK()) { +void TransactionCoordinator::cancelIfCommitNotYetStarted() { + if (!_reserveKickOffCommitPromise()) return; - } - stdx::unique_lock<stdx::mutex> lk(_mutex); - - LOG(3) << "Two-phase commit failed with error in state " << _state << " for " << _lsid.getId() - << ':' << _txnNumber << causedBy(s); - - // If an error occurred prior to making a decision, set an error on the decision promise to - // propagate it to callers of runCommit - if (!_decisionPromise.getFuture().isReady()) { - invariant(_state == CoordinatorState::kPreparing); - - // TransactionCoordinatorSteppingDown indicates the *sending* node (that is, *this* node) is - // stepping down. Active coordinator tasks are interrupted with this code instead of - // InterruptedDueToStepDown, because InterruptedDueToStepDown indicates the *receiving* - // node was stepping down. - if (s == ErrorCodes::TransactionCoordinatorSteppingDown) { - s = Status(ErrorCodes::InterruptedDueToStepDown, - str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber - << " stopping due to: " - << s.reason()); - } + _kickOffCommitPromise.setError({ErrorCodes::NoSuchTransaction, + "Transaction exceeded deadline or newer transaction started"}); +} - _decisionPromise.setError(s); - } +bool TransactionCoordinator::_reserveKickOffCommitPromise() { + stdx::lock_guard<stdx::mutex> lg(_mutex); + if (_kickOffCommitPromiseSet) + return false; - _transitionToDone(std::move(lk)); + _kickOffCommitPromiseSet = true; + return true; } -void TransactionCoordinator::_transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept { - _state = CoordinatorState::kDone; +void TransactionCoordinator::_done(Status status) { + // TransactionCoordinatorSteppingDown indicates the *sending* node (that is, *this* node) is + // stepping down. Active coordinator tasks are interrupted with this code instead of + // InterruptedDueToStepDown, because InterruptedDueToStepDown indicates the *receiving* node was + // stepping down. 
+ if (status == ErrorCodes::TransactionCoordinatorSteppingDown) + status = Status(ErrorCodes::InterruptedDueToStepDown, + str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber + << " stopped due to: " + << status.reason()); + + LOG(3) << "Two-phase commit for " << _lsid.getId() << ':' << _txnNumber << " completed with " + << redact(status); + + stdx::unique_lock<stdx::mutex> ul(_mutex); + _completionPromisesFired = true; + + if (!_decisionDurable) { + ul.unlock(); + _decisionPromise.setError(status); + ul.lock(); + } + // Trigger the onCompletion promises outside of a lock, because the future handlers indicate to + // the potential lifetime controller that the object can be destroyed auto promisesToTrigger = std::move(_completionPromises); - lk.unlock(); + ul.unlock(); - // No fields from 'this' are allowed to be accessed after the for loop below runs, because the - // future handlers indicate to the potential lifetime controller that the object can be - // destroyed for (auto&& promise : promisesToTrigger) { promise.emplaceValue(); } } -logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, - const TransactionCoordinator::CoordinatorState& state) { - using State = TransactionCoordinator::CoordinatorState; - // clang-format off - switch (state) { - case State::kInit: stream.stream() << "kInit"; break; - case State::kPreparing: stream.stream() << "kPreparing"; break; - case State::kAborting: stream.stream() << "kAborting"; break; - case State::kCommitting: stream.stream() << "kCommitting"; break; - case State::kDone: stream.stream() << "kDone"; break; - }; - // clang-format on - return stream; -} - } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index 4e7a0a80351..db5c29502cd 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -32,7 +32,6 @@ #include <vector> #include "mongo/db/s/transaction_coordinator_util.h" -#include "mongo/logger/logstream_builder.h" #include "mongo/util/fail_point_service.h" namespace mongo { @@ -62,27 +61,11 @@ public: const LogicalSessionId& lsid, TxnNumber txnNumber, std::unique_ptr<txn::AsyncWorkScheduler> scheduler, - boost::optional<Date_t> coordinateCommitDeadline); + Date_t deadline); ~TransactionCoordinator(); /** - * The state of the coordinator. - */ - enum class CoordinatorState { - // The initial state prior to receiving the participant list from the router. - kInit, - // The coordinator is sending prepare and processing responses. - kPreparing, - // The coordinator is sending commit messages and waiting for responses. - kCommitting, - // The coordinator is sending abort messages and waiting for responses. - kAborting, - // The coordinator has received commit/abort acknowledgements from all participants. - kDone, - }; - - /** * The first time this is called, it asynchronously begins the two-phase commit process for the * transaction that this coordinator is responsible for. * @@ -121,55 +104,12 @@ public: void cancelIfCommitNotYetStarted(); private: - /** - * Called when the timeout task scheduled by the constructor is no longer necessary (i.e. - * coordinateCommit was received or the coordinator was cancelled intentionally). - */ - void _cancelTimeoutWaitForCommitTask(); - - /** - * Expects the participant list to already be majority-committed. - * - * 1. Sends prepare and collect the votes (i.e., responses), retrying requests as needed. - * 2. Based on the votes, makes a commit or abort decision. 
- * 3. If the decision is to commit, calculates the commit Timestamp. - * 4. Writes the decision and waits for the decision to become majority-committed. - */ - Future<txn::CoordinatorCommitDecision> _runPhaseOne( - const std::vector<ShardId>& participantShards); - - /** - * Expects the decision to already be majority-committed. - * - * 1. Send the decision (commit or abort) until receiving all acks (i.e., responses), - * retrying requests as needed. - * 2. Delete the coordinator's durable state without waiting for the delete to become - * majority-committed. - */ - Future<void> _runPhaseTwo(const std::vector<ShardId>& participantShards, - const txn::CoordinatorCommitDecision& decision); - - /** - * Asynchronously sends the commit decision to all participants (commit or abort), resolving the - * returned future when all participants have acknowledged the decision. - */ - Future<void> _sendDecisionToParticipants(const std::vector<ShardId>& participantShards, - txn::CoordinatorCommitDecision coordinatorDecision); + bool _reserveKickOffCommitPromise(); /** * Helper for handling errors that occur during either phase of commit coordination. */ - void _handleCompletionError(Status s); - - /** - * Notifies all callers of onCompletion that the commit process has completed by fulfilling - * their promises, and transitions the state to done. - * - * NOTE: Unlocks the lock passed in in order to fulfill the promises. - * - * TODO (SERVER-38346): Used SharedSemiFuture to simplify this implementation. - */ - void _transitionToDone(stdx::unique_lock<stdx::mutex> lk) noexcept; + void _done(Status s); // Shortcut to the service context under which this coordinator runs ServiceContext* const _serviceContext; @@ -178,31 +118,40 @@ private: const LogicalSessionId _lsid; const TxnNumber _txnNumber; - // Scheduler to use for all asynchronous activity which needs to be performed by this - // coordinator + // Scheduler and context wrapping all asynchronous work dispatched by this coordinator std::unique_ptr<txn::AsyncWorkScheduler> _scheduler; - // If not nullptr, references the scheduler containing a task which willl trigger - // 'cancelIfCommitNotYetStarted' if runCommit has not been invoked. If nullptr, then this - // coordinator was not created with an expiration task. - std::unique_ptr<txn::AsyncWorkScheduler> _deadlineScheduler; - // Protects the state below mutable stdx::mutex _mutex; - // Stores the current state of the coordinator in the commit process. - CoordinatorState _state{CoordinatorState::kInit}; + // Promise/future pair which will be signaled when the coordinator has completed + bool _kickOffCommitPromiseSet{false}; + Promise<void> _kickOffCommitPromise; + + // The state below gets populated sequentially as the coordinator advances through the 2 phase + // commit stages. Each of these fields is set only once for the lifetime of a coordinator and + // after that never changes. + // + // If the coordinator is canceled before commit is requested, none of these fiends will be set + + // Set when the coordinator has been asked to coordinate commit + boost::optional<txn::ParticipantsList> _participants; + bool _participantsDurable{false}; + + // Set when the coordinator has heard back from all the participants and reached a decision, but + // hasn't yet persisted it + boost::optional<txn::CoordinatorCommitDecision> _decision; - // Promise which will contain the final decision made by the coordinator (whether to commit or - // abort). 
This is only known once all responses to prepare have been received from all - // participants, and the collective decision has been majority persisted to - // config.transactionCommitDecisions. + // Set when the coordinator has durably persisted `_decision` to the `config.coordinators` + // collection + bool _decisionDurable{false}; SharedPromise<txn::CommitDecision> _decisionPromise; // A list of all promises corresponding to futures that were returned to callers of // onCompletion. // // TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations. + bool _completionPromisesFired{false}; std::vector<Promise<void>> _completionPromises; }; @@ -210,7 +159,4 @@ private: // a local participant MONGO_FAIL_POINT_DECLARE(doNotForgetCoordinator); -logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, - const TransactionCoordinator::CoordinatorState& state); - } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp index 069fe02fb47..5ff64bfe41d 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp @@ -86,7 +86,7 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, LOG(3) << "Inserting coordinator " << lsid.getId() << ':' << txnNumber << " into in-memory catalog"; - auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(); }); + ON_BLOCK_EXIT([&] { _cleanupCompletedCoordinators(); }); stdx::unique_lock<stdx::mutex> ul(_mutex); if (!forStepUp) { diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp index db11b3aff10..c9a516eabe9 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp @@ -61,7 +61,7 @@ protected: lsid, txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - boost::none); + Date_t::max()); _coordinatorCatalog->insert(opCtx, lsid, txnNumber, newCoordinator); } @@ -140,7 +140,7 @@ TEST_F(TransactionCoordinatorCatalogTest, CoordinatorsRemoveThemselvesFromCatalo auto coordinator = _coordinatorCatalog->get(operationContext(), lsid, txnNumber); coordinator->cancelIfCommitNotYetStarted(); - ASSERT(coordinator->onCompletion().isReady()); + coordinator->onCompletion().wait(); auto latestTxnNumAndCoordinator = _coordinatorCatalog->getLatestOnSession(operationContext(), lsid); diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp index 771815d0ab1..a616ee7b8cd 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp @@ -58,7 +58,10 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext) _executor(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor()) {} AsyncWorkScheduler::~AsyncWorkScheduler() { - join(); + { + stdx::lock_guard<stdx::mutex> lg(_mutex); + invariant(_quiesced(lg)); + } if (!_parent) return; @@ -218,8 +221,12 @@ Future<HostAndPort> AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId, }); } -void AsyncWorkScheduler::_notifyAllTasksComplete(WithLock) { - if (_activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty()) +bool AsyncWorkScheduler::_quiesced(WithLock) const { + return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty(); +} + +void 
AsyncWorkScheduler::_notifyAllTasksComplete(WithLock wl) { + if (_quiesced(wl)) _allListsEmptyCV.notify_all(); } diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h index 1b403d4704d..c378801ea4c 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.h +++ b/src/mongo/db/s/transaction_coordinator_futures_util.h @@ -164,6 +164,11 @@ private: const ReadPreferenceSetting& readPref); /** + * Returns true when all the registered child schedulers, op contexts and handles have joined. + */ + bool _quiesced(WithLock) const; + + /** * Invoked every time a registered op context, handle or child scheduler is getting * unregistered. Used to notify the 'all lists empty' condition variable when there is no more * activity on a scheduler which has been shut down. diff --git a/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp index 2800b333b71..4948c57d260 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp @@ -618,27 +618,6 @@ TEST_F(AsyncWorkSchedulerTest, DestroyingSchedulerCapturedInFutureCallback) { future.get(); } -TEST_F(AsyncWorkSchedulerTest, DestroyingSchedulerWithoutCallingShutdown) { - boost::optional<AsyncWorkScheduler> async; - async.emplace(getServiceContext()); - - // This test is not 100% deterministic in that it would exercise the condition, but the idea is - // to occasionally test a race in the destructor of AsyncWorkScheduler where shutdown has not - // been called, but the scheduler is destroyed before the last scheduled task has been - // unregistered, therefore causing a wait on the 'all work completed' condition variable. 
- Barrier barrier(3); - async->scheduleWork([&barrier](OperationContext* opCtx) { barrier.countDownAndWait(); }) - .getAsync([](Status) {}); - - auto f = stdx::async(stdx::launch::async, [&async, &barrier] { - barrier.countDownAndWait(); - async.reset(); - }); - - barrier.countDownAndWait(); - f.get(); -} - using DoWhileTest = AsyncWorkSchedulerTest; diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index 36790242e7c..04d30f6a2d1 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -35,6 +35,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/transaction_coordinator_document_gen.h" +#include "mongo/db/transaction_participant_gen.h" #include "mongo/db/write_concern.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -164,6 +165,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size() << " transactions"; + auto clockSource = opCtx->getServiceContext()->getFastClockSource(); auto& catalog = catalogAndScheduler->catalog; auto& scheduler = catalogAndScheduler->scheduler; @@ -173,12 +175,12 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, const auto lsid = *doc.getId().getSessionId(); const auto txnNumber = *doc.getId().getTxnNumber(); - auto coordinator = - std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), - lsid, - txnNumber, - scheduler.makeChildScheduler(), - boost::none /* No deadline */); + auto coordinator = std::make_shared<TransactionCoordinator>( + opCtx->getServiceContext(), + lsid, + txnNumber, + scheduler.makeChildScheduler(), + clockSource->now() + Seconds(gTransactionLifetimeLimitSeconds.load())); catalog.insert(opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); coordinator->continueCommit(doc); diff --git a/src/mongo/db/s/transaction_coordinator_service_test.cpp b/src/mongo/db/s/transaction_coordinator_service_test.cpp index 753a7d7b7c6..7206cfb6be9 100644 --- a/src/mongo/db/s/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_service_test.cpp @@ -471,8 +471,8 @@ TEST_F(TransactionCoordinatorServiceTest, coordinatorService->createCoordinator( operationContext(), _lsid, _txnNumber + 1, kCommitDeadline); - ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()), - static_cast<int>(txn::CommitDecision::kCanceled)); + ASSERT_THROWS_CODE( + commitDecisionFuture.get(), AssertionException, ErrorCodes::NoSuchTransaction); } TEST_F( @@ -493,7 +493,10 @@ TEST_F( coordinatorService->coordinateCommit(operationContext(), _lsid, _txnNumber, kTwoShardIdSet); // The old transaction should now be committed. - ASSERT(oldTxnCommitDecisionFuture == boost::none); + if (oldTxnCommitDecisionFuture) { + ASSERT_THROWS_CODE( + oldTxnCommitDecisionFuture->get(), AssertionException, ErrorCodes::NoSuchTransaction); + } // Make sure the newly created one works fine too. 
commitTransaction(*coordinatorService, _lsid, _txnNumber + 1, kTwoShardIdSet); diff --git a/src/mongo/db/s/transaction_coordinator_structures.cpp b/src/mongo/db/s/transaction_coordinator_structures.cpp index 08be1a0b7aa..2088ff6d5c7 100644 --- a/src/mongo/db/s/transaction_coordinator_structures.cpp +++ b/src/mongo/db/s/transaction_coordinator_structures.cpp @@ -59,7 +59,6 @@ StringData writeCommitDecisionEnumProperty(CommitDecision decision) { switch (decision) { case CommitDecision::kCommit: return kCommitDecision; case CommitDecision::kAbort: return kAbortDecision; - case CommitDecision::kCanceled: break; }; // clang-format on MONGO_UNREACHABLE; @@ -71,7 +70,6 @@ logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, switch (decision) { case txn::CommitDecision::kCommit: return stream << "kCommit"; case txn::CommitDecision::kAbort: return stream << "kAbort"; - case txn::CommitDecision::kCanceled: return stream << "kCanceled"; }; // clang-format on MONGO_UNREACHABLE; diff --git a/src/mongo/db/s/transaction_coordinator_structures.h b/src/mongo/db/s/transaction_coordinator_structures.h index bf2c52e2985..76f7890990d 100644 --- a/src/mongo/db/s/transaction_coordinator_structures.h +++ b/src/mongo/db/s/transaction_coordinator_structures.h @@ -44,7 +44,6 @@ using ParticipantsList = std::vector<ShardId>; enum class PrepareVote { kCommit, kAbort, - kCanceled, }; using CommitDecision = PrepareVote; diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index 9cdd0869045..e1d8c9367c3 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -665,7 +665,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitRes _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - boost::none); + Date_t::max()); coordinator.runCommit(kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); @@ -687,7 +687,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommi _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - boost::none); + Date_t::max()); coordinator.runCommit(kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); @@ -709,7 +709,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbor _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - boost::none); + Date_t::max()); coordinator.runCommit(kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); @@ -731,7 +731,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortRe _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - boost::none); + Date_t::max()); coordinator.runCommit(kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); @@ -754,7 +754,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - boost::none); + Date_t::max()); coordinator.runCommit(kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); @@ -782,7 +782,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumber, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - boost::none); + Date_t::max()); coordinator.runCommit(kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); @@ -807,7 +807,7 @@ TEST_F(TransactionCoordinatorTest, _lsid, _txnNumber, 
std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - boost::none); + Date_t::max()); coordinator.runCommit(kTwoShardIdList); auto commitDecisionFuture = coordinator.getDecision(); @@ -832,14 +832,5 @@ TEST_F(TransactionCoordinatorTest, coordinator.onCompletion().get(); } -TEST_F(TransactionCoordinatorTest, AbandonNewlyCreatedCoordinator) { - TransactionCoordinator coordinator( - getServiceContext(), - _lsid, - _txnNumber, - std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), - network()->now() + Seconds{30}); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h index 583350a6d0e..3eb4f6a3ed1 100644 --- a/src/mongo/db/s/transaction_coordinator_util.h +++ b/src/mongo/db/s/transaction_coordinator_util.h @@ -57,11 +57,6 @@ Future<void> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, TxnNumber txnNumber, const txn::ParticipantsList& participants); -/** - * Sends prepare to all participants and returns a future that will be resolved when either: - * a) All participants have responded with a vote to commit, or - * b) At least one participant votes to abort. - */ struct PrepareResponse; class PrepareVoteConsensus { public: @@ -85,6 +80,12 @@ private: Timestamp _maxPrepareTimestamp; }; + +/** + * Sends prepare to all participants and returns a future that will be resolved when either: + * a) All participants have responded with a vote to commit, or + * b) At least one participant votes to abort. + */ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 764704b0744..5408fc76516 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -171,6 +171,30 @@ public: } prepareTransactionCmd; +std::set<ShardId> validateParticipants(OperationContext* opCtx, + const std::vector<mongo::CommitParticipant>& participants) { + StringBuilder ss; + std::set<ShardId> participantsSet; + + ss << '['; + for (const auto& participant : participants) { + const auto& shardId = participant.getShardId(); + const bool inserted = participantsSet.emplace(shardId).second; + uassert(51162, + str::stream() << "Participant list contains duplicate shard " << shardId, + inserted); + ss << shardId << ", "; + } + ss << ']'; + + LOG(3) << "Coordinator shard received request to coordinate commit with " + "participant list " + << ss.str() << " for " << opCtx->getLogicalSessionId()->getId() << ':' + << opCtx->getTxnNumber(); + + return participantsSet; +} + class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> { public: using Request = CoordinateCommitTransaction; @@ -197,31 +221,11 @@ public: boost::optional<Future<txn::CommitDecision>> commitDecisionFuture; if (!cmd.getParticipants().empty()) { - // Convert the participant list array into a set, and assert that all participants - // in the list are unique. - // TODO (PM-564): Propagate the 'readOnly' flag down into the - // TransactionCoordinator. 
- std::set<ShardId> participantList; - StringBuilder ss; - ss << "["; - for (const auto& participant : cmd.getParticipants()) { - const auto& shardId = participant.getShardId(); - uassert(ErrorCodes::InvalidOptions, - str::stream() << "Participant list contained duplicate shardId " - << shardId, - std::find(participantList.begin(), participantList.end(), shardId) == - participantList.end()); - participantList.insert(shardId); - ss << shardId << " "; - } - ss << "]"; - LOG(3) << "Coordinator shard received request to coordinate commit with " - "participant list " - << ss.str() << " for " << opCtx->getLogicalSessionId()->getId() << ':' - << opCtx->getTxnNumber(); - - commitDecisionFuture = tcs->coordinateCommit( - opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber(), participantList); + commitDecisionFuture = + tcs->coordinateCommit(opCtx, + *opCtx->getLogicalSessionId(), + *opCtx->getTxnNumber(), + validateParticipants(opCtx, cmd.getParticipants())); } else { LOG(3) << "Coordinator shard received request to recover commit decision for " << opCtx->getLogicalSessionId()->getId() << ':' << opCtx->getTxnNumber(); @@ -239,16 +243,26 @@ public: }); if (commitDecisionFuture) { - // The commit coordination is still ongoing. Block waiting for the decision. - auto commitDecision = commitDecisionFuture->get(opCtx); - switch (commitDecision) { - case txn::CommitDecision::kCanceled: - // Continue on to recover the commit decision from disk. - break; - case txn::CommitDecision::kAbort: - uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted"); - case txn::CommitDecision::kCommit: - return; + auto swCommitDecision = commitDecisionFuture->getNoThrow(opCtx); + // The coordinator can only return NoSuchTransaction if cancelIfCommitNotYetStarted + // was called, which can happen in one of 3 cases: + // 1) The deadline to receive coordinateCommit passed + // 2) Transaction with a newer txnNumber started on the session before + // coordinateCommit was received + // 3) This is a sharded transaction, which used the optimized commit path and + // didn't require 2PC + // + // Even though only (3) requires recovering the commit decision from the local + // participant, since these cases cannot be differentiated currently, we always + // recover from the local participant. + if (swCommitDecision != ErrorCodes::NoSuchTransaction) { + auto commitDecision = uassertStatusOK(std::move(swCommitDecision)); + switch (commitDecision) { + case txn::CommitDecision::kCommit: + return; + case txn::CommitDecision::kAbort: + uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted"); + } } } @@ -267,22 +281,24 @@ public: // NoSuchTransaction and the client sent a non-default writeConcern, the // coordinateCommitTransaction command's post-amble will do a no-op write and wait for // the client's writeConcern. 
- BSONObj abortRequestObj = - BSON("abortTransaction" << 1 << "lsid" << opCtx->getLogicalSessionId()->toBSON() - << "txnNumber" - << *opCtx->getTxnNumber() - << "autocommit" - << false); + AbortTransaction abortTransaction; + abortTransaction.setDbName(NamespaceString::kAdminDb); + auto abortObj = abortTransaction.toBSON( + BSON("lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber" + << *opCtx->getTxnNumber() + << "autocommit" + << false)); const auto abortStatus = [&] { txn::AsyncWorkScheduler aws(opCtx->getServiceContext()); - auto awsShutdownGuard = makeGuard([&aws] { - aws.shutdown({ErrorCodes::Interrupted, "Request interrupted due to timeout"}); - }); auto future = aws.scheduleRemoteCommand(txn::getLocalShardId(opCtx->getServiceContext()), ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - abortRequestObj); + abortObj); + ON_BLOCK_EXIT([&] { + aws.shutdown({ErrorCodes::Interrupted, "Request interrupted due to timeout"}); + future.wait(); + }); const auto& responseStatus = future.get(opCtx); uassertStatusOK(responseStatus.status); @@ -290,7 +306,7 @@ public: }(); LOG(3) << "coordinateCommitTransaction got response " << abortStatus << " for " - << abortRequestObj << " used to recover decision from local participant"; + << abortObj << " used to recover decision from local participant"; // If the abortTransaction succeeded, return that the transaction aborted. if (abortStatus.isOK()) |