diff options
Diffstat (limited to 'src/mongo/db/catalog/collection_catalog.cpp')
-rw-r--r-- | src/mongo/db/catalog/collection_catalog.cpp | 280 |
1 files changed, 258 insertions, 22 deletions
diff --git a/src/mongo/db/catalog/collection_catalog.cpp b/src/mongo/db/catalog/collection_catalog.cpp index 26c630537a8..3a7e255f464 100644 --- a/src/mongo/db/catalog/collection_catalog.cpp +++ b/src/mongo/db/catalog/collection_catalog.cpp @@ -154,22 +154,29 @@ public: break; } case UncommittedCatalogUpdates::Entry::Action::kRenamedCollection: { - writeJobs.push_back([opCtx, &from = entry.nss, &to = entry.renameTo]( - CollectionCatalog& catalog) { + writeJobs.push_back([opCtx, + &from = entry.nss, + &to = entry.renameTo, + commitTime](CollectionCatalog& catalog) { + // We just need to do modifications on 'from' here. 'to' is taken care + // of by a separate kWritableCollection entry. catalog._collections.erase(from); auto& resourceCatalog = ResourceCatalog::get(opCtx->getServiceContext()); resourceCatalog.remove({RESOURCE_COLLECTION, from}, from); resourceCatalog.add({RESOURCE_COLLECTION, to}, to); + + catalog._pushCatalogIdForRename(from, to, commitTime); }); break; } case UncommittedCatalogUpdates::Entry::Action::kDroppedCollection: { - writeJobs.push_back( - [opCtx, uuid = *entry.uuid(), isDropPending = *entry.isDropPending]( - CollectionCatalog& catalog) { - catalog.deregisterCollection(opCtx, uuid, isDropPending); - }); + writeJobs.push_back([opCtx, + uuid = *entry.uuid(), + isDropPending = *entry.isDropPending, + commitTime](CollectionCatalog& catalog) { + catalog.deregisterCollection(opCtx, uuid, isDropPending, commitTime); + }); break; } case UncommittedCatalogUpdates::Entry::Action::kRecreatedCollection: { @@ -177,17 +184,12 @@ public: collection = entry.collection, uuid = *entry.externalUUID, commitTime](CollectionCatalog& catalog) { - if (commitTime) { - collection->setMinimumValidSnapshot(commitTime.value()); - } - catalog.registerCollection(opCtx, uuid, std::move(collection)); + catalog.registerCollection(opCtx, uuid, std::move(collection), commitTime); }); // Fallthrough to the createCollection case to finish committing the collection. 
[[fallthrough]]; } case UncommittedCatalogUpdates::Entry::Action::kCreatedCollection: { - auto collPtr = entry.collection.get(); - // By this point, we may or may not have reserved an oplog slot for the // collection creation. // For example, multi-document transactions will only reserve the oplog slot at @@ -197,11 +199,15 @@ public: // we must update the minVisibleTimestamp with the appropriate value. This is // fine because the collection should not be visible in the catalog until we // call setCommitted(true). - if (commitTime) { - collPtr->setMinimumVisibleSnapshot(commitTime.value()); - collPtr->setMinimumValidSnapshot(commitTime.value()); - } - collPtr->setCommitted(true); + writeJobs.push_back([coll = entry.collection.get(), + commitTime](CollectionCatalog& catalog) { + if (commitTime) { + coll->setMinimumVisibleSnapshot(commitTime.value()); + coll->setMinimumValidSnapshot(commitTime.value()); + } + catalog._pushCatalogIdForNSS(coll->ns(), coll->getCatalogId(), commitTime); + coll->setCommitted(true); + }); break; } case UncommittedCatalogUpdates::Entry::Action::kReplacedViewsForDatabase: { @@ -1122,6 +1128,25 @@ boost::optional<UUID> CollectionCatalog::lookupUUIDByNSS(OperationContext* opCtx return boost::none; } +boost::optional<RecordId> CollectionCatalog::lookupCatalogIdByNSS( + const NamespaceString& nss, boost::optional<Timestamp> ts) const { + if (auto it = _catalogIds.find(nss); it != _catalogIds.end()) { + const auto& range = it->second; + if (!ts) { + return range.back().id; + } + + auto rangeIt = std::upper_bound( + range.begin(), range.end(), *ts, [](const auto& ts, const auto& entry) { + return ts < entry.ts; + }); + if (rangeIt != range.begin()) { + return (--rangeIt)->id; + } + } + return boost::none; +} + void CollectionCatalog::iterateViews(OperationContext* opCtx, const DatabaseName& dbName, ViewIteratorCallback callback, @@ -1305,7 +1330,8 @@ CollectionCatalog::ViewCatalogSet CollectionCatalog::getViewCatalogDbNames( void 
CollectionCatalog::registerCollection(OperationContext* opCtx, const UUID& uuid, - std::shared_ptr<Collection> coll) { + std::shared_ptr<Collection> coll, + boost::optional<Timestamp> commitTime) { auto nss = coll->ns(); _ensureNamespaceDoesNotExist(opCtx, nss, NamespaceType::kAll); @@ -1326,6 +1352,12 @@ void CollectionCatalog::registerCollection(OperationContext* opCtx, _collections[nss] = coll; _orderedCollections[dbIdPair] = coll; + if (commitTime && !commitTime->isNull()) { + coll->setMinimumValidSnapshot(commitTime.value()); + _pushCatalogIdForNSS(nss, coll->getCatalogId(), commitTime); + } + + if (!nss.isOnInternalDb() && !nss.isSystem()) { _stats.userCollections += 1; if (coll->isCapped()) { @@ -1345,9 +1377,11 @@ void CollectionCatalog::registerCollection(OperationContext* opCtx, resourceCatalog.add({RESOURCE_COLLECTION, nss}, nss); } -std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(OperationContext* opCtx, - const UUID& uuid, - bool isDropPending) { +std::shared_ptr<Collection> CollectionCatalog::deregisterCollection( + OperationContext* opCtx, + const UUID& uuid, + bool isDropPending, + boost::optional<Timestamp> commitTime) { invariant(_catalog.find(uuid) != _catalog.end()); auto coll = std::move(_catalog[uuid]); @@ -1374,6 +1408,11 @@ std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(OperationCon _collections.erase(ns); _catalog.erase(uuid); + // Push drop unless this is a rollback of a create + if (coll->isCommitted()) { + _pushCatalogIdForNSS(ns, boost::none, commitTime); + } + if (!ns.isOnInternalDb() && !ns.isSystem()) { _stats.userCollections -= 1; if (coll->isCapped()) { @@ -1446,6 +1485,110 @@ void CollectionCatalog::_ensureNamespaceDoesNotExist(OperationContext* opCtx, } } +void CollectionCatalog::_pushCatalogIdForNSS(const NamespaceString& nss, + boost::optional<RecordId> catalogId, + boost::optional<Timestamp> ts) { + // TODO SERVER-68674: Remove feature flag check. 
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) { + // No-op. + return; + } + + auto& ids = _catalogIds[nss]; + + if (!ts) { + // Make sure untimestamped writes have a single entry in mapping. If we're mixing + // timestamped with untimestamped (such as repair), ignore the untimestamped writes as an + // untimestamped deregister will correspond with an untimestamped register. We should leave + // the mapping as-is in this case. + if (ids.empty() && catalogId) { + // This namespace was added due to an untimestamped write, add an entry with min + // timestamp + ids.push_back(TimestampedCatalogId{catalogId, Timestamp::min()}); + } else if (ids.size() == 1 && !catalogId) { + // This namespace was removed due to an untimestamped write, clear entries. + ids.clear(); + } + return; + } + + // Re-write latest entry if the timestamp matches (multiple changes occurred in this transaction) + if (!ids.empty() && ids.back().ts == *ts) { + ids.back().id = catalogId; + return; + } + + // Otherwise, push new entry at the end. Timestamp is always increasing + invariant(ids.empty() || ids.back().ts < *ts); + // If the catalogId is the same as last entry, there's nothing we need to do. This can happen + // when the catalog is reopened. + if (!ids.empty() && ids.back().id == catalogId) { + return; + } + + ids.push_back(TimestampedCatalogId{catalogId, *ts}); + _markNamespaceForCatalogIdCleanupIfNeeded(nss, ids); +} + +void CollectionCatalog::_pushCatalogIdForRename(const NamespaceString& from, + const NamespaceString& to, + boost::optional<Timestamp> ts) { + // TODO SERVER-68674: Remove feature flag check. + if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) { + // No-op. + return; + } + + if (!ts) + return; + + // Get 'toIds' first, it may need to instantiate in the container which invalidates all + // references. 
+ auto& toIds = _catalogIds[to]; + auto& fromIds = _catalogIds.at(from); + invariant(!fromIds.empty()); + + // Re-write latest entry if the timestamp matches (multiple changes occurred in this transaction), + // otherwise push at end + if (!toIds.empty() && toIds.back().ts == *ts) { + toIds.back().id = fromIds.back().id; + } else { + invariant(toIds.empty() || toIds.back().ts < *ts); + toIds.push_back(TimestampedCatalogId{fromIds.back().id, *ts}); + _markNamespaceForCatalogIdCleanupIfNeeded(to, toIds); + } + + // Re-write latest entry if the timestamp matches (multiple changes occurred in this transaction), + // otherwise push at end + if (!fromIds.empty() && fromIds.back().ts == *ts) { + fromIds.back().id = boost::none; + } else { + invariant(fromIds.empty() || fromIds.back().ts < *ts); + fromIds.push_back(TimestampedCatalogId{boost::none, *ts}); + _markNamespaceForCatalogIdCleanupIfNeeded(from, fromIds); + } +} + +void CollectionCatalog::_markNamespaceForCatalogIdCleanupIfNeeded( + const NamespaceString& nss, const std::vector<TimestampedCatalogId>& ids) { + + auto markForCleanup = [this, &nss](Timestamp ts) { + _catalogIdChanges.insert(nss); + if (ts < _lowestCatalogIdTimestampForCleanup) { + _lowestCatalogIdTimestampForCleanup = ts; + } + }; + + // Cleanup may occur if we have more than one entry for the namespace or if the only entry is a + // drop. 
Use the first entry as lowest cleanup time if we have a drop and the second otherwise + // (as the first is needed until that time is reached) + if (!ids.empty() && ids.front().id == boost::none) { + markForCleanup(ids.front().ts); + } else if (ids.size() > 1) { + markForCleanup(ids.at(1).ts); + } +} + void CollectionCatalog::deregisterAllCollectionsAndViews(ServiceContext* svcCtx) { LOGV2(20282, "Deregistering all the collections"); for (auto& entry : _catalog) { @@ -1538,6 +1681,99 @@ CollectionCatalog::iterator CollectionCatalog::end(OperationContext* opCtx) cons return iterator(opCtx, _orderedCollections.end(), *this); } +bool CollectionCatalog::needsCleanupForOldestTimestamp(Timestamp oldest) const { + // TODO SERVER-68674: Remove feature flag check. + if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) { + // No-op. + return false; + } + + return _lowestCatalogIdTimestampForCleanup <= oldest; +} + +void CollectionCatalog::cleanupForOldestTimestampAdvanced(Timestamp oldest) { + Timestamp nextLowestCleanupTimestamp = Timestamp::max(); + // Helper to calculate the smallest entry that needs to be kept and its timestamp + auto assignLowestCleanupTimestamp = [&nextLowestCleanupTimestamp](const auto& range) { + auto it = range.begin(); + // Drops can be cleaned up right away, otherwise the second entry is cleanup time. 
+ if (it->id.has_value()) { + ++it; + } + nextLowestCleanupTimestamp = std::min(nextLowestCleanupTimestamp, it->ts); + }; + + // Iterate over all namespaces that are marked as needing cleanup + for (auto it = _catalogIdChanges.begin(), end = _catalogIdChanges.end(); it != end;) { + auto& range = _catalogIds[*it]; + + // Binary search for next larger timestamp + auto rangeIt = std::upper_bound( + range.begin(), range.end(), oldest, [](const auto& ts, const auto& entry) { + return ts < entry.ts; + }); + + // Continue if there is nothing to clean up for this timestamp yet + if (rangeIt == range.begin()) { + assignLowestCleanupTimestamp(range); + ++it; + continue; + } + + // The iterator is positioned to the closest entry that has a larger timestamp, decrement to + // get a lower or equal timestamp + --rangeIt; + + // If we are positioned on a drop it can be removed + if (!rangeIt->id.has_value()) { + ++rangeIt; + } + + // Erase range + range.erase(range.begin(), rangeIt); + + // If the range is now empty or we need to keep the last item we can unmark this namespace + // for needing changes. + if (range.size() <= 1) { + _catalogIdChanges.erase(it++); + continue; + } + + // More changes are needed for this namespace, keep it in the set and keep track of lowest + // timestamp. 
+ assignLowestCleanupTimestamp(range); + ++it; + } + + _lowestCatalogIdTimestampForCleanup = nextLowestCleanupTimestamp; +} + +void CollectionCatalog::cleanupForCatalogReopen(Timestamp stable) { + _catalogIdChanges.clear(); + _lowestCatalogIdTimestampForCleanup = Timestamp::max(); + + for (auto it = _catalogIds.begin(); it != _catalogIds.end();) { + auto& ids = it->second; + + // Remove all larger timestamps in this range + ids.erase(std::upper_bound(ids.begin(), + ids.end(), + stable, + [](Timestamp ts, const auto& entry) { return ts < entry.ts; }), + ids.end()); + + // Remove namespace if there are no entries left + if (ids.empty()) { + _catalogIds.erase(it++); + continue; + } + + // Calculate when this namespace needs to be cleaned up next + _markNamespaceForCatalogIdCleanupIfNeeded(it->first, ids); + ++it; + } +} + void CollectionCatalog::invariantHasExclusiveAccessToCollection(OperationContext* opCtx, const NamespaceString& nss) { auto& uncommittedCatalogUpdates = UncommittedCatalogUpdates::get(opCtx); |