diff options
Diffstat (limited to 'src/mongo/db/transaction_coordinator_catalog.cpp')
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog.cpp | 63 |
1 files changed, 30 insertions, 33 deletions
diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp index 460d450d4b1..8ebc71726a1 100644 --- a/src/mongo/db/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/transaction_coordinator_catalog.cpp @@ -45,7 +45,9 @@ MONGO_FAIL_POINT_DEFINE(doNotForgetCoordinator); TransactionCoordinatorCatalog::TransactionCoordinatorCatalog() = default; -TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() = default; +TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() { + join(); +} void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, LogicalSessionId lsid, @@ -68,15 +70,8 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, "the same session ID and transaction number as a previous coordinator"); // Schedule callback to remove coordinator from catalog when it either commits or aborts. - coordinator->onCompletion().getAsync([ - catalogWeakPtr = std::weak_ptr<TransactionCoordinatorCatalog>(shared_from_this()), - lsid, - txnNumber - ](Status) { - if (auto catalog = catalogWeakPtr.lock()) { - catalog->remove(lsid, txnNumber); - } - }); + coordinator->onCompletion().getAsync( + [this, lsid, txnNumber](Status) { remove(lsid, txnNumber); }); LOG(3) << "Inserting coordinator for transaction " << txnNumber << " on session " << lsid.toBSON() << " into in-memory catalog"; @@ -170,7 +165,7 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN } coordinatorsForSession.erase(coordinatorForTxnIter); - if (coordinatorsForSession.size() == 0) { + if (coordinatorsForSession.empty()) { _coordinatorsBySession.erase(coordinatorsForSessionIter); } } @@ -182,41 +177,43 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN } } -void TransactionCoordinatorCatalog::enterStepUp(OperationContext* opCtx) { - stdx::unique_lock<stdx::mutex> lk(_mutex); +void TransactionCoordinatorCatalog::exitStepUp(Status status) { + if (status.isOK()) { + LOG(0) << "Incoming coordinateCommit requests are now enabled"; + } else { + warning() << "Coordinator recovery failed and coordinateCommit requests will not be allowed" + << causedBy(status); + } - // If this node stepped down and stepped back up, the asynchronous stepUp task from the previous - // stepUp may still be running, so wait for the previous stepUp task to complete. - LOG(3) << "Waiting for coordinator stepup task from previous term, if any, to complete"; - _waitForStepUpToComplete(lk, opCtx); + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_stepUpCompletionStatus); + _stepUpCompletionStatus = std::move(status); + _stepUpCompleteCv.notify_all(); +} - _stepUpInProgress = true; +void TransactionCoordinatorCatalog::join() { + stdx::unique_lock<stdx::mutex> ul(_mutex); - LOG(3) << "Waiting for there to be no active coordinators; current coordinator catalog: " - << this->_toString(lk); - opCtx->waitForConditionOrInterrupt( - _noActiveCoordinatorsCv, lk, [this]() { return _coordinatorsBySession.empty(); }); + while (!_noActiveCoordinatorsCv.wait_for( + ul, stdx::chrono::seconds{5}, [this] { return _coordinatorsBySession.empty(); })) { + LOG(0) << "After 5 seconds of wait there are still " << _coordinatorsBySession.size() + << " sessions left with active coordinators which have not yet completed"; + LOG(0) << _toString(ul); + } } -void TransactionCoordinatorCatalog::exitStepUp() { +std::string TransactionCoordinatorCatalog::toString() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_stepUpInProgress); - - LOG(3) << "Signaling stepup complete"; - _stepUpInProgress = false; - _noStepUpInProgressCv.notify_all(); + return _toString(lk); } void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk, OperationContext* opCtx) { invariant(lk.owns_lock()); opCtx->waitForConditionOrInterrupt( - _noStepUpInProgressCv, lk, [this]() { return !_stepUpInProgress; }); -} + _stepUpCompleteCv, lk, [this]() { return bool(_stepUpCompletionStatus); }); -std::string TransactionCoordinatorCatalog::toString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _toString(lk); + uassertStatusOK(*_stepUpCompletionStatus); } std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const { |