summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/transaction_coordinator_catalog.cpp
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2021-10-05 14:55:57 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-13 18:05:33 +0000
commite4df5df45864c9ab39dfa966d11c9f4cae9eea1e (patch)
tree4a9a506235d7aebb6c7589720f6e01e528c78946 /src/mongo/db/s/transaction_coordinator_catalog.cpp
parentb9e5823d950496f1ae3945f00cb4c76dabc24e9e (diff)
downloadmongo-e4df5df45864c9ab39dfa966d11c9f4cae9eea1e.tar.gz
SERVER-60323 Make TransactionCoordinatorCatalog support txnRetryCounter
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_catalog.cpp')
-rw-r--r--src/mongo/db/s/transaction_coordinator_catalog.cpp152
1 files changed, 104 insertions, 48 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_catalog.cpp b/src/mongo/db/s/transaction_coordinator_catalog.cpp
index 15721c46991..83b9d569477 100644
--- a/src/mongo/db/s/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/s/transaction_coordinator_catalog.cpp
@@ -67,8 +67,10 @@ void TransactionCoordinatorCatalog::onStepDown() {
std::vector<std::shared_ptr<TransactionCoordinator>> coordinatorsToCancel;
for (auto&& [sessionId, coordinatorsForSession] : _coordinatorsBySession) {
- for (auto&& [txnNumber, coordinator] : coordinatorsForSession) {
- coordinatorsToCancel.emplace_back(coordinator);
+ for (auto&& [txnNumber, coordinatorsForTxnNumber] : coordinatorsForSession) {
+ for (auto&& [txnRetryCounter, coordinator] : coordinatorsForTxnNumber) {
+ coordinatorsToCancel.emplace_back(coordinator);
+ }
}
}
@@ -81,15 +83,19 @@ void TransactionCoordinatorCatalog::onStepDown() {
void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
const LogicalSessionId& lsid,
- TxnNumber txnNumber,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
std::shared_ptr<TransactionCoordinator> coordinator,
bool forStepUp) {
- LOGV2_DEBUG(22439,
- 3,
- "{sessionId}:{txnNumber} Inserting coordinator into in-memory catalog",
- "Inserting coordinator into in-memory catalog",
- "sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber);
+ LOGV2_DEBUG(
+ 22439,
+ 3,
+ "{sessionId}:{txnNumberAndRetryCounter} Inserting coordinator into in-memory catalog",
+ "Inserting coordinator into in-memory catalog",
+ "sessionId"_attr = lsid.getId(),
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter);
+
+ auto txnNumber = txnNumberAndRetryCounter.getTxnNumber();
+ auto txnRetryCounter = *txnNumberAndRetryCounter.getTxnRetryCounter();
stdx::unique_lock<Latch> ul(_mutex);
if (!forStepUp) {
@@ -99,13 +105,33 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
auto& coordinatorsBySession = _coordinatorsBySession[lsid];
// We should never try to insert a coordinator if one already exists for this session and txn
- // number. Logic for avoiding this due to e.g. malformed commands should be handled external to
- // the catalog.
- invariant(coordinatorsBySession.find(txnNumber) == coordinatorsBySession.end(),
- "Cannot insert a TransactionCoordinator into the TransactionCoordinatorCatalog with "
- "the same session ID and transaction number as a previous coordinator");
+ // number and transaction retry counter. Logic for avoiding this due to e.g. malformed commands
+ // should be handled external to the catalog.
+ if (coordinatorsBySession.find(txnNumber) != coordinatorsBySession.end()) {
+ auto coordinatorByTxnNumber = coordinatorsBySession[txnNumber];
+ invariant(
+ coordinatorByTxnNumber.find(txnRetryCounter) == coordinatorByTxnNumber.end(),
+ "Cannot insert a TransactionCoordinator into the TransactionCoordinatorCatalog with "
+ "the same session ID, transaction number and retry counter as a previous coordinator");
+
+ const auto latestCoordinatorOnSessionIter = coordinatorByTxnNumber.begin();
+
+ const auto latestTxnRetryCounter = latestCoordinatorOnSessionIter->first;
+ invariant(latestTxnRetryCounter < txnRetryCounter);
+
+ const auto& latestCoordinatorOnSession = latestCoordinatorOnSessionIter->second;
+ if (latestCoordinatorOnSession->getDecision().isReady()) {
+ auto swDecision = latestCoordinatorOnSession->getDecision().getNoThrow();
+ invariant(!swDecision.isOK(),
+ str::stream() << "Cannot insert a TransactionCoordinator into the "
+ "TransactionCoordinatorCatalog with the same session ID and "
+ "transaction number as a previous coordinator (transaction "
+ "retry counter "
+ << latestTxnRetryCounter << ") that is already committed");
+ }
+ }
- coordinatorsBySession[txnNumber] = coordinator;
+ coordinatorsBySession[txnNumber][txnRetryCounter] = coordinator;
// Schedule callback to remove the coordinator from the catalog when all its activities have
// completed. This needs to be done outside of the mutex, in case the coordinator completed
@@ -116,11 +142,18 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
coordinator->onCompletion()
.thenRunOn(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
.ignoreValue()
- .getAsync([this, lsid, txnNumber](Status) { _remove(lsid, txnNumber); });
+ .getAsync([this, lsid, txnNumberAndRetryCounter](Status) {
+ _remove(lsid, txnNumberAndRetryCounter);
+ });
}
std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(
- OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber) {
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
+ auto txnNumber = txnNumberAndRetryCounter.getTxnNumber();
+ auto txnRetryCounter = *txnNumberAndRetryCounter.getTxnRetryCounter();
+
stdx::unique_lock<Latch> ul(_mutex);
_waitForStepUpToComplete(ul, opCtx);
@@ -129,16 +162,20 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(
auto coordinatorsForSessionIter = _coordinatorsBySession.find(lsid);
if (coordinatorsForSessionIter != _coordinatorsBySession.end()) {
const auto& coordinatorsForSession = coordinatorsForSessionIter->second;
- auto coordinatorForTxnIter = coordinatorsForSession.find(txnNumber);
- if (coordinatorForTxnIter != coordinatorsForSession.end()) {
- coordinatorToReturn = coordinatorForTxnIter->second;
+ auto coordinatorForTxnNumberIter = coordinatorsForSession.find(txnNumber);
+ if (coordinatorForTxnNumberIter != coordinatorsForSession.end()) {
+ const auto& coordinatorsForTxnNumber = coordinatorForTxnNumberIter->second;
+ auto coordinatorForTxnRetryCounterIter = coordinatorsForTxnNumber.find(txnRetryCounter);
+ if (coordinatorForTxnRetryCounterIter != coordinatorsForTxnNumber.end()) {
+ coordinatorToReturn = coordinatorForTxnRetryCounterIter->second;
+ }
}
}
return coordinatorToReturn;
}
-boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>>
+boost::optional<std::pair<TxnNumberAndRetryCounter, std::shared_ptr<TransactionCoordinator>>>
TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx,
const LogicalSessionId& lsid) {
stdx::unique_lock<Latch> ul(_mutex);
@@ -156,17 +193,33 @@ TransactionCoordinatorCatalog::getLatestOnSession(OperationContext* opCtx,
// transactions are removed
invariant(!coordinatorsForSession.empty());
- const auto& lastCoordinatorOnSession = coordinatorsForSession.begin();
- return std::make_pair(lastCoordinatorOnSession->first, lastCoordinatorOnSession->second);
+ const auto& coordinatorsForLastTxnNumberIter = coordinatorsForSession.begin();
+
+ auto lastTxnNumber = coordinatorsForLastTxnNumberIter->first;
+ const auto& coordinatorsForLastTxnNumber = coordinatorsForLastTxnNumberIter->second;
+
+ // We should never have empty map for a txnNumber because entries for txnNumbers with no
+ // coordinators are removed.
+ invariant(!coordinatorsForLastTxnNumber.empty());
+
+ const auto& coordinatorsForLastTxnRetryCounterIter = coordinatorsForLastTxnNumber.begin();
+ auto lastTxnRetryCounter = coordinatorsForLastTxnRetryCounterIter->first;
+ return std::make_pair(TxnNumberAndRetryCounter{lastTxnNumber, lastTxnRetryCounter},
+ coordinatorsForLastTxnRetryCounterIter->second);
}
-void TransactionCoordinatorCatalog::_remove(const LogicalSessionId& lsid, TxnNumber txnNumber) {
- LOGV2_DEBUG(22440,
- 3,
- "{sessionId}:{txnNumber} Removing coordinator from in-memory catalog",
- "Removing coordinator from in-memory catalog",
- "sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber);
+void TransactionCoordinatorCatalog::_remove(
+ const LogicalSessionId& lsid, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
+ LOGV2_DEBUG(
+ 22440,
+ 3,
+ "{sessionId}:{txnNumberAndRetryCounter} Removing coordinator from in-memory catalog",
+ "Removing coordinator from in-memory catalog",
+ "sessionId"_attr = lsid.getId(),
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter);
+
+ auto txnNumber = txnNumberAndRetryCounter.getTxnNumber();
+ auto txnRetryCounter = *txnNumberAndRetryCounter.getTxnRetryCounter();
stdx::lock_guard<Latch> lk(_mutex);
@@ -174,14 +227,21 @@ void TransactionCoordinatorCatalog::_remove(const LogicalSessionId& lsid, TxnNum
if (coordinatorsForSessionIter != _coordinatorsBySession.end()) {
auto& coordinatorsForSession = coordinatorsForSessionIter->second;
- const auto& coordinatorForTxnIter = coordinatorsForSession.find(txnNumber);
-
- if (coordinatorForTxnIter != coordinatorsForSession.end()) {
- auto coordinator = coordinatorForTxnIter->second;
-
- coordinatorsForSession.erase(coordinatorForTxnIter);
- if (coordinatorsForSession.empty()) {
- _coordinatorsBySession.erase(coordinatorsForSessionIter);
+ const auto& coordinatorForTxnNumberIter = coordinatorsForSession.find(txnNumber);
+
+ if (coordinatorForTxnNumberIter != coordinatorsForSession.end()) {
+ auto& coordinatorsForTxnNumber = coordinatorForTxnNumberIter->second;
+ const auto& coordinatorForTxnRetryCounterIter =
+ coordinatorsForTxnNumber.find(txnRetryCounter);
+
+ if (coordinatorForTxnRetryCounterIter != coordinatorsForTxnNumber.end()) {
+ coordinatorsForTxnNumber.erase(coordinatorForTxnRetryCounterIter);
+ if (coordinatorsForTxnNumber.empty()) {
+ coordinatorsForSession.erase(coordinatorForTxnNumberIter);
+ if (coordinatorsForSession.empty()) {
+ _coordinatorsBySession.erase(coordinatorsForSessionIter);
+ }
+ }
}
}
}
@@ -239,16 +299,12 @@ std::string TransactionCoordinatorCatalog::_toString(WithLock wl) const {
void TransactionCoordinatorCatalog::filter(FilterPredicate predicate, FilterVisitor visitor) {
stdx::lock_guard<Latch> lk(_mutex);
- for (auto sessionIt = _coordinatorsBySession.begin(); sessionIt != _coordinatorsBySession.end();
- ++sessionIt) {
- auto& lsid = sessionIt->first;
- auto& coordinatorsByTxnNumber = sessionIt->second;
- for (auto txnIt = coordinatorsByTxnNumber.begin(); txnIt != coordinatorsByTxnNumber.end();
- ++txnIt) {
- auto txnNumber = txnIt->first;
- auto& transactionCoordinator = txnIt->second;
- if (predicate(lsid, txnNumber, transactionCoordinator)) {
- visitor(lsid, txnNumber, transactionCoordinator);
+ for (auto&& [sessionId, coordinatorsForSession] : _coordinatorsBySession) {
+ for (auto&& [txnNumber, coordinatorsForTxnNumber] : coordinatorsForSession) {
+ for (auto&& [txnRetryCounter, coordinator] : coordinatorsForTxnNumber) {
+ if (predicate(sessionId, txnNumber, coordinator)) {
+ visitor(sessionId, txnNumber, coordinator);
+ }
}
}
}