summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_coordinator_catalog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_coordinator_catalog.cpp')
-rw-r--r--src/mongo/db/transaction_coordinator_catalog.cpp63
1 files changed, 30 insertions, 33 deletions
diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp
index 460d450d4b1..8ebc71726a1 100644
--- a/src/mongo/db/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog.cpp
@@ -45,7 +45,9 @@ MONGO_FAIL_POINT_DEFINE(doNotForgetCoordinator);
TransactionCoordinatorCatalog::TransactionCoordinatorCatalog() = default;
-TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() = default;
+TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() {
+ join();
+}
void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
LogicalSessionId lsid,
@@ -68,15 +70,8 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
"the same session ID and transaction number as a previous coordinator");
// Schedule callback to remove coordinator from catalog when it either commits or aborts.
- coordinator->onCompletion().getAsync([
- catalogWeakPtr = std::weak_ptr<TransactionCoordinatorCatalog>(shared_from_this()),
- lsid,
- txnNumber
- ](Status) {
- if (auto catalog = catalogWeakPtr.lock()) {
- catalog->remove(lsid, txnNumber);
- }
- });
+ coordinator->onCompletion().getAsync(
+ [this, lsid, txnNumber](Status) { remove(lsid, txnNumber); });
LOG(3) << "Inserting coordinator for transaction " << txnNumber << " on session "
<< lsid.toBSON() << " into in-memory catalog";
@@ -170,7 +165,7 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN
}
coordinatorsForSession.erase(coordinatorForTxnIter);
- if (coordinatorsForSession.size() == 0) {
+ if (coordinatorsForSession.empty()) {
_coordinatorsBySession.erase(coordinatorsForSessionIter);
}
}
@@ -182,41 +177,43 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN
}
}
-void TransactionCoordinatorCatalog::enterStepUp(OperationContext* opCtx) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+void TransactionCoordinatorCatalog::exitStepUp(Status status) {
+ if (status.isOK()) {
+ LOG(0) << "Incoming coordinateCommit requests are now enabled";
+ } else {
+ warning() << "Coordinator recovery failed and coordinateCommit requests will not be allowed"
+ << causedBy(status);
+ }
- // If this node stepped down and stepped back up, the asynchronous stepUp task from the previous
- // stepUp may still be running, so wait for the previous stepUp task to complete.
- LOG(3) << "Waiting for coordinator stepup task from previous term, if any, to complete";
- _waitForStepUpToComplete(lk, opCtx);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_stepUpCompletionStatus);
+ _stepUpCompletionStatus = std::move(status);
+ _stepUpCompleteCv.notify_all();
+}
- _stepUpInProgress = true;
+void TransactionCoordinatorCatalog::join() {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
- LOG(3) << "Waiting for there to be no active coordinators; current coordinator catalog: "
- << this->_toString(lk);
- opCtx->waitForConditionOrInterrupt(
- _noActiveCoordinatorsCv, lk, [this]() { return _coordinatorsBySession.empty(); });
+ while (!_noActiveCoordinatorsCv.wait_for(
+ ul, stdx::chrono::seconds{5}, [this] { return _coordinatorsBySession.empty(); })) {
+ LOG(0) << "After 5 seconds of wait there are still " << _coordinatorsBySession.size()
+ << " sessions left with active coordinators which have not yet completed";
+ LOG(0) << _toString(ul);
+ }
}
-void TransactionCoordinatorCatalog::exitStepUp() {
+std::string TransactionCoordinatorCatalog::toString() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- invariant(_stepUpInProgress);
-
- LOG(3) << "Signaling stepup complete";
- _stepUpInProgress = false;
- _noStepUpInProgressCv.notify_all();
+ return _toString(lk);
}
void TransactionCoordinatorCatalog::_waitForStepUpToComplete(stdx::unique_lock<stdx::mutex>& lk,
OperationContext* opCtx) {
invariant(lk.owns_lock());
opCtx->waitForConditionOrInterrupt(
- _noStepUpInProgressCv, lk, [this]() { return !_stepUpInProgress; });
-}
+ _stepUpCompleteCv, lk, [this]() { return bool(_stepUpCompletionStatus); });
-std::string TransactionCoordinatorCatalog::toString() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _toString(lk);
+ uassertStatusOK(*_stepUpCompletionStatus);
}
std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const {