From 63220b7df242b26f83c7083394b463c4e4b3a5d8 Mon Sep 17 00:00:00 2001
From: Kaloian Manassiev
Date: Tue, 12 Feb 2019 16:56:00 -0500
Subject: SERVER-38522/SERVER-38715 Make all TransactionCoordinator tasks interruptible

---
 src/mongo/db/s/transaction_coordinator.cpp         | 166 ++++++---
 src/mongo/db/s/transaction_coordinator.h           |  98 +++---
 src/mongo/db/s/transaction_coordinator_catalog.cpp | 113 +++---
 src/mongo/db/s/transaction_coordinator_catalog.h   |  63 ++--
 .../db/s/transaction_coordinator_catalog_test.cpp  |   8 +-
 src/mongo/db/s/transaction_coordinator_driver.cpp  |  68 ++--
 src/mongo/db/s/transaction_coordinator_driver.h    |  40 +--
 .../db/s/transaction_coordinator_futures_util.cpp  |  20 +-
 .../db/s/transaction_coordinator_futures_util.h    |  15 +
 .../transaction_coordinator_futures_util_test.cpp  |  21 ++
 src/mongo/db/s/transaction_coordinator_service.cpp |  64 ++--
 src/mongo/db/s/transaction_coordinator_service.h   |   3 +
 .../db/s/transaction_coordinator_service_test.cpp  |  23 +-
 src/mongo/db/s/transaction_coordinator_test.cpp    | 382 ++++++++++++---------
 .../db/s/transaction_coordinator_test_fixture.cpp  |   2 +-
 15 files changed, 650 insertions(+), 436 deletions(-)

diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index e8a0b4432c4..5a1d0dfd41c 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -71,13 +71,29 @@ CoordinatorCommitDecision makeDecisionFromPrepareVoteConsensus(
 
 TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
                                                const LogicalSessionId& lsid,
-                                               TxnNumber txnNumber)
+                                               TxnNumber txnNumber,
+                                               std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
+                                               boost::optional<Date_t> coordinateCommitDeadline)
     : _serviceContext(serviceContext),
-      _driver(serviceContext),
       _lsid(lsid),
-      _txnNumber(txnNumber) {}
+      _txnNumber(txnNumber),
+      _scheduler(std::move(scheduler)),
+      _driver(serviceContext) {
+    if (coordinateCommitDeadline) {
+        _deadlineScheduler = _scheduler->makeChildScheduler();
+        _deadlineScheduler
+            ->scheduleWorkAt(*coordinateCommitDeadline,
+                             [this](OperationContext* opCtx) { cancelIfCommitNotYetStarted(); })
+            .getAsync([](const Status&) {});
+    }
+}
 
 TransactionCoordinator::~TransactionCoordinator() {
+    _cancelTimeoutWaitForCommitTask();
+
+    if (_deadlineScheduler)
+        _deadlineScheduler.reset();
+
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     invariant(_state == TransactionCoordinator::CoordinatorState::kDone);
 
@@ -86,61 +102,33 @@ TransactionCoordinator::~TransactionCoordinator() {
     invariant(_completionPromises.empty());
 }
 
-/**
- * Implements the high-level logic for two-phase commit.
- */
-SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::runCommit(
-    const std::vector<ShardId>& participantShards) {
+void TransactionCoordinator::runCommit(std::vector<ShardId> participantShards) {
     {
-        // If another thread has already begun the commit process, return early.
-        stdx::lock_guard<stdx::mutex> lk(_mutex);
-        if (_state != CoordinatorState::kInit) {
-            return _finalDecisionPromise.getFuture();
-        }
+        stdx::lock_guard<stdx::mutex> lg(_mutex);
+        if (_state != CoordinatorState::kInit)
+            return;
         _state = CoordinatorState::kPreparing;
     }
 
+    _cancelTimeoutWaitForCommitTask();
+
     _driver.persistParticipantList(_lsid, _txnNumber, participantShards)
         .then([this, participantShards]() { return _runPhaseOne(participantShards); })
        .then([this, participantShards](CoordinatorCommitDecision decision) {
             return _runPhaseTwo(participantShards, decision);
         })
         .getAsync([this](Status s) { _handleCompletionStatus(s); });
-
-    return _finalDecisionPromise.getFuture();
-}
-
-Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
-    const std::vector<ShardId>& participantShards) {
-    return _driver.sendPrepare(participantShards, _lsid, _txnNumber)
-        .then([this, participantShards](txn::PrepareVoteConsensus result) {
-            invariant(_state == CoordinatorState::kPreparing);
-
-            auto decision =
-                makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber);
-
-            return _driver
-                .persistDecision(_lsid, _txnNumber, participantShards, decision.commitTimestamp)
-                .then([decision] { return decision; });
-        });
-}
-
-Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& participantShards,
-                                                  const CoordinatorCommitDecision& decision) {
-    return _sendDecisionToParticipants(participantShards, decision)
-        .then([this] { return _driver.deleteCoordinatorDoc(_lsid, _txnNumber); })
-        .then([this] {
-            LOG(3) << "Two-phase commit completed for session " << _lsid.toBSON()
-                   << ", transaction number " << _txnNumber;
-
-            stdx::unique_lock<stdx::mutex> ul(_mutex);
-            _transitionToDone(std::move(ul));
-        });
-}
 
 void TransactionCoordinator::continueCommit(const TransactionCoordinatorDocument& doc) {
-    _state = CoordinatorState::kPreparing;
-    const auto participantShards = doc.getParticipants();
+    {
+        stdx::lock_guard<stdx::mutex> lg(_mutex);
+        invariant(_state == CoordinatorState::kInit);
+        invariant(!_deadlineScheduler);
+        _state = CoordinatorState::kPreparing;
+    }
+
+    const auto& participantShards = doc.getParticipants();
 
     // Helper lambda to get the decision either from the document passed in or from the
     // participants (by performing 'phase one' of two-phase commit).
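
The "helper lambda" referenced by the comment above is where the resumed (step-up) path and the fresh path converge. Its body falls outside this hunk, so the snippet below is only a sketch of the shape it takes; in particular the getDecision() accessor on the coordinator document is an assumption, not code from this patch:

    // Sketch: reuse the decision persisted before the failover, if any;
    // otherwise re-run phase one (prepare) to obtain a fresh decision.
    auto getDecision = [&]() -> Future<CoordinatorCommitDecision> {
        if (auto persistedDecision = doc.getDecision()) {
            return Future<CoordinatorCommitDecision>::makeReady(*persistedDecision);
        }
        return _runPhaseOne(participantShards);
    };
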
@@ -170,27 +158,67 @@ Future<void> TransactionCoordinator::onCompletion() {
     auto completionPromiseFuture = makePromiseFuture<void>();
     _completionPromises.push_back(std::move(completionPromiseFuture.promise));
 
-    return std::move(completionPromiseFuture.future);
+
+    return std::move(completionPromiseFuture.future)
+        .onError(
+            [](const Status& s) { uasserted(ErrorCodes::InterruptedDueToStepDown, s.reason()); });
 }
 
 SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::getDecision() {
     stdx::lock_guard<stdx::mutex> lg(_mutex);
-    return _finalDecisionPromise.getFuture();
+    return _decisionPromise.getFuture();
 }
 
 void TransactionCoordinator::cancelIfCommitNotYetStarted() {
+    _cancelTimeoutWaitForCommitTask();
+
     stdx::unique_lock<stdx::mutex> lk(_mutex);
     if (_state == CoordinatorState::kInit) {
-        invariant(!_finalDecisionPromise.getFuture().isReady());
-        _finalDecisionPromise.emplaceValue(txn::CommitDecision::kCanceled);
+        invariant(!_decisionPromise.getFuture().isReady());
+        _decisionPromise.emplaceValue(txn::CommitDecision::kCanceled);
         _transitionToDone(std::move(lk));
     }
 }
 
+void TransactionCoordinator::_cancelTimeoutWaitForCommitTask() {
+    if (_deadlineScheduler) {
+        _deadlineScheduler->shutdown(
+            {ErrorCodes::CallbackCanceled, "Interrupting the commit received deadline task"});
+    }
+}
+
+Future<CoordinatorCommitDecision> TransactionCoordinator::_runPhaseOne(
+    const std::vector<ShardId>& participantShards) {
+    return _driver.sendPrepare(participantShards, _lsid, _txnNumber)
+        .then([this, participantShards](txn::PrepareVoteConsensus result) {
+            invariant(_state == CoordinatorState::kPreparing);
+
+            auto decision =
+                makeDecisionFromPrepareVoteConsensus(_serviceContext, result, _lsid, _txnNumber);
+
+            return _driver
+                .persistDecision(_lsid, _txnNumber, participantShards, decision.commitTimestamp)
+                .then([decision] { return decision; });
+        });
+}
+
+Future<void> TransactionCoordinator::_runPhaseTwo(const std::vector<ShardId>& participantShards,
+                                                  const CoordinatorCommitDecision& decision) {
+    return _sendDecisionToParticipants(participantShards, decision)
+        .then([this] { return _driver.deleteCoordinatorDoc(_lsid, _txnNumber); })
+        .then([this] {
+            LOG(3) << "Two-phase commit completed for session " << _lsid.toBSON()
+                   << ", transaction number " << _txnNumber;
+
+            stdx::unique_lock<stdx::mutex> ul(_mutex);
+            _transitionToDone(std::move(ul));
+        });
+}
+
 Future<void> TransactionCoordinator::_sendDecisionToParticipants(
     const std::vector<ShardId>& participantShards, CoordinatorCommitDecision decision) {
     invariant(_state == CoordinatorState::kPreparing);
-    _finalDecisionPromise.emplaceValue(decision.decision);
+    _decisionPromise.emplaceValue(decision.decision);
 
     // Send the decision to all participants.
     switch (decision.decision) {
@@ -214,14 +242,17 @@ void TransactionCoordinator::_handleCompletionStatus(Status s) {
     }
 
     stdx::unique_lock<stdx::mutex> lk(_mutex);
+
     LOG(3) << "Two-phase commit failed with error in state " << _state << " for transaction "
            << _txnNumber << " on session " << _lsid.toBSON() << causedBy(s);
 
-    // If an error occurred prior to making a decision, set an error on the decision
-    // promise to propagate it to callers of runCommit.
-    if (!_finalDecisionPromise.getFuture().isReady()) {
+    // If an error occurred prior to making a decision, set an error on the decision promise to
+    // propagate it to callers of runCommit
+    if (!_decisionPromise.getFuture().isReady()) {
         invariant(_state == CoordinatorState::kPreparing);
-        _finalDecisionPromise.setError(s);
+        _decisionPromise.setError(s == ErrorCodes::TransactionCoordinatorReachedAbortDecision
+                                      ?
Status{ErrorCodes::InterruptedDueToStepDown, s.reason()} + : s); } _transitionToDone(std::move(lk)); @@ -292,4 +323,31 @@ BSONObj CoordinatorCommitDecision::toBSON() const { return builder.obj(); } +logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, + const TransactionCoordinator::CoordinatorState& state) { + using State = TransactionCoordinator::CoordinatorState; + // clang-format off + switch (state) { + case State::kInit: stream.stream() << "kInit"; break; + case State::kPreparing: stream.stream() << "kPreparing"; break; + case State::kAborting: stream.stream() << "kAborting"; break; + case State::kCommitting: stream.stream() << "kCommitting"; break; + case State::kDone: stream.stream() << "kDone"; break; + }; + // clang-format on + return stream; +} + +logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, + const txn::CommitDecision& decision) { + // clang-format off + switch (decision) { + case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break; + case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break; + case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break; + }; + // clang-format on + return stream; +} + } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index a4981edffad..0921c9c4090 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -43,9 +43,20 @@ class TransactionCoordinator { MONGO_DISALLOW_COPYING(TransactionCoordinator); public: + /** + * Instantiates a new TransactioncCoordinator for the specified lsid + txnNumber pair and gives + * it a 'scheduler' to use for any asynchronous tasks it spawns. + * + * If the 'coordinateCommitDeadline' parameter is specified, a timed task will be scheduled to + * cause the coordinator to be put in a cancelled state, if runCommit is not eventually + * received. + */ TransactionCoordinator(ServiceContext* serviceContext, const LogicalSessionId& lsid, - TxnNumber txnNumber); + TxnNumber txnNumber, + std::unique_ptr scheduler, + boost::optional coordinateCommitDeadline); + ~TransactionCoordinator(); /** @@ -94,20 +105,25 @@ public: /** * The first time this is called, it asynchronously begins the two-phase commit process for the - * transaction that this coordinator is responsible for, and returns a future that will be - * resolved when a commit or abort decision has been made and persisted. + * transaction that this coordinator is responsible for. * - * Subsequent calls will not re-run the commit process, but instead return a future that will be - * resolved when the original commit process that was kicked off comes to a decision. If the - * original commit process has completed, returns a ready future containing the final decision. + * Subsequent calls will not re-run the commit process. */ - SharedSemiFuture runCommit(const std::vector& participantShards); + void runCommit(std::vector participantShards); /** * To be used to continue coordinating a transaction on step up. */ void continueCommit(const TransactionCoordinatorDocument& doc); + /** + * Gets a Future that will contain the decision that the coordinator reaches. Note that this + * will never be signaled unless runCommit has been called. + * + * TODO (SERVER-37364): Remove this when it is no longer needed by the coordinator service. 
+ */ + SharedSemiFuture getDecision(); + /** * Returns a future that will be signaled when the transaction has completely finished * committing or aborting (i.e. when commit/abort acknowledgements have been received from all @@ -117,24 +133,23 @@ public: */ Future onCompletion(); - /** - * Gets a Future that will contain the decision that the coordinator reaches. Note that this - * will never be signaled unless runCommit has been called. - * - * TODO (SERVER-37364): Remove this when it is no longer needed by the coordinator service. - */ - SharedSemiFuture getDecision(); - /** * If runCommit has not yet been called, this will transition this coordinator object to * the 'done' state, effectively making it impossible for two-phase commit to be run for this * coordinator. * - * Called when a transaction with a higher transaction number is received on this session. + * Called when a transaction with a higher transaction number is received on this session or + * when the transaction coordinator service is shutting down. */ void cancelIfCommitNotYetStarted(); private: + /** + * Called when the timeout task scheduled by the constructor is no longer necessary (i.e. + * coordinateCommit was received or the coordinator was cancelled intentionally). + */ + void _cancelTimeoutWaitForCommitTask(); + /** * Expects the participant list to already be majority-committed. * @@ -181,14 +196,23 @@ private: // Shortcut to the service context under which this coordinator runs ServiceContext* const _serviceContext; - // Context object used to perform and track the state of asynchronous operations on behalf of - // this coordinator. - TransactionCoordinatorDriver _driver; - // The lsid + transaction number that this coordinator is coordinating const LogicalSessionId _lsid; const TxnNumber _txnNumber; + // Scheduler to use for all asynchronous activity which needs to be performed by this + // coordinator + std::unique_ptr _scheduler; + + // If not nullptr, references the scheduler containing a task which willl trigger + // 'cancelIfCommitNotYetStarted' if runCommit has not been invoked. If nullptr, then this + // coordinator was not created with an expiration task. + std::unique_ptr _deadlineScheduler; + + // Context object used to perform and track the state of asynchronous operations on behalf of + // this coordinator. + TransactionCoordinatorDriver _driver; + // Protects the state below mutable stdx::mutex _mutex; @@ -199,7 +223,7 @@ private: // abort). This is only known once all responses to prepare have been received from all // participants, and the collective decision has been majority persisted to // config.transactionCommitDecisions. - SharedPromise _finalDecisionPromise; + SharedPromise _decisionPromise; // A list of all promises corresponding to futures that were returned to callers of // onCompletion. 
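
The completion promises above are the counterpart of onCompletion(): whichever path finishes two-phase commit must fulfil every one of them. _transitionToDone itself is not part of this diff, so the snippet below is only a sketch of the usual drain pattern, built from the member and state names visible in the surrounding hunks:

    // Sketch: move the promises out and signal them after releasing the mutex,
    // so onCompletion callbacks cannot deadlock by re-entering the coordinator.
    void TransactionCoordinator::_transitionToDone(stdx::unique_lock<stdx::mutex> ul) {
        _state = CoordinatorState::kDone;

        auto promisesToSignal = std::move(_completionPromises);
        _completionPromises.clear();
        ul.unlock();

        for (auto& promise : promisesToSignal) {
            promise.emplaceValue();
        }
    }
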
@@ -208,30 +232,10 @@ private: std::vector> _completionPromises; }; -inline logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, - const TransactionCoordinator::CoordinatorState& state) { - using State = TransactionCoordinator::CoordinatorState; - // clang-format off - switch (state) { - case State::kInit: stream.stream() << "kInit"; break; - case State::kPreparing: stream.stream() << "kPreparing"; break; - case State::kAborting: stream.stream() << "kAborting"; break; - case State::kCommitting: stream.stream() << "kCommitting"; break; - case State::kDone: stream.stream() << "kDone"; break; - }; - // clang-format on - return stream; -} - -inline logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, - const txn::CommitDecision& decision) { - // clang-format off - switch (decision) { - case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break; - case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break; - case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break; - }; - // clang-format on - return stream; -} +logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, + const TransactionCoordinator::CoordinatorState& state); + +logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, + const txn::CommitDecision& decision); + } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp index dc5dd0fb0e4..3fa862ddde3 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp @@ -48,15 +48,49 @@ TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() { join(); } +void TransactionCoordinatorCatalog::exitStepUp(Status status) { + if (status.isOK()) { + LOG(0) << "Incoming coordinateCommit requests are now enabled"; + } else { + warning() << "Coordinator recovery failed and coordinateCommit requests will not be allowed" + << causedBy(status); + } + + stdx::lock_guard lk(_mutex); + invariant(!_stepUpCompletionStatus); + _stepUpCompletionStatus = std::move(status); + _stepUpCompleteCV.notify_all(); +} + +void TransactionCoordinatorCatalog::onStepDown() { + stdx::unique_lock ul(_mutex); + + std::vector> coordinatorsToCancel; + for (auto && [ sessionId, coordinatorsForSession ] : _coordinatorsBySession) { + for (auto && [ txnNumber, coordinator ] : coordinatorsForSession) { + coordinatorsToCancel.emplace_back(coordinator); + } + } + + ul.unlock(); + + for (auto&& coordinator : coordinatorsToCancel) { + coordinator->cancelIfCommitNotYetStarted(); + } + + ul.lock(); + _cleanupCompletedCoordinators(ul); +} + void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, - LogicalSessionId lsid, + const LogicalSessionId& lsid, TxnNumber txnNumber, std::shared_ptr coordinator, bool forStepUp) { - stdx::unique_lock lk(_mutex); - + stdx::unique_lock ul(_mutex); + auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); }); if (!forStepUp) { - _waitForStepUpToComplete(lk, opCtx); + _waitForStepUpToComplete(ul, opCtx); } auto& coordinatorsBySession = _coordinatorsBySession[lsid]; @@ -70,18 +104,19 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, // Schedule callback to remove coordinator from catalog when it either commits or aborts. 
coordinator->onCompletion().getAsync( - [this, lsid, txnNumber](Status) { remove(lsid, txnNumber); }); + [this, lsid, txnNumber](Status) { _remove(lsid, txnNumber); }); LOG(3) << "Inserting coordinator for transaction " << txnNumber << " on session " << lsid.toBSON() << " into in-memory catalog"; + coordinatorsBySession[txnNumber] = std::move(coordinator); } -std::shared_ptr TransactionCoordinatorCatalog::get(OperationContext* opCtx, - LogicalSessionId lsid, - TxnNumber txnNumber) { - stdx::unique_lock lk(_mutex); - _waitForStepUpToComplete(lk, opCtx); +std::shared_ptr TransactionCoordinatorCatalog::get( + OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber) { + stdx::unique_lock ul(_mutex); + auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); }); + _waitForStepUpToComplete(ul, opCtx); std::shared_ptr coordinatorToReturn; @@ -112,9 +147,11 @@ std::shared_ptr TransactionCoordinatorCatalog::get(Opera } boost::optional>> -TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, LogicalSessionId lsid) { - stdx::unique_lock lk(_mutex); - _waitForStepUpToComplete(lk, opCtx); +TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, + const LogicalSessionId& lsid) { + stdx::unique_lock ul(_mutex); + auto cleanupCoordinatorsGuard = makeGuard([&] { _cleanupCompletedCoordinators(ul); }); + _waitForStepUpToComplete(ul, opCtx); const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); @@ -132,7 +169,7 @@ TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, Logic return std::make_pair(lastCoordinatorOnSession->first, lastCoordinatorOnSession->second); } -void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnNumber) { +void TransactionCoordinatorCatalog::_remove(const LogicalSessionId& lsid, TxnNumber txnNumber) { stdx::lock_guard lk(_mutex); LOG(3) << "Removing coordinator for transaction " << txnNumber << " on session " @@ -163,6 +200,11 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN } } + // Since the '_remove' method executes on the AWS of the coordinator which is being + // removed, we cannot destroy it inline. Because of this, put it on a cleanup list so + // that subsequent catalog operations will perform the cleanup. 
+ _coordinatorsToCleanup.emplace_back(coordinatorForTxnIter->second); + coordinatorsForSession.erase(coordinatorForTxnIter); if (coordinatorsForSession.empty()) { _coordinatorsBySession.erase(coordinatorsForSessionIter); @@ -172,28 +214,14 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN if (_coordinatorsBySession.empty()) { LOG(3) << "Signaling last active coordinator removed"; - _noActiveCoordinatorsCv.notify_all(); - } -} - -void TransactionCoordinatorCatalog::exitStepUp(Status status) { - if (status.isOK()) { - LOG(0) << "Incoming coordinateCommit requests are now enabled"; - } else { - warning() << "Coordinator recovery failed and coordinateCommit requests will not be allowed" - << causedBy(status); + _noActiveCoordinatorsCV.notify_all(); } - - stdx::lock_guard lk(_mutex); - invariant(!_stepUpCompletionStatus); - _stepUpCompletionStatus = std::move(status); - _stepUpCompleteCv.notify_all(); } void TransactionCoordinatorCatalog::join() { stdx::unique_lock ul(_mutex); - while (!_noActiveCoordinatorsCv.wait_for( + while (!_noActiveCoordinatorsCV.wait_for( ul, stdx::chrono::seconds{5}, [this] { return _coordinatorsBySession.empty(); })) { LOG(0) << "After 5 seconds of wait there are still " << _coordinatorsBySession.size() << " sessions left with active coordinators which have not yet completed"; @@ -210,23 +238,28 @@ void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lockwaitForConditionOrInterrupt( - _stepUpCompleteCv, lk, [this]() { return bool(_stepUpCompletionStatus); }); + _stepUpCompleteCV, lk, [this]() { return bool(_stepUpCompletionStatus); }); uassertStatusOK(*_stepUpCompletionStatus); } +void TransactionCoordinatorCatalog::_cleanupCompletedCoordinators( + stdx::unique_lock& ul) { + invariant(ul.owns_lock()); + auto coordinatorsToCleanup = std::move(_coordinatorsToCleanup); + + // Ensure the destructors run outside of the lock in order to minimize the time this methods + // spends in a critical section + ul.unlock(); +} + std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const { StringBuilder ss; ss << "["; - for (auto coordinatorsForSession = _coordinatorsBySession.begin(); - coordinatorsForSession != _coordinatorsBySession.end(); - ++coordinatorsForSession) { - ss << "\n"; - ss << coordinatorsForSession->first.toBSON() << ": "; - for (auto coordinatorForTxnNumber = coordinatorsForSession->second.begin(); - coordinatorForTxnNumber != coordinatorsForSession->second.end(); - ++coordinatorForTxnNumber) { - ss << coordinatorForTxnNumber->first << " "; + for (const auto& coordinatorsForSession : _coordinatorsBySession) { + ss << "\n" << coordinatorsForSession.first.toBSON() << ": "; + for (const auto& coordinator : coordinatorsForSession.second) { + ss << coordinator.first << ","; } } ss << "]"; diff --git a/src/mongo/db/s/transaction_coordinator_catalog.h b/src/mongo/db/s/transaction_coordinator_catalog.h index 99ca47b246a..67f6daf77db 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.h +++ b/src/mongo/db/s/transaction_coordinator_catalog.h @@ -41,9 +41,7 @@ namespace mongo { /** * A container for TransactionCoordinator objects, indexed by logical session id and transaction - * number. It allows holding several coordinator objects per session. It also knows how to recreate - * itself from the config.txnCommitDecisions collection, which will be done on transition to - * primary (whether from startup or ordinary step up). + * number. It allows holding several coordinator objects per session. 
*/ class TransactionCoordinatorCatalog { MONGO_DISALLOW_COPYING(TransactionCoordinatorCatalog); @@ -52,6 +50,16 @@ public: TransactionCoordinatorCatalog(); ~TransactionCoordinatorCatalog(); + /** + * Marks that recovery of the catalog has completed and that operations can be run on it. + */ + void exitStepUp(Status status); + + /** + * Cancels any outstanding idle transaction coordinators so that they will get unregistered. + */ + void onStepDown(); + /** * Inserts a coordinator into the catalog. * @@ -60,7 +68,7 @@ public: * does not take place. */ void insert(OperationContext* opCtx, - LogicalSessionId lsid, + const LogicalSessionId& lsid, TxnNumber txnNumber, std::shared_ptr coordinator, bool forStepUp = false); @@ -70,7 +78,7 @@ public: * does not exist, return nullptr. */ std::shared_ptr get(OperationContext* opCtx, - LogicalSessionId lsid, + const LogicalSessionId& lsid, TxnNumber txnNumber); /** @@ -78,21 +86,7 @@ public: * exists. If it does not exist, return boost::none. */ boost::optional>> - getLatestOnSession(OperationContext* opCtx, LogicalSessionId lsid); - - /** - * Removes the coordinator with the given session id and transaction number from the catalog, if - * one exists, and if this the last coordinator in the catalog, signals that there are no active - * coordinators. - * - * Note: The coordinator must be in a state suitable for removal (i.e. committed or aborted). - */ - void remove(LogicalSessionId lsid, TxnNumber txnNumber); - - /** - * Marks no stepup in progress and signals that no stepup is in progress. - */ - void exitStepUp(Status status); + getLatestOnSession(OperationContext* opCtx, const LogicalSessionId& lsid); /** * Blocking method, which waits for all coordinators registered on the catalog to complete @@ -118,6 +112,22 @@ private: */ void _waitForStepUpToComplete(stdx::unique_lock& lk, OperationContext* opCtx); + /** + * Removes the coordinator with the given session id and transaction number from the catalog, if + * one exists, and if this the last coordinator in the catalog, signals that there are no active + * coordinators. + * + * Note: The coordinator must be in a state suitable for removal (i.e. committed or aborted). + */ + void _remove(const LogicalSessionId& lsid, TxnNumber txnNumber); + + /** + * Goes through the '_coordinatorsToCleanup' list and deletes entries from it. As a side-effect + * also unlocks the passed-in lock, so no other synchronized members of the class should be + * accessed after it is called. + */ + void _cleanupCompletedCoordinators(stdx::unique_lock& ul); + /** * Constructs a string representation of all the coordinators registered on the catalog. */ @@ -135,23 +145,26 @@ private: // commit coordination and would normally be expunged from memory. LogicalSessionIdMap _coordinatorsBySessionDefunct; + // Set of coordinators which have completed, but have not yet been destroyed. + std::list> _coordinatorsToCleanup; + // Stores the result of the coordinator catalog's recovery attempt (the status passed to // exitStepUp). This is what the values mean: // // stepUpCompletionStatus = none - brand new created object (exitStepUp has not been called - // yet). All calls will block. + // yet). All calls will block. 
// stepUpCompletionStatus = OK - recovery completed successfully, transactions can be - // coordinated + // coordinated // stepUpCompletionStatus = error - recovery completed with an error, transactions cannot be - // coordinated (all methods will fail with this error) + // coordinated (all methods will fail with this error) boost::optional _stepUpCompletionStatus; // Signaled when recovery of the catalog completes (when _stepUpCompletionStatus transitions // from none to either OK or error) - stdx::condition_variable _stepUpCompleteCv; + stdx::condition_variable _stepUpCompleteCV; // Notified when the last coordinator is removed from the catalog. - stdx::condition_variable _noActiveCoordinatorsCv; + stdx::condition_variable _noActiveCoordinatorsCV; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp index 9a114eb203a..b73955b5c78 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp @@ -63,8 +63,12 @@ protected: void createCoordinatorInCatalog(OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) { - auto newCoordinator = - std::make_shared(getServiceContext(), lsid, txnNumber); + auto newCoordinator = std::make_shared( + getServiceContext(), + lsid, + txnNumber, + std::make_unique(getServiceContext()), + boost::none); _coordinatorCatalog->insert(opCtx, lsid, txnNumber, newCoordinator); _coordinatorsForTest.push_back(newCoordinator); diff --git a/src/mongo/db/s/transaction_coordinator_driver.cpp b/src/mongo/db/s/transaction_coordinator_driver.cpp index f44d5e7ebd7..56f74078594 100644 --- a/src/mongo/db/s/transaction_coordinator_driver.cpp +++ b/src/mongo/db/s/transaction_coordinator_driver.cpp @@ -140,6 +140,11 @@ bool shouldRetryCommandAgainstShard(const ShardId& shardId, shouldRetry = false; } + if (responseStatus == ErrorCodes::TransactionCoordinatorSteppingDown || + responseStatus == ErrorCodes::TransactionCoordinatorReachedAbortDecision) { + shouldRetry = false; + } + LOG(3) << "Coordinator " << (shouldRetry ? "retrying " : "not retrying ") << commandObj << " against " << (isLocalShard ? "local " : "") << "shard " << shardId << " because got response status " << responseStatus; @@ -279,8 +284,10 @@ Future TransactionCoordinatorDriver::sendPrepare( // Send prepare to all participants asynchronously and collect their future responses in a // vector of responses. 
+ auto prepareScheduler = _scheduler->makeChildScheduler(); + for (const auto& participant : participantShards) { - responses.push_back(sendPrepareToShard(participant, prepareObj)); + responses.push_back(sendPrepareToShard(*prepareScheduler, participant, prepareObj)); } // Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp @@ -291,7 +298,8 @@ Future TransactionCoordinatorDriver::sendPrepare( // Initial value txn::PrepareVoteConsensus{boost::none, boost::none}, // Aggregates an incoming response (next) with the existing aggregate value (result) - [this](txn::PrepareVoteConsensus& result, const PrepareResponse& next) { + [prepareScheduler = std::move(prepareScheduler)](txn::PrepareVoteConsensus & result, + const PrepareResponse& next) { if (!next.vote) { LOG(3) << "Transaction coordinator did not receive a response from shard " << next.participantShardId; @@ -302,7 +310,9 @@ Future TransactionCoordinatorDriver::sendPrepare( case PrepareVote::kAbort: result.decision = CommitDecision::kAbort; result.maxPrepareTimestamp = boost::none; - cancel(); + prepareScheduler->shutdown( + {ErrorCodes::TransactionCoordinatorReachedAbortDecision, + "Received at least one vote abort"}); return txn::ShouldStopIteration::kYes; case PrepareVote::kCommit: result.decision = CommitDecision::kCommit; @@ -469,7 +479,7 @@ Future TransactionCoordinatorDriver::sendCommit(const std::vector std::vector> responses; for (const auto& participant : participantShards) { - responses.push_back(sendDecisionToParticipantShard(participant, commitObj)); + responses.push_back(sendDecisionToParticipantShard(*_scheduler, participant, commitObj)); } return txn::whenAll(responses); } @@ -486,7 +496,7 @@ Future TransactionCoordinatorDriver::sendAbort(const std::vector& std::vector> responses; for (const auto& participant : participantShards) { - responses.push_back(sendDecisionToParticipantShard(participant, abortObj)); + responses.push_back(sendDecisionToParticipantShard(*_scheduler, participant, abortObj)); } return txn::whenAll(responses); } @@ -595,20 +605,22 @@ std::vector TransactionCoordinatorDriver::readAl } Future TransactionCoordinatorDriver::sendPrepareToShard( - const ShardId& shardId, const BSONObj& commandObj) { + txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) { const bool isLocalShard = checkIsLocalShard(_serviceContext, shardId); - return txn::doWhile( - *_scheduler, + + auto f = txn::doWhile( + scheduler, kExponentialBackoff, - [ isLocalShard, shardId, commandObj = commandObj.getOwned() ]( + [ shardId, isLocalShard, commandObj = commandObj.getOwned() ]( StatusWith swPrepareResponse) { return shouldRetryCommandAgainstShard( shardId, isLocalShard, commandObj, swPrepareResponse.getStatus()); }, - [ this, shardId, isLocalShard, commandObj = commandObj.getOwned() ] { + [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned() ] { LOG(3) << "Coordinator going to send command " << commandObj << " to " << (isLocalShard ? 
" local " : "") << " shard " << shardId; - return _scheduler->scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) + + return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); @@ -641,6 +653,7 @@ Future TransactionCoordinatorDriver::sendPrepareToShard( LOG(3) << "Coordinator shard received " << status << " from shard " << shardId << " for " << commandObj; + if (ErrorCodes::isVoteAbortError(status.code())) { return PrepareResponse{shardId, PrepareVote::kAbort, boost::none}; } @@ -648,7 +661,7 @@ Future TransactionCoordinatorDriver::sendPrepareToShard( uassertStatusOK(status); MONGO_UNREACHABLE; }) - .onError([shardId, isLocalShard](const Status& s) { + .onError([shardId, isLocalShard](const Status&) { invariant(!isLocalShard); // ShardNotFound may indicate that the participant shard has been removed (it // could also mean the participant shard was recently added and this node @@ -659,32 +672,31 @@ Future TransactionCoordinatorDriver::sendPrepareToShard( // must then send abort. return Future::makeReady( {shardId, CommitDecision::kAbort, boost::none}); - }) - .onError([this, shardId](const Status& status) { - if (_cancelled.loadRelaxed()) { - LOG(3) << "Prepare stopped retrying due to retrying being cancelled"; - return PrepareResponse{shardId, boost::none, boost::none}; - } - - uassertStatusOK(status); - MONGO_UNREACHABLE; }); }); + + return std::move(f).onError( + [shardId](const Status&) { + LOG(3) << "Prepare stopped retrying due to retrying being cancelled"; + return PrepareResponse{shardId, boost::none, boost::none}; + }); } Future TransactionCoordinatorDriver::sendDecisionToParticipantShard( - const ShardId& shardId, const BSONObj& commandObj) { + txn::AsyncWorkScheduler& scheduler, const ShardId& shardId, const BSONObj& commandObj) { const bool isLocalShard = checkIsLocalShard(_serviceContext, shardId); + return txn::doWhile( - *_scheduler, + scheduler, kExponentialBackoff, - [ isLocalShard, shardId, commandObj = commandObj.getOwned() ](const Status& s) { + [ shardId, isLocalShard, commandObj = commandObj.getOwned() ](const Status& s) { return shouldRetryCommandAgainstShard(shardId, isLocalShard, commandObj, s); }, - [ this, isLocalShard, shardId, commandObj = commandObj.getOwned() ] { + [&scheduler, shardId, isLocalShard, commandObj = commandObj.getOwned() ] { LOG(3) << "Coordinator going to send command " << commandObj << " to " << (isLocalShard ? 
" local " : "") << " shard " << shardId; - return _scheduler->scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) + + return scheduler.scheduleRemoteCommand(shardId, kPrimaryReadPreference, commandObj) .then([ shardId, commandObj = commandObj.getOwned() ](ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); @@ -715,8 +727,4 @@ Future TransactionCoordinatorDriver::sendDecisionToParticipantShard( }); } -void TransactionCoordinatorDriver::cancel() { - _cancelled.store(true); -} - } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_driver.h b/src/mongo/db/s/transaction_coordinator_driver.h index 5e935753eb6..99dce0b67e1 100644 --- a/src/mongo/db/s/transaction_coordinator_driver.h +++ b/src/mongo/db/s/transaction_coordinator_driver.h @@ -190,15 +190,6 @@ public: */ Future deleteCoordinatorDoc(const LogicalSessionId& lsid, TxnNumber txnNumber); - /** - * Non-blocking method, which interrupts any executing asynchronous tasks and prevents new ones - * from getting scheduled. Any futures returned by the driver must still be waited on before the - * object is disposed of. - * - * TODO (SERVER-38522): Implement using real cancellation through the AsyncWorkScheduler - */ - void cancel(); - /** * Reads and returns all documents in config.transaction_coordinators. */ @@ -210,32 +201,37 @@ public: // /** - * Sends prepare to a given shard, retrying until a response is received from the participant or - * until the coordinator is no longer in a preparing state due to having already received a vote - * to abort from another participant. + * Sends prepare to the given shard and returns a future, which will be set with the vote. * - * Returns a future containing the participant's response. + * This method will retry until it receives a non-retryable response from the remote node or + * until the scheduler under which it is running is shut down. Because of this it can return + * only the following error code(s): + * - TransactionCoordinatorSteppingDown + * - ShardNotFound */ - Future sendPrepareToShard(const ShardId& shardId, + Future sendPrepareToShard(txn::AsyncWorkScheduler& scheduler, + const ShardId& shardId, const BSONObj& prepareCommandObj); /** - * Sends a command corresponding to a commit decision (i.e. commitTransaction or - * abortTransaction) to the given shard ID and retries on any retryable error until the command - * succeeds or receives a response that may be interpreted as a vote to abort (e.g. - * NoSuchTransaction). + * Sends a command corresponding to a commit decision (i.e. commitTransaction or* + * abortTransaction) to the given shard and returns a future, which will be set with the result. * * Used for sendCommit and sendAbort. + * + * This method will retry until it receives a response from the remote node which can be + * interpreted as vote abort (e.g. NoSuchTransaction), or until the scheduler under which it is + * running is shut down. 
Because of this it can return only the following error code(s): + * - TransactionCoordinatorSteppingDown */ - Future sendDecisionToParticipantShard(const ShardId& shardId, const BSONObj& commandObj); + Future sendDecisionToParticipantShard(txn::AsyncWorkScheduler& scheduler, + const ShardId& shardId, + const BSONObj& commandObj); private: ServiceContext* _serviceContext; std::unique_ptr _scheduler; - - // TODO (SERVER-38522): Remove once AsyncWorkScheduler is used for cancellation - AtomicWord _cancelled{false}; }; } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.cpp b/src/mongo/db/s/transaction_coordinator_futures_util.cpp index 55627f400ed..81d101c91e8 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util.cpp @@ -59,10 +59,10 @@ AsyncWorkScheduler::AsyncWorkScheduler(ServiceContext* serviceContext) AsyncWorkScheduler::~AsyncWorkScheduler() { { - stdx::lock_guard lg(_mutex); - invariant(_activeOpContexts.empty()); - invariant(_activeHandles.empty()); - invariant(_childSchedulers.empty()); + stdx::unique_lock ul(_mutex); + _allListsEmptyCV.wait(ul, [&] { + return _activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty(); + }); } if (!_parent) @@ -70,6 +70,7 @@ AsyncWorkScheduler::~AsyncWorkScheduler() { stdx::lock_guard lg(_parent->_mutex); _parent->_childSchedulers.erase(_itToRemove); + _parent->_notifyAllTasksComplete(lg); _parent = nullptr; } @@ -165,6 +166,7 @@ Future AsyncWorkScheduler::scheduleRemot [ this, it = std::move(it) ](StatusWith s) { stdx::lock_guard lg(_mutex); _activeHandles.erase(it); + _notifyAllTasksComplete(lg); }); }); } @@ -212,7 +214,7 @@ Future AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId, const auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId)); if (MONGO_FAIL_POINT(hangWhileTargetingRemoteHost)) { - LOG(0) << "Hit hangWhileTargetingRemoteHost failpoint"; + LOG(0) << "Hit hangWhileTargetingRemoteHost failpoint for shard " << shardId; MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, hangWhileTargetingRemoteHost); } @@ -221,6 +223,14 @@ Future AsyncWorkScheduler::_targetHostAsync(const ShardId& shardId, }); } +void AsyncWorkScheduler::_notifyAllTasksComplete(WithLock) { + if (_shutdownStatus.isOK()) + return; + + if (_activeOpContexts.empty() && _activeHandles.empty() && _childSchedulers.empty()) + _allListsEmptyCV.notify_all(); +} + Future whenAll(std::vector>& futures) { std::vector> dummyFutures; for (auto&& f : futures) { diff --git a/src/mongo/db/s/transaction_coordinator_futures_util.h b/src/mongo/db/s/transaction_coordinator_futures_util.h index d8ca57a4063..942ee7e2fbf 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util.h +++ b/src/mongo/db/s/transaction_coordinator_futures_util.h @@ -93,6 +93,9 @@ public: auto scopedGuard = makeGuard([&] { ul.lock(); _activeOpContexts.erase(uniqueOpCtxIter); + // There is no need to call _notifyAllTasksComplete here, because we + // will still have an outstanding _activeHandles entry, so the scheduler + // is never completed at this point }); return task(uniqueOpCtxIter->get()); @@ -108,6 +111,7 @@ public: [ this, it = std::move(it) ](StatusOrStatusWith s) { stdx::lock_guard lg(_mutex); _activeHandles.erase(it); + _notifyAllTasksComplete(lg); }); } catch (const DBException& ex) { taskCompletionPromise->setError(ex.toStatus()); @@ -151,6 +155,13 @@ private: Future _targetHostAsync(const ShardId& shardId, const ReadPreferenceSetting& 
readPref); + /** + * Invoked every time a registered op context, handle or child scheduler is getting + * unregistered. Used to notify the 'all lists empty' condition variable when there is no more + * activity on a scheduler which has been shut down. + */ + void _notifyAllTasksComplete(WithLock); + // Service context under which this executor runs ServiceContext* const _serviceContext; @@ -177,6 +188,10 @@ private: // Any outstanding child schedulers created though 'makeChildScheduler' ChildIteratorsList _childSchedulers; + + // Notified when the the scheduler is shut down and the last active handle, operation context or + // child scheduler has been unregistered. + stdx::condition_variable _allListsEmptyCV; }; enum class ShouldStopIteration { kYes, kNo }; diff --git a/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp index cf7a2f75e5a..4948c57d260 100644 --- a/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_futures_util_test.cpp @@ -597,6 +597,27 @@ TEST_F(AsyncWorkSchedulerTest, MakeChildSchedulerAfterShutdownParentScheduler) { ASSERT_THROWS_CODE(childFuture2.get(), AssertionException, ErrorCodes::InternalError); } +TEST_F(AsyncWorkSchedulerTest, ShutdownAllowedFromScheduleWorkAtCallback) { + AsyncWorkScheduler async(getServiceContext()); + auto future = async.scheduleWork([&](OperationContext* opCtx) { + async.shutdown({ErrorCodes::InternalError, "Test error"}); + }); + + future.get(); +} + +TEST_F(AsyncWorkSchedulerTest, DestroyingSchedulerCapturedInFutureCallback) { + auto async = std::make_unique(getServiceContext()); + + Barrier barrier(2); + auto future = + async->scheduleWork([&barrier](OperationContext* opCtx) { barrier.countDownAndWait(); }) + .tapAll([ async = std::move(async), &barrier ](Status){}); + + barrier.countDownAndWait(); + future.get(); +} + using DoWhileTest = AsyncWorkSchedulerTest; diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index 8f1301873e4..e5b5e15e300 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -36,8 +36,6 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/transaction_coordinator_document_gen.h" #include "mongo/db/write_concern.h" -#include "mongo/executor/task_executor.h" -#include "mongo/executor/task_executor_pool.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -69,6 +67,7 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, Date_t commitDeadline) { auto cas = _getCatalogAndScheduler(opCtx); auto& catalog = cas->catalog; + auto& scheduler = cas->scheduler; if (auto latestTxnNumAndCoordinator = catalog.getLatestOnSession(opCtx, lsid)) { auto latestCoordinator = latestTxnNumAndCoordinator->second; @@ -78,27 +77,14 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, latestCoordinator->cancelIfCommitNotYetStarted(); } - auto coordinator = - std::make_shared(opCtx->getServiceContext(), lsid, txnNumber); - - catalog.insert(opCtx, lsid, txnNumber, coordinator); - - // Schedule a task in the future to cancel the commit coordination on the coordinator, so that - // the coordinator does not remain in memory forever (in case the particpant list is never - // received). 
- const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - auto cbHandle = uassertStatusOK(executor->scheduleWorkAt( - commitDeadline, - [coordinatorWeakPtr = std::weak_ptr(coordinator)]( - const mongo::executor::TaskExecutor::CallbackArgs& cbArgs) mutable { - auto coordinator = coordinatorWeakPtr.lock(); - if (coordinator) { - coordinator->cancelIfCommitNotYetStarted(); - } - })); - - // TODO (SERVER-38715): Store the callback handle in the coordinator, so that the coordinator - // can cancel the cancel task on receiving the participant list. + catalog.insert(opCtx, + lsid, + txnNumber, + std::make_shared(opCtx->getServiceContext(), + lsid, + txnNumber, + scheduler.makeChildScheduler(), + commitDeadline)); } boost::optional> TransactionCoordinatorService::coordinateCommit( @@ -114,8 +100,7 @@ boost::optional> TransactionCoordinatorService::coor return boost::none; } - std::vector participants(participantList.begin(), participantList.end()); - auto decisionFuture = coordinator->runCommit(participants); + coordinator->runCommit(std::vector{participantList.begin(), participantList.end()}); return coordinator->onCompletion().then( [coordinator] { return coordinator->getDecision().get(); }); @@ -123,7 +108,7 @@ boost::optional> TransactionCoordinatorService::coor // TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision // is made durable. Currently the coordinator waits to hear acks because participants in prepare // reject requests with a higher transaction number, causing tests to fail. - // return coordinator->runCommit(participants); + // return coordinator->getDecision(); } boost::optional> TransactionCoordinatorService::recoverCommit( @@ -181,6 +166,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, << " transactions"; auto& catalog = catalogAndScheduler->catalog; + auto& scheduler = catalogAndScheduler->scheduler; for (const auto& doc : coordinatorDocs) { LOG(3) << "Going to resume coordinating commit for " << doc.toBSON(); @@ -188,8 +174,13 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, const auto lsid = *doc.getId().getSessionId(); const auto txnNumber = *doc.getId().getTxnNumber(); - auto coordinator = std::make_shared( - opCtx->getServiceContext(), lsid, txnNumber); + auto coordinator = + std::make_shared(opCtx->getServiceContext(), + lsid, + txnNumber, + scheduler.makeChildScheduler(), + boost::none /* No deadline */); + catalog.insert(opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); coordinator->continueCommit(doc); } @@ -214,8 +205,7 @@ void TransactionCoordinatorService::onStepDown() { _catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler); } - _catalogAndSchedulerToCleanup->scheduler.shutdown( - {ErrorCodes::InterruptedDueToStepDown, "Transaction coordinator service stepping down"}); + _catalogAndSchedulerToCleanup->onStepDown(); } void TransactionCoordinatorService::onShardingInitialization(OperationContext* opCtx, @@ -253,9 +243,19 @@ void TransactionCoordinatorService::_joinPreviousRound() { // Block until all coordinators scheduled the previous time the service was primary to have // drained. Because the scheduler was interrupted, it should be extremely rare for there to be // any coordinators left, so if this actually causes blocking, it would most likely be a bug. 
- _catalogAndSchedulerToCleanup->recoveryTaskCompleted->wait(); - _catalogAndSchedulerToCleanup->catalog.join(); + _catalogAndSchedulerToCleanup->join(); _catalogAndSchedulerToCleanup.reset(); } +void TransactionCoordinatorService::CatalogAndScheduler::onStepDown() { + scheduler.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, + "Transaction coordinator service stepping down"}); + catalog.onStepDown(); +} + +void TransactionCoordinatorService::CatalogAndScheduler::join() { + recoveryTaskCompleted->wait(); + catalog.join(); +} + } // namespace mongo diff --git a/src/mongo/db/s/transaction_coordinator_service.h b/src/mongo/db/s/transaction_coordinator_service.h index d65751653c7..9cfdc68faab 100644 --- a/src/mongo/db/s/transaction_coordinator_service.h +++ b/src/mongo/db/s/transaction_coordinator_service.h @@ -106,6 +106,9 @@ private: struct CatalogAndScheduler { CatalogAndScheduler(ServiceContext* service) : scheduler(service) {} + void onStepDown(); + void join(); + txn::AsyncWorkScheduler scheduler; TransactionCoordinatorCatalog catalog; diff --git a/src/mongo/db/s/transaction_coordinator_service_test.cpp b/src/mongo/db/s/transaction_coordinator_service_test.cpp index 6f4eb769e2a..5bc2eec8087 100644 --- a/src/mongo/db/s/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_service_test.cpp @@ -65,9 +65,7 @@ const StatusWith kPrepareOkButWriteConcernError = << kDummyWriteConcernError); class TransactionCoordinatorServiceTestFixture : public TransactionCoordinatorTestFixture { -public: - // Prepare responses - +protected: void assertPrepareSentAndRespondWithSuccess() { assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kPrepareOk, @@ -145,9 +143,6 @@ public: ASSERT_FALSE(network()->hasReadyRequests()); } - // TODO (SERVER-38382): Put all these helpers in one separate file and share with - // transaction_coordinator_test. - /** * Goes through the steps to commit a transaction through the coordinator service for a given * lsid and txnNumber. Useful when not explictly testing the commit protocol. 
@@ -183,9 +178,8 @@ public: auto commitDecisionFuture = *coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet); - for (size_t i = 0; i < shardIdSet.size(); ++i) { - assertPrepareSentAndRespondWithNoSuchTransaction(); - } + // It is sufficient to abort just one of the participants + assertPrepareSentAndRespondWithNoSuchTransaction(); for (size_t i = 0; i < shardIdSet.size(); ++i) { assertAbortSentAndRespondWithSuccess(); @@ -390,11 +384,11 @@ TEST_F(TransactionCoordinatorServiceTest, operationContext(), _lsid, _txnNumber, kTwoShardIdSet); assertPrepareSentAndRespondWithNoSuchTransaction(); + advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered auto commitDecisionFuture2 = *coordinatorService->coordinateCommit( operationContext(), _lsid, _txnNumber, kTwoShardIdSet); - assertPrepareSentAndRespondWithSuccess(); assertAbortSentAndRespondWithSuccess(); assertAbortSentAndRespondWithSuccess(); @@ -433,11 +427,11 @@ TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationT operationContext(), _lsid, _txnNumber, kTwoShardIdSet); assertPrepareSentAndRespondWithNoSuchTransaction(); + advanceClockAndExecuteScheduledTasks(); // Make sure the cancellation callback is delivered auto commitDecisionFuture2 = *coordinatorService->recoverCommit(operationContext(), _lsid, _txnNumber); - assertPrepareSentAndRespondWithSuccess(); assertAbortSentAndRespondWithSuccess(); assertAbortSentAndRespondWithSuccess(); @@ -479,7 +473,6 @@ TEST_F(TransactionCoordinatorServiceTest, coordinatorService->createCoordinator( operationContext(), _lsid, _txnNumber + 1, kCommitDeadline); - ASSERT_EQ(static_cast(commitDecisionFuture.get()), static_cast(txn::CommitDecision::kCanceled)); } @@ -661,8 +654,8 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, operationContext(), _lsid, _txnNumber, kTwoShardIdSet); // Simulate a participant voting to abort. - assertPrepareSentAndRespondWithNoSuchTransaction(); - assertPrepareSentAndRespondWithSuccess(); + onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }, + [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }}); assertAbortSentAndRespondWithSuccess(); assertAbortSentAndRespondWithSuccess(); @@ -678,8 +671,6 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, operationContext(), _lsid, _txnNumber, kTwoShardIdSet); ASSERT_FALSE(commitDecisionFuture.isReady()); - // To prevent invariant failure in TransactionCoordinator that all futures have been completed. - abortTransaction(*coordinatorService(), _lsid, _txnNumber, kTwoShardIdSet, kTwoShardIdList[0]); } TEST_F(TransactionCoordinatorServiceTestSingleTxn, diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index 6132e8e97b9..2f3d918886e 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -41,8 +41,6 @@ namespace mongo { namespace { -const Timestamp kDummyTimestamp = Timestamp::min(); -const Date_t kCommitDeadline = Date_t::max(); const StatusWith kNoSuchTransaction = BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction); const StatusWith kOk = BSON("ok" << 1); @@ -99,41 +97,6 @@ protected: ASSERT_FALSE(network()->hasReadyRequests()); } - /** - * Goes through the steps to commit a transaction through the coordinator service for a given - * lsid and txnNumber. Useful when not explictly testing the commit protocol. 
- */ - void commitTransaction(const std::set& transactionParticipantShards) { - for (size_t i = 0; i < transactionParticipantShards.size(); ++i) { - assertPrepareSentAndRespondWithSuccess(kDummyPrepareTimestamp); - } - - for (size_t i = 0; i < transactionParticipantShards.size(); ++i) { - assertCommitSentAndRespondWithSuccess(); - } - } - - /** - * Goes through the steps to abort a transaction through the coordinator service for a given - * lsid and txnNumber. Useful when not explictly testing the abort protocol. - */ - void abortTransaction(const std::set& shardIdSet, const ShardId& abortingShard) { - // auto commitDecisionFuture = - // coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, - // shardIdSet); - - for (size_t i = 0; i < shardIdSet.size(); ++i) { - assertPrepareSentAndRespondWithNoSuchTransaction(); - } - - // Abort gets sent to the second participant as soon as the first participant - // receives a not-okay response to prepare. - assertAbortSentAndRespondWithSuccess(); - - // Wait for abort to complete. - // commitDecisionFuture.get(); - } - LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; TxnNumber _txnNumber{1}; }; @@ -166,105 +129,134 @@ auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnN } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) { + txn::AsyncWorkScheduler aws(getServiceContext()); Future future = _driver->sendDecisionToParticipantShard( - kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + ASSERT(!future.isReady()); assertPrepareSentAndRespondWithSuccess(); - future.get(operationContext()); + + future.get(); } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) { + txn::AsyncWorkScheduler aws(getServiceContext()); Future future = _driver->sendDecisionToParticipantShard( - kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithRetryableError(); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithSuccess(); - future.get(operationContext()); + + future.get(); } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) { + txn::AsyncWorkScheduler aws(getServiceContext()); Future future = _driver->sendDecisionToParticipantShard( - kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); - - std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); }); + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithSuccess(); + + future.get(); } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) { + txn::AsyncWorkScheduler aws(getServiceContext()); Future future = _driver->sendDecisionToParticipantShard( - kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); - - std::move(future).getAsync([](Status s) { ASSERT(s.isOK()); }); + aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber)); assertPrepareSentAndRespondWithNoSuchTransaction(); + + future.get(); } -TEST_F(TransactionCoordinatorDriverTest, 
+    future.get();
 }

-TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) {
-    Future<PrepareResponse> future =
-        _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+TEST_F(TransactionCoordinatorDriverTest,
+       SendDecisionToParticipantShardCanBeInterruptedAndReturnsError) {
+    txn::AsyncWorkScheduler aws(getServiceContext());
+    Future<void> future = _driver->sendDecisionToParticipantShard(
+        aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+    assertPrepareSentAndRespondWithRetryableError();
+    aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"});
+    advanceClockAndExecuteScheduledTasks();
+
+    ASSERT_THROWS_CODE(
+        future.get(), AssertionException, ErrorCodes::TransactionCoordinatorSteppingDown);
+}

-    std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) {
-        ASSERT_OK(swResponse.getStatus());
-        auto response = swResponse.getValue();
-        ASSERT(response.vote == txn::PrepareVote::kCommit);
-        ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
-    });
+TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecisionOnOkResponse) {
+    txn::AsyncWorkScheduler aws(getServiceContext());
+    Future<PrepareResponse> future = _driver->sendPrepareToShard(
+        aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+    ASSERT(!future.isReady());

-    // Simulate a participant voting to commit.
     assertPrepareSentAndRespondWithSuccess();
+
+    auto response = future.get();
+    ASSERT(response.vote == txn::PrepareVote::kCommit);
+    ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
 }

 TEST_F(TransactionCoordinatorDriverTest,
        SendPrepareToShardReturnsCommitDecisionOnRetryableErrorThenOkResponse) {
-    Future<PrepareResponse> future =
-        _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
-
-    std::move(future).getAsync([](StatusWith<PrepareResponse> swResponse) {
-        ASSERT_OK(swResponse.getStatus());
-        auto response = swResponse.getValue();
-        ASSERT(response.vote == txn::PrepareVote::kCommit);
-        ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
-    });
+    txn::AsyncWorkScheduler aws(getServiceContext());
+    Future<PrepareResponse> future = _driver->sendPrepareToShard(
+        aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+    ASSERT(!future.isReady());

     assertPrepareSentAndRespondWithRetryableError();
+    ASSERT(!future.isReady());
+
     assertPrepareSentAndRespondWithSuccess();
+
+    auto response = future.get();
+    ASSERT(response.vote == txn::PrepareVote::kCommit);
+    ASSERT(response.prepareTimestamp == kDummyPrepareTimestamp);
 }

-TEST_F(
-    TransactionCoordinatorDriverTest,
-    SendPrepareToShardStopsRetryingAfterRetryableErrorAndReturnsNoneIfCoordinatorStateIsNotPrepare) {
-    Future<PrepareResponse> future =
-        _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+TEST_F(TransactionCoordinatorDriverTest,
+       SendPrepareToShardCanBeInterruptedAndReturnsNoDecisionIfNotServiceShutdown) {
+    txn::AsyncWorkScheduler aws(getServiceContext());
+    Future<PrepareResponse> future = _driver->sendPrepareToShard(
+        aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));
+
+    assertPrepareSentAndRespondWithRetryableError();
+    aws.shutdown({ErrorCodes::TransactionCoordinatorReachedAbortDecision, "Retry interrupted"});
+    advanceClockAndExecuteScheduledTasks();
+
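+    // This interruption code yields an empty vote rather than an error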
+    auto response = future.get();
+    ASSERT(response.vote == boost::none);
+    ASSERT(response.prepareTimestamp == boost::none);
+}

-    auto resultFuture = std::move(future).then([](PrepareResponse response) {
-        ASSERT(response.vote == boost::none);
-        ASSERT(response.prepareTimestamp == boost::none);
-    });
+TEST_F(TransactionCoordinatorDriverTest,
+       SendPrepareToShardCanBeInterruptedAndThrowsExceptionIfServiceShutdown) {
+    txn::AsyncWorkScheduler aws(getServiceContext());
+    Future<PrepareResponse> future = _driver->sendPrepareToShard(
+        aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));

-    _driver->cancel();
     assertPrepareSentAndRespondWithRetryableError();
-    resultFuture.get();
+    aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Service shutting down"});
+    advanceClockAndExecuteScheduledTasks();
+
+    ASSERT_THROWS_CODE(
+        future.get(), AssertionException, ErrorCodes::TransactionCoordinatorSteppingDown);
 }

 TEST_F(TransactionCoordinatorDriverTest,
        SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) {
-    auto future =
-        _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber))
-            .then([&](PrepareResponse response) {
-                ASSERT(response.vote == txn::PrepareVote::kAbort);
-                ASSERT(response.prepareTimestamp == boost::none);
-                return response;
-            });
+    txn::AsyncWorkScheduler aws(getServiceContext());
+    auto future = _driver->sendPrepareToShard(
+        aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));

     assertPrepareSentAndRespondWithNoSuchTransaction();

@@ -275,55 +267,52 @@ TEST_F(TransactionCoordinatorDriverTest,

 TEST_F(TransactionCoordinatorDriverTest,
        SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) {
-    auto future =
-        _driver->sendPrepareToShard(kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber))
-            .then([&](PrepareResponse response) {
-                ASSERT(response.vote == txn::PrepareVote::kAbort);
-                ASSERT(response.prepareTimestamp == boost::none);
-            });
+    txn::AsyncWorkScheduler aws(getServiceContext());
+    auto future = _driver->sendPrepareToShard(
+        aws, kTwoShardIdList[0], makeDummyPrepareCommand(_lsid, _txnNumber));

     assertPrepareSentAndRespondWithRetryableError();
     assertPrepareSentAndRespondWithNoSuchTransaction();
-    future.get();
+
+    auto response = future.get();
+    ASSERT(response.vote == txn::PrepareVote::kAbort);
+    ASSERT(response.prepareTimestamp == boost::none);
 }

 TEST_F(TransactionCoordinatorDriverTest,
        SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) {
-    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
-                      .then([&](txn::PrepareVoteConsensus response) {
-                          ASSERT(response.decision == txn::CommitDecision::kAbort);
-                          ASSERT(response.maxPrepareTimestamp == boost::none);
-                      });
+    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);

-    assertPrepareSentAndRespondWithNoSuchTransaction();
-    assertPrepareSentAndRespondWithSuccess();
-    future.get();
+    onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+                [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});
+
+    auto response = future.get();
+    ASSERT(response.decision == txn::CommitDecision::kAbort);
+    ASSERT(response.maxPrepareTimestamp == boost::none);
 }

 TEST_F(TransactionCoordinatorDriverTest,
        SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) {
-    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
-                      .then([&](txn::PrepareVoteConsensus response) {
-                          ASSERT(response.decision == txn::CommitDecision::kAbort);
-                          ASSERT(response.maxPrepareTimestamp == boost::none);
-                      });
+    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);

     assertPrepareSentAndRespondWithSuccess();
     assertPrepareSentAndRespondWithNoSuchTransaction();
-    future.get();
+
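+    // The abort vote from the second participant overrides the earlier commit vote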
+    auto response = future.get();
+    ASSERT(response.decision == txn::CommitDecision::kAbort);
+    ASSERT(response.maxPrepareTimestamp == boost::none);
 }

 TEST_F(TransactionCoordinatorDriverTest,
        SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) {
-    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
-                      .then([&](txn::PrepareVoteConsensus response) {
-                          ASSERT(response.decision == txn::CommitDecision::kAbort);
-                          ASSERT(response.maxPrepareTimestamp == boost::none);
-                      });
+    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);

-    assertPrepareSentAndRespondWithNoSuchTransaction();
-    assertPrepareSentAndRespondWithNoSuchTransaction();
-    future.get();
+    onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+                [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }});
+
+    auto response = future.get();
+    ASSERT(response.decision == txn::CommitDecision::kAbort);
+    ASSERT(response.maxPrepareTimestamp == boost::none);
 }

 TEST_F(TransactionCoordinatorDriverTest,
@@ -331,15 +320,14 @@ TEST_F(TransactionCoordinatorDriverTest,
     const auto firstPrepareTimestamp = Timestamp(1, 1);
     const auto maxPrepareTimestamp = Timestamp(2, 1);

-    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
-                      .then([&](txn::PrepareVoteConsensus response) {
-                          ASSERT(response.decision == txn::CommitDecision::kCommit);
-                          ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
-                      });
+    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);

     assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
     assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
-    future.get();  // Should be able to return after the first participant responds.
+
+    auto response = future.get();
+    ASSERT(response.decision == txn::CommitDecision::kCommit);
+    ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
 }

 TEST_F(TransactionCoordinatorDriverTest,
@@ -347,15 +335,14 @@ TEST_F(TransactionCoordinatorDriverTest,
     const auto firstPrepareTimestamp = Timestamp(1, 1);
     const auto maxPrepareTimestamp = Timestamp(2, 1);

-    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
-                      .then([&](txn::PrepareVoteConsensus response) {
-                          ASSERT(response.decision == txn::CommitDecision::kCommit);
-                          ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
-                      });
+    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);

     assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
     assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
-    future.get();  // Should be able to return after the first participant responds.
+
+    auto response = future.get();
+    ASSERT(response.decision == txn::CommitDecision::kCommit);
+    ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
 }

 TEST_F(TransactionCoordinatorDriverTest,
@@ -363,15 +350,14 @@ TEST_F(TransactionCoordinatorDriverTest,
     const auto firstPrepareTimestamp = Timestamp(1, 1);
     const auto maxPrepareTimestamp = Timestamp(2, 1);

-    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber)
-                      .then([&](txn::PrepareVoteConsensus response) {
-                          ASSERT(response.decision == txn::CommitDecision::kCommit);
-                          ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
-                      });
+    auto future = _driver->sendPrepare(kTwoShardIdList, _lsid, _txnNumber);

     assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp);
     assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp);
-    future.get();  // Should be able to return after the first participant responds.
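+
+    // The consensus reports the maximum of the participants' prepare timestamps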
+    auto response = future.get();
+    ASSERT(response.decision == txn::CommitDecision::kCommit);
+    ASSERT(response.maxPrepareTimestamp == maxPrepareTimestamp);
 }

@@ -660,13 +646,40 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest,

 using TransactionCoordinatorTest = TransactionCoordinatorTestBase;

-TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort) {
-    TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
-    auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitResponses) {
+    TransactionCoordinator coordinator(
+        getServiceContext(),
+        _lsid,
+        _txnNumber,
+        std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+        boost::none);
+    coordinator.runCommit(kTwoShardIdList);
+    auto commitDecisionFuture = coordinator.getDecision();

-    // Simulate a participant voting to abort.
-    assertPrepareSentAndRespondWithNoSuchTransaction();
     assertPrepareSentAndRespondWithSuccess();
+    assertPrepareSentAndRespondWithSuccess();
+
+    assertCommitSentAndRespondWithSuccess();
+    assertCommitSentAndRespondWithSuccess();
+
+    auto commitDecision = commitDecisionFuture.get();
+    ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+
+    coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommitResponses) {
+    TransactionCoordinator coordinator(
+        getServiceContext(),
+        _lsid,
+        _txnNumber,
+        std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+        boost::none);
+    coordinator.runCommit(kTwoShardIdList);
+    auto commitDecisionFuture = coordinator.getDecision();
+
+    onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+                [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }});

     assertAbortSentAndRespondWithSuccess();
     assertAbortSentAndRespondWithSuccess();
@@ -677,33 +690,40 @@ TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnAbort)
     coordinator.onCompletion().get();
 }

-TEST_F(TransactionCoordinatorTest, RunCommitReturnsCorrectCommitDecisionOnCommit) {
-    TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
-    auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnCommitAndAbortResponses) {
+    TransactionCoordinator coordinator(
+        getServiceContext(),
+        _lsid,
+        _txnNumber,
+        std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+        boost::none);
+    coordinator.runCommit(kTwoShardIdList);
+    auto commitDecisionFuture = coordinator.getDecision();

-    assertPrepareSentAndRespondWithSuccess();
-    assertPrepareSentAndRespondWithSuccess();
+    onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; },
+                [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }});

-    assertCommitSentAndRespondWithSuccess();
-    assertCommitSentAndRespondWithSuccess();
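+    // Since one shard voted to abort, abort (not commit) is sent to both participants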
+    assertAbortSentAndRespondWithSuccess();
+    assertAbortSentAndRespondWithSuccess();

     auto commitDecision = commitDecisionFuture.get();
-    ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kCommit));
+    ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));

     coordinator.onCompletion().get();
 }

-TEST_F(TransactionCoordinatorTest,
-       RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesOneParticipantAborts) {
-    TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
-    auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortResponseOnly) {
+    TransactionCoordinator coordinator(
+        getServiceContext(),
+        _lsid,
+        _txnNumber,
+        std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+        boost::none);
+    coordinator.runCommit(kTwoShardIdList);
+    auto commitDecisionFuture = coordinator.getDecision();

-    // One participant votes abort after retry.
-    assertPrepareSentAndRespondWithRetryableError();
     assertPrepareSentAndRespondWithNoSuchTransaction();
-
-    // One participant votes commit.
-    assertPrepareSentAndRespondWithSuccess();
+    advanceClockAndExecuteScheduledTasks();  // Make sure the cancellation callback is delivered

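+    // The second prepare goes unanswered; a single abort vote is enough to abort both shards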
     assertAbortSentAndRespondWithSuccess();
     assertAbortSentAndRespondWithSuccess();
@@ -715,16 +735,48 @@ TEST_F(TransactionCoordinatorTest,
 }

 TEST_F(TransactionCoordinatorTest,
-       RunCommitReturnsCorrectCommitDecisionOnAbortAfterNetworkRetriesBothParticipantsAbort) {
-    TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
-    auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+       RunCommitProducesAbortDecisionOnOneCommitResponseAndOneAbortResponseAfterRetry) {
+    TransactionCoordinator coordinator(
+        getServiceContext(),
+        _lsid,
+        _txnNumber,
+        std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+        boost::none);
+    coordinator.runCommit(kTwoShardIdList);
+    auto commitDecisionFuture = coordinator.getDecision();
+
+    // One participant votes commit and other encounters retryable error
+    onCommands({[&](const executor::RemoteCommandRequest& request) { return kPrepareOk; },
+                [&](const executor::RemoteCommandRequest& request) { return kRetryableError; }});
+    advanceClockAndExecuteScheduledTasks();  // Make sure the scheduled retry executes

     // One participant votes abort after retry.
-    assertPrepareSentAndRespondWithRetryableError();
     assertPrepareSentAndRespondWithNoSuchTransaction();

-    // One participant votes abort.
-    assertPrepareSentAndRespondWithNoSuchTransaction();
+    assertAbortSentAndRespondWithSuccess();
+    assertAbortSentAndRespondWithSuccess();
+
+    auto commitDecision = commitDecisionFuture.get();
+    ASSERT_EQ(static_cast<int>(commitDecision), static_cast<int>(txn::CommitDecision::kAbort));
+
+    coordinator.onCompletion().get();
+}
+
+TEST_F(TransactionCoordinatorTest,
+       RunCommitProducesAbortDecisionOnOneAbortResponseAndOneRetryableAbortResponse) {
+    TransactionCoordinator coordinator(
+        getServiceContext(),
+        _lsid,
+        _txnNumber,
+        std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+        boost::none);
+    coordinator.runCommit(kTwoShardIdList);
+    auto commitDecisionFuture = coordinator.getDecision();
+
+    // One participant votes abort and other encounters retryable error
+    onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; },
+                [&](const executor::RemoteCommandRequest& request) { return kRetryableError; }});
+    advanceClockAndExecuteScheduledTasks();  // Make sure the cancellation callback is delivered

     assertAbortSentAndRespondWithSuccess();
     assertAbortSentAndRespondWithSuccess();
@@ -736,9 +788,15 @@ TEST_F(TransactionCoordinatorTest,
 }

 TEST_F(TransactionCoordinatorTest,
-       RunCommitReturnsCorrectCommitDecisionOnCommitAfterNetworkRetries) {
-    TransactionCoordinator coordinator(getServiceContext(), _lsid, _txnNumber);
-    auto commitDecisionFuture = coordinator.runCommit(kTwoShardIdList);
+       RunCommitProducesCommitDecisionOnCommitAfterMultipleNetworkRetries) {
+    TransactionCoordinator coordinator(
+        getServiceContext(),
+        _lsid,
+        _txnNumber,
+        std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()),
+        boost::none);
+    coordinator.runCommit(kTwoShardIdList);
+    auto commitDecisionFuture = coordinator.getDecision();

     // One participant votes commit after retry.
     assertPrepareSentAndRespondWithRetryableError();
diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
index 6f6df8fe73e..d7384e906ae 100644
--- a/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
+++ b/src/mongo/db/s/transaction_coordinator_test_fixture.cpp
@@ -101,7 +101,7 @@ void TransactionCoordinatorTestFixture::assertCommandSentAndRespondWith(
     const StatusWith<BSONObj>& response,
     boost::optional<BSONObj> expectedWriteConcern) {
     onCommand([&](const executor::RemoteCommandRequest& request) {
-        ASSERT_EQ(request.cmdObj.firstElement().fieldNameStringData(), commandName);
+        ASSERT_EQ(commandName, request.cmdObj.firstElement().fieldNameStringData());
         if (expectedWriteConcern) {
             ASSERT_BSONOBJ_EQ(
                 *expectedWriteConcern,
--
cgit v1.2.1