diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2021-10-05 14:55:57 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-13 18:05:33 +0000 |
commit | e4df5df45864c9ab39dfa966d11c9f4cae9eea1e (patch) | |
tree | 4a9a506235d7aebb6c7589720f6e01e528c78946 /src/mongo/db/s/transaction_coordinator_catalog.cpp | |
parent | b9e5823d950496f1ae3945f00cb4c76dabc24e9e (diff) | |
download | mongo-e4df5df45864c9ab39dfa966d11c9f4cae9eea1e.tar.gz |
SERVER-60323 Make TransactionCoordinatorCatalog support txnRetryCounter
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_catalog.cpp')
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_catalog.cpp | 152 |
1 files changed, 104 insertions, 48 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp index 15721c46991..83b9d569477 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp @@ -67,8 +67,10 @@ void TransactionCoordinatorCatalog::onStepDown() { std::vector<std::shared_ptr<TransactionCoordinator>> coordinatorsToCancel; for (auto&& [sessionId, coordinatorsForSession] : _coordinatorsBySession) { - for (auto&& [txnNumber, coordinator] : coordinatorsForSession) { - coordinatorsToCancel.emplace_back(coordinator); + for (auto&& [txnNumber, coordinatorsForTxnNumber] : coordinatorsForSession) { + for (auto&& [txnRetryCounter, coordinator] : coordinatorsForTxnNumber) { + coordinatorsToCancel.emplace_back(coordinator); + } } } @@ -81,15 +83,19 @@ void TransactionCoordinatorCatalog::onStepDown() { void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, std::shared_ptr<TransactionCoordinator> coordinator, bool forStepUp) { - LOGV2_DEBUG(22439, - 3, - "{sessionId}:{txnNumber} Inserting coordinator into in-memory catalog", - "Inserting coordinator into in-memory catalog", - "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber); + LOGV2_DEBUG( + 22439, + 3, + "{sessionId}:{txnNumberAndRetryCounter} Inserting coordinator into in-memory catalog", + "Inserting coordinator into in-memory catalog", + "sessionId"_attr = lsid.getId(), + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter); + + auto txnNumber = txnNumberAndRetryCounter.getTxnNumber(); + auto txnRetryCounter = *txnNumberAndRetryCounter.getTxnRetryCounter(); stdx::unique_lock<Latch> ul(_mutex); if (!forStepUp) { @@ -99,13 +105,33 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, auto& coordinatorsBySession = _coordinatorsBySession[lsid]; // We should never try to insert a coordinator if one already exists for this session and txn - // number. Logic for avoiding this due to e.g. malformed commands should be handled external to - // the catalog. - invariant(coordinatorsBySession.find(txnNumber) == coordinatorsBySession.end(), - "Cannot insert a TransactionCoordinator into the TransactionCoordinatorCatalog with " - "the same session ID and transaction number as a previous coordinator"); + // number and transaction retry counter. Logic for avoiding this due to e.g. malformed commands + // should be handled external to the catalog. + if (coordinatorsBySession.find(txnNumber) != coordinatorsBySession.end()) { + auto coordinatorByTxnNumber = coordinatorsBySession[txnNumber]; + invariant( + coordinatorByTxnNumber.find(txnRetryCounter) == coordinatorByTxnNumber.end(), + "Cannot insert a TransactionCoordinator into the TransactionCoordinatorCatalog with " + "the same session ID, transaction number and retry counter as a previous coordinator"); + + const auto latestCoordinatorOnSessionIter = coordinatorByTxnNumber.begin(); + + const auto latestTxnRetryCounter = latestCoordinatorOnSessionIter->first; + invariant(latestTxnRetryCounter < txnRetryCounter); + + const auto& latestCoordinatorOnSession = latestCoordinatorOnSessionIter->second; + if (latestCoordinatorOnSession->getDecision().isReady()) { + auto swDecision = latestCoordinatorOnSession->getDecision().getNoThrow(); + invariant(!swDecision.isOK(), + str::stream() << "Cannot insert a TransactionCoordinator into the " + "TransactionCoordinatorCatalog with the same session ID and " + "transaction number as a previous coordinator (transaction " + "retry counter " + << latestTxnRetryCounter << ") that is already committed"); + } + } - coordinatorsBySession[txnNumber] = coordinator; + coordinatorsBySession[txnNumber][txnRetryCounter] = coordinator; // Schedule callback to remove the coordinator from the catalog when all its activities have // completed. This needs to be done outside of the mutex, in case the coordinator completed @@ -116,11 +142,18 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx, coordinator->onCompletion() .thenRunOn(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()) .ignoreValue() - .getAsync([this, lsid, txnNumber](Status) { _remove(lsid, txnNumber); }); + .getAsync([this, lsid, txnNumberAndRetryCounter](Status) { + _remove(lsid, txnNumberAndRetryCounter); + }); } std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get( - OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber) { + OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { + auto txnNumber = txnNumberAndRetryCounter.getTxnNumber(); + auto txnRetryCounter = *txnNumberAndRetryCounter.getTxnRetryCounter(); + stdx::unique_lock<Latch> ul(_mutex); _waitForStepUpToComplete(ul, opCtx); @@ -129,16 +162,20 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get( auto coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); if (coordinatorsForSessionIter != _coordinatorsBySession.end()) { const auto& coordinatorsForSession = coordinatorsForSessionIter->second; - auto coordinatorForTxnIter = coordinatorsForSession.find(txnNumber); - if (coordinatorForTxnIter != coordinatorsForSession.end()) { - coordinatorToReturn = coordinatorForTxnIter->second; + auto coordinatorForTxnNumberIter = coordinatorsForSession.find(txnNumber); + if (coordinatorForTxnNumberIter != coordinatorsForSession.end()) { + const auto& coordinatorsForTxnNumber = coordinatorForTxnNumberIter->second; + auto coordinatorForTxnRetryCounterIter = coordinatorsForTxnNumber.find(txnRetryCounter); + if (coordinatorForTxnRetryCounterIter != coordinatorsForTxnNumber.end()) { + coordinatorToReturn = coordinatorForTxnRetryCounterIter->second; + } } } return coordinatorToReturn; } -boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>> +boost::optional<std::pair<TxnNumberAndRetryCounter, std::shared_ptr<TransactionCoordinator>>> TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, const LogicalSessionId& lsid) { stdx::unique_lock<Latch> ul(_mutex); @@ -156,17 +193,33 @@ TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx, // transactions are removed invariant(!coordinatorsForSession.empty()); - const auto& lastCoordinatorOnSession = coordinatorsForSession.begin(); - return std::make_pair(lastCoordinatorOnSession->first, lastCoordinatorOnSession->second); + const auto& coordinatorsForLastTxnNumberIter = coordinatorsForSession.begin(); + + auto lastTxnNumber = coordinatorsForLastTxnNumberIter->first; + const auto& coordinatorsForLastTxnNumber = coordinatorsForLastTxnNumberIter->second; + + // We should never have empty map for a txnNumber because entries for txnNumbers with no + // coordinators are removed. + invariant(!coordinatorsForLastTxnNumber.empty()); + + const auto& coordinatorsForLastTxnRetryCounterIter = coordinatorsForLastTxnNumber.begin(); + auto lastTxnRetryCounter = coordinatorsForLastTxnRetryCounterIter->first; + return std::make_pair(TxnNumberAndRetryCounter{lastTxnNumber, lastTxnRetryCounter}, + coordinatorsForLastTxnRetryCounterIter->second); } -void TransactionCoordinatorCatalog::_remove(const LogicalSessionId& lsid, TxnNumber txnNumber) { - LOGV2_DEBUG(22440, - 3, - "{sessionId}:{txnNumber} Removing coordinator from in-memory catalog", - "Removing coordinator from in-memory catalog", - "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber); +void TransactionCoordinatorCatalog::_remove( + const LogicalSessionId& lsid, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { + LOGV2_DEBUG( + 22440, + 3, + "{sessionId}:{txnNumberAndRetryCounter} Removing coordinator from in-memory catalog", + "Removing coordinator from in-memory catalog", + "sessionId"_attr = lsid.getId(), + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter); + + auto txnNumber = txnNumberAndRetryCounter.getTxnNumber(); + auto txnRetryCounter = *txnNumberAndRetryCounter.getTxnRetryCounter(); stdx::lock_guard<Latch> lk(_mutex); @@ -174,14 +227,21 @@ void TransactionCoordinatorCatalog::_remove(const LogicalSessionId& lsid, TxnNum if (coordinatorsForSessionIter != _coordinatorsBySession.end()) { auto& coordinatorsForSession = coordinatorsForSessionIter->second; - const auto& coordinatorForTxnIter = coordinatorsForSession.find(txnNumber); - - if (coordinatorForTxnIter != coordinatorsForSession.end()) { - auto coordinator = coordinatorForTxnIter->second; - - coordinatorsForSession.erase(coordinatorForTxnIter); - if (coordinatorsForSession.empty()) { - _coordinatorsBySession.erase(coordinatorsForSessionIter); + const auto& coordinatorForTxnNumberIter = coordinatorsForSession.find(txnNumber); + + if (coordinatorForTxnNumberIter != coordinatorsForSession.end()) { + auto& coordinatorsForTxnNumber = coordinatorForTxnNumberIter->second; + const auto& coordinatorForTxnRetryCounterIter = + coordinatorsForTxnNumber.find(txnRetryCounter); + + if (coordinatorForTxnRetryCounterIter != coordinatorsForTxnNumber.end()) { + coordinatorsForTxnNumber.erase(coordinatorForTxnRetryCounterIter); + if (coordinatorsForTxnNumber.empty()) { + coordinatorsForSession.erase(coordinatorForTxnNumberIter); + if (coordinatorsForSession.empty()) { + _coordinatorsBySession.erase(coordinatorsForSessionIter); + } + } } } } @@ -239,16 +299,12 @@ std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const { void TransactionCoordinatorCatalog::filter(FilterPredicate predicate, FilterVisitor visitor) { stdx::lock_guard<Latch> lk(_mutex); - for (auto sessionIt = _coordinatorsBySession.begin(); sessionIt != _coordinatorsBySession.end(); - ++sessionIt) { - auto& lsid = sessionIt->first; - auto& coordinatorsByTxnNumber = sessionIt->second; - for (auto txnIt = coordinatorsByTxnNumber.begin(); txnIt != coordinatorsByTxnNumber.end(); - ++txnIt) { - auto txnNumber = txnIt->first; - auto& transactionCoordinator = txnIt->second; - if (predicate(lsid, txnNumber, transactionCoordinator)) { - visitor(lsid, txnNumber, transactionCoordinator); + for (auto&& [sessionId, coordinatorsForSession] : _coordinatorsBySession) { + for (auto&& [txnNumber, coordinatorsForTxnNumber] : coordinatorsForSession) { + for (auto&& [txnRetryCounter, coordinator] : coordinatorsForTxnNumber) { + if (predicate(sessionId, txnNumber, coordinator)) { + visitor(sessionId, txnNumber, coordinator); + } } } } |