diff options
-rw-r--r-- | src/mongo/db/catalog/catalog_control.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_catalog.cpp | 280 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_catalog.h | 74 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_catalog_bm.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_catalog_test.cpp | 426 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_writer_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/catalog/uncommitted_catalog_updates.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_test.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/durable_catalog_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/storage/kv/storage_engine_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine.h | 4 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_impl.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_impl.h | 7 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_mock.h | 4 | ||||
-rw-r--r-- | src/mongo/db/storage/storage_engine_test_fixture.h | 3 |
16 files changed, 834 insertions, 83 deletions
diff --git a/src/mongo/db/catalog/catalog_control.cpp b/src/mongo/db/catalog/catalog_control.cpp index 7f4dc6167e8..95774bbe155 100644 --- a/src/mongo/db/catalog/catalog_control.cpp +++ b/src/mongo/db/catalog/catalog_control.cpp @@ -243,9 +243,15 @@ void openCatalog(OperationContext* opCtx, // Load the catalog in the storage engine. LOGV2(20273, "openCatalog: loading storage engine catalog"); auto storageEngine = opCtx->getServiceContext()->getStorageEngine(); + + // Remove catalogId mappings for larger timestamp than 'stableTimestamp'. + CollectionCatalog::write(opCtx, [stableTimestamp](CollectionCatalog& catalog) { + catalog.cleanupForCatalogReopen(stableTimestamp); + }); + // Ignore orphaned idents because this function is used during rollback and not at // startup recovery, when we may try to recover orphaned idents. - storageEngine->loadCatalog(opCtx, StorageEngine::LastShutdownState::kClean); + storageEngine->loadCatalog(opCtx, stableTimestamp, StorageEngine::LastShutdownState::kClean); LOGV2(20274, "openCatalog: reconciling catalog and idents"); auto reconcileResult = fassert( diff --git a/src/mongo/db/catalog/collection_catalog.cpp b/src/mongo/db/catalog/collection_catalog.cpp index 26c630537a8..3a7e255f464 100644 --- a/src/mongo/db/catalog/collection_catalog.cpp +++ b/src/mongo/db/catalog/collection_catalog.cpp @@ -154,22 +154,29 @@ public: break; } case UncommittedCatalogUpdates::Entry::Action::kRenamedCollection: { - writeJobs.push_back([opCtx, &from = entry.nss, &to = entry.renameTo]( - CollectionCatalog& catalog) { + writeJobs.push_back([opCtx, + &from = entry.nss, + &to = entry.renameTo, + commitTime](CollectionCatalog& catalog) { + // We just need to do modifications on 'from' here. 'to' is taken care + // of by a separate kWritableCollection entry. 
catalog._collections.erase(from); auto& resourceCatalog = ResourceCatalog::get(opCtx->getServiceContext()); resourceCatalog.remove({RESOURCE_COLLECTION, from}, from); resourceCatalog.add({RESOURCE_COLLECTION, to}, to); + + catalog._pushCatalogIdForRename(from, to, commitTime); }); break; } case UncommittedCatalogUpdates::Entry::Action::kDroppedCollection: { - writeJobs.push_back( - [opCtx, uuid = *entry.uuid(), isDropPending = *entry.isDropPending]( - CollectionCatalog& catalog) { - catalog.deregisterCollection(opCtx, uuid, isDropPending); - }); + writeJobs.push_back([opCtx, + uuid = *entry.uuid(), + isDropPending = *entry.isDropPending, + commitTime](CollectionCatalog& catalog) { + catalog.deregisterCollection(opCtx, uuid, isDropPending, commitTime); + }); break; } case UncommittedCatalogUpdates::Entry::Action::kRecreatedCollection: { @@ -177,17 +184,12 @@ public: collection = entry.collection, uuid = *entry.externalUUID, commitTime](CollectionCatalog& catalog) { - if (commitTime) { - collection->setMinimumValidSnapshot(commitTime.value()); - } - catalog.registerCollection(opCtx, uuid, std::move(collection)); + catalog.registerCollection(opCtx, uuid, std::move(collection), commitTime); }); // Fallthrough to the createCollection case to finish committing the collection. [[fallthrough]]; } case UncommittedCatalogUpdates::Entry::Action::kCreatedCollection: { - auto collPtr = entry.collection.get(); - // By this point, we may or may not have reserved an oplog slot for the // collection creation. // For example, multi-document transactions will only reserve the oplog slot at @@ -197,11 +199,15 @@ public: // we must update the minVisibleTimestamp with the appropriate value. This is // fine because the collection should not be visible in the catalog until we // call setCommitted(true). 
- if (commitTime) { - collPtr->setMinimumVisibleSnapshot(commitTime.value()); - collPtr->setMinimumValidSnapshot(commitTime.value()); - } - collPtr->setCommitted(true); + writeJobs.push_back([coll = entry.collection.get(), + commitTime](CollectionCatalog& catalog) { + if (commitTime) { + coll->setMinimumVisibleSnapshot(commitTime.value()); + coll->setMinimumValidSnapshot(commitTime.value()); + } + catalog._pushCatalogIdForNSS(coll->ns(), coll->getCatalogId(), commitTime); + coll->setCommitted(true); + }); break; } case UncommittedCatalogUpdates::Entry::Action::kReplacedViewsForDatabase: { @@ -1122,6 +1128,25 @@ boost::optional<UUID> CollectionCatalog::lookupUUIDByNSS(OperationContext* opCtx return boost::none; } +boost::optional<RecordId> CollectionCatalog::lookupCatalogIdByNSS( + const NamespaceString& nss, boost::optional<Timestamp> ts) const { + if (auto it = _catalogIds.find(nss); it != _catalogIds.end()) { + const auto& range = it->second; + if (!ts) { + return range.back().id; + } + + auto rangeIt = std::upper_bound( + range.begin(), range.end(), *ts, [](const auto& ts, const auto& entry) { + return ts < entry.ts; + }); + if (rangeIt != range.begin()) { + return (--rangeIt)->id; + } + } + return boost::none; +} + void CollectionCatalog::iterateViews(OperationContext* opCtx, const DatabaseName& dbName, ViewIteratorCallback callback, @@ -1305,7 +1330,8 @@ CollectionCatalog::ViewCatalogSet CollectionCatalog::getViewCatalogDbNames( void CollectionCatalog::registerCollection(OperationContext* opCtx, const UUID& uuid, - std::shared_ptr<Collection> coll) { + std::shared_ptr<Collection> coll, + boost::optional<Timestamp> commitTime) { auto nss = coll->ns(); _ensureNamespaceDoesNotExist(opCtx, nss, NamespaceType::kAll); @@ -1326,6 +1352,12 @@ void CollectionCatalog::registerCollection(OperationContext* opCtx, _collections[nss] = coll; _orderedCollections[dbIdPair] = coll; + if (commitTime && !commitTime->isNull()) { + coll->setMinimumValidSnapshot(commitTime.value()); 
+ _pushCatalogIdForNSS(nss, coll->getCatalogId(), commitTime); + } + + if (!nss.isOnInternalDb() && !nss.isSystem()) { _stats.userCollections += 1; if (coll->isCapped()) { @@ -1345,9 +1377,11 @@ void CollectionCatalog::registerCollection(OperationContext* opCtx, resourceCatalog.add({RESOURCE_COLLECTION, nss}, nss); } -std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(OperationContext* opCtx, - const UUID& uuid, - bool isDropPending) { +std::shared_ptr<Collection> CollectionCatalog::deregisterCollection( + OperationContext* opCtx, + const UUID& uuid, + bool isDropPending, + boost::optional<Timestamp> commitTime) { invariant(_catalog.find(uuid) != _catalog.end()); auto coll = std::move(_catalog[uuid]); @@ -1374,6 +1408,11 @@ std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(OperationCon _collections.erase(ns); _catalog.erase(uuid); + // Push drop unless this is a rollback of a create + if (coll->isCommitted()) { + _pushCatalogIdForNSS(ns, boost::none, commitTime); + } + if (!ns.isOnInternalDb() && !ns.isSystem()) { _stats.userCollections -= 1; if (coll->isCapped()) { @@ -1446,6 +1485,110 @@ void CollectionCatalog::_ensureNamespaceDoesNotExist(OperationContext* opCtx, } } +void CollectionCatalog::_pushCatalogIdForNSS(const NamespaceString& nss, + boost::optional<RecordId> catalogId, + boost::optional<Timestamp> ts) { + // TODO SERVER-68674: Remove feature flag check. + if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) { + // No-op. + return; + } + + auto& ids = _catalogIds[nss]; + + if (!ts) { + // Make sure untimestamped writes have a single entry in mapping. If we're mixing + // timestamped with untimestamped (such as repair). Ignore the untimestamped writes as an + // untimestamped deregister will correspond with an untimestamped register. We should leave + // the mapping as-is in this case. 
+ if (ids.empty() && catalogId) { + // This namespace was added due to an untimestamped write, add an entry with min + // timestamp + ids.push_back(TimestampedCatalogId{catalogId, Timestamp::min()}); + } else if (ids.size() == 1 && !catalogId) { + // This namespace was removed due to an untimestamped write, clear entries. + ids.clear(); + } + return; + } + + // Re-write latest entry if timestamp match (multiple changes occured in this transaction) + if (!ids.empty() && ids.back().ts == *ts) { + ids.back().id = catalogId; + return; + } + + // Otherwise, push new entry at the end. Timestamp is always increasing + invariant(ids.empty() || ids.back().ts < *ts); + // If the catalogId is the same as last entry, there's nothing we need to do. This can happen + // when the catalog is reopened. + if (!ids.empty() && ids.back().id == catalogId) { + return; + } + + ids.push_back(TimestampedCatalogId{catalogId, *ts}); + _markNamespaceForCatalogIdCleanupIfNeeded(nss, ids); +} + +void CollectionCatalog::_pushCatalogIdForRename(const NamespaceString& from, + const NamespaceString& to, + boost::optional<Timestamp> ts) { + // TODO SERVER-68674: Remove feature flag check. + if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) { + // No-op. + return; + } + + if (!ts) + return; + + // Get 'toIds' first, it may need to instantiate in the container which invalidates all + // references. 
+ auto& toIds = _catalogIds[to]; + auto& fromIds = _catalogIds.at(from); + invariant(!fromIds.empty()); + + // Re-write latest entry if timestamp match (multiple changes occured in this transaction), + // otherwise push at end + if (!toIds.empty() && toIds.back().ts == *ts) { + toIds.back().id = fromIds.back().id; + } else { + invariant(toIds.empty() || toIds.back().ts < *ts); + toIds.push_back(TimestampedCatalogId{fromIds.back().id, *ts}); + _markNamespaceForCatalogIdCleanupIfNeeded(to, toIds); + } + + // Re-write latest entry if timestamp match (multiple changes occured in this transaction), + // otherwise push at end + if (!fromIds.empty() && fromIds.back().ts == *ts) { + fromIds.back().id = boost::none; + } else { + invariant(fromIds.empty() || fromIds.back().ts < *ts); + fromIds.push_back(TimestampedCatalogId{boost::none, *ts}); + _markNamespaceForCatalogIdCleanupIfNeeded(from, fromIds); + } +} + +void CollectionCatalog::_markNamespaceForCatalogIdCleanupIfNeeded( + const NamespaceString& nss, const std::vector<TimestampedCatalogId>& ids) { + + auto markForCleanup = [this, &nss](Timestamp ts) { + _catalogIdChanges.insert(nss); + if (ts < _lowestCatalogIdTimestampForCleanup) { + _lowestCatalogIdTimestampForCleanup = ts; + } + }; + + // Cleanup may occur if we have more than one entry for the namespace or if the only entry is a + // drop. 
Use the first entry as lowest cleanup time if we have a drop and the second otherwise + // (as the first is needed until that time is reached) + if (!ids.empty() && ids.front().id == boost::none) { + markForCleanup(ids.front().ts); + } else if (ids.size() > 1) { + markForCleanup(ids.at(1).ts); + } +} + void CollectionCatalog::deregisterAllCollectionsAndViews(ServiceContext* svcCtx) { LOGV2(20282, "Deregistering all the collections"); for (auto& entry : _catalog) { @@ -1538,6 +1681,99 @@ CollectionCatalog::iterator CollectionCatalog::end(OperationContext* opCtx) cons return iterator(opCtx, _orderedCollections.end(), *this); } +bool CollectionCatalog::needsCleanupForOldestTimestamp(Timestamp oldest) const { + // TODO SERVER-68674: Remove feature flag check. + if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) { + // No-op. + return false; + } + + return _lowestCatalogIdTimestampForCleanup <= oldest; +} + +void CollectionCatalog::cleanupForOldestTimestampAdvanced(Timestamp oldest) { + Timestamp nextLowestCleanupTimestamp = Timestamp::max(); + // Helper to calculate the smallest entry that needs to be kept and its timestamp + auto assignLowestCleanupTimestamp = [&nextLowestCleanupTimestamp](const auto& range) { + auto it = range.begin(); + // Drops can be cleaned up right away, otherwise the second entry is cleanup time. 
+ if (it->id.has_value()) { + ++it; + } + nextLowestCleanupTimestamp = std::min(nextLowestCleanupTimestamp, it->ts); + }; + + // Iterate over all namespaces that is marked that they need cleanup + for (auto it = _catalogIdChanges.begin(), end = _catalogIdChanges.end(); it != end;) { + auto& range = _catalogIds[*it]; + + // Binary search for next larger timestamp + auto rangeIt = std::upper_bound( + range.begin(), range.end(), oldest, [](const auto& ts, const auto& entry) { + return ts < entry.ts; + }); + + // Continue if there is nothing to cleanup for this timestamp yet + if (rangeIt == range.begin()) { + assignLowestCleanupTimestamp(range); + ++it; + continue; + } + + // The iterator is positioned to the closest entry that has a larger timestamp, decrement to + // get a lower or equal timestamp + --rangeIt; + + // If we are positioned on a drop it can be removed + if (!rangeIt->id.has_value()) { + ++rangeIt; + } + + // Erase range + range.erase(range.begin(), rangeIt); + + // If the range is now empty or we need to keep the last item we can unmark this namespace + // for needing changes. + if (range.size() <= 1) { + _catalogIdChanges.erase(it++); + continue; + } + + // More changes are needed for this namespace, keep it in the set and keep track of lowest + // timestamp. 
+ assignLowestCleanupTimestamp(range); + ++it; + } + + _lowestCatalogIdTimestampForCleanup = nextLowestCleanupTimestamp; +} + +void CollectionCatalog::cleanupForCatalogReopen(Timestamp stable) { + _catalogIdChanges.clear(); + _lowestCatalogIdTimestampForCleanup = Timestamp::max(); + + for (auto it = _catalogIds.begin(); it != _catalogIds.end();) { + auto& ids = it->second; + + // Remove all larger timestamps in this range + ids.erase(std::upper_bound(ids.begin(), + ids.end(), + stable, + [](Timestamp ts, const auto& entry) { return ts < entry.ts; }), + ids.end()); + + // Remove namespace if there are no entries left + if (ids.empty()) { + _catalogIds.erase(it++); + continue; + } + + // Calculate when this namespace needs to be cleaned up next + _markNamespaceForCatalogIdCleanupIfNeeded(it->first, ids); + ++it; + } +} + void CollectionCatalog::invariantHasExclusiveAccessToCollection(OperationContext* opCtx, const NamespaceString& nss) { auto& uncommittedCatalogUpdates = UncommittedCatalogUpdates::get(opCtx); diff --git a/src/mongo/db/catalog/collection_catalog.h b/src/mongo/db/catalog/collection_catalog.h index 80d73fb11fb..a3bd18e51df 100644 --- a/src/mongo/db/catalog/collection_catalog.h +++ b/src/mongo/db/catalog/collection_catalog.h @@ -38,7 +38,7 @@ #include "mongo/db/database_name.h" #include "mongo/db/profile_filter.h" #include "mongo/db/service_context.h" -// TODO SERVER-68265: remove include. +// TODO SERVER-69372: remove include. #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/views/view.h" #include "mongo/stdx/unordered_map.h" @@ -210,7 +210,7 @@ public: /** * Returns the collection instance representative of 'entry' at the provided read timestamp. * - * TODO SERVER-68265: + * TODO SERVER-69372: * - Use NamespaceString instead of DurableCatalog::Entry. * - Remove DurableCatalog dependency. 
* @@ -288,7 +288,8 @@ public: */ void registerCollection(OperationContext* opCtx, const UUID& uuid, - std::shared_ptr<Collection> collection); + std::shared_ptr<Collection> collection, + boost::optional<Timestamp> commitTime); /** * Deregister the collection. @@ -297,7 +298,8 @@ public: */ std::shared_ptr<Collection> deregisterCollection(OperationContext* opCtx, const UUID& uuid, - bool isDropPending); + bool isDropPending, + boost::optional<Timestamp> commitTime); /** * Create a temporary record of an uncommitted view namespace to aid in detecting a simultaneous @@ -401,6 +403,17 @@ public: const NamespaceString& nss) const; /** + * Returns the CatalogId for a given 'nss' at timestamp 'ts'. + * + * Timestamp must be in the range [oldest_timestamp, now) + * If 'ts' is boost::none the latest CatalogId is returned. + * + * Returns boost::none if no namespace exist at the timestamp or if 'ts' is out of range. + */ + boost::optional<RecordId> lookupCatalogIdByNSS( + const NamespaceString& nss, boost::optional<Timestamp> ts = boost::none) const; + + /** * Iterates through the views in the catalog associated with database `dbName`, applying * 'callback' to each view. If the 'callback' returns false, the iterator exits early. * @@ -566,6 +579,23 @@ public: iterator end(OperationContext* opCtx) const; /** + * Checks if 'cleanupForOldestTimestampAdvanced' should be called when the oldest timestamp + * advanced. Used to avoid a potentially expensive call to 'cleanupForOldestTimestampAdvanced' + * if no write is needed. + */ + bool needsCleanupForOldestTimestamp(Timestamp oldest) const; + + /** + * Cleans up internal structures when the oldest timestamp advances + */ + void cleanupForOldestTimestampAdvanced(Timestamp oldest); + + /** + * Cleans up internal structures after catalog reopen + */ + void cleanupForCatalogReopen(Timestamp stable); + + /** * Ensures we have a MODE_X lock on a collection or MODE_IX lock for newly created collections. 
*/ static void invariantHasExclusiveAccessToCollection(OperationContext* opCtx, @@ -628,6 +658,33 @@ private: NamespaceType type) const; /** + * CatalogId with Timestamp + */ + struct TimestampedCatalogId { + boost::optional<RecordId> id; + Timestamp ts; + }; + + // Push a catalogId for namespace at given Timestamp. Timestamp needs to be larger than other + // entries for this namespace. boost::none for catalogId represent drop, boost::none for + // timestamp turns this operation into a no-op. + void _pushCatalogIdForNSS(const NamespaceString& nss, + boost::optional<RecordId> catalogId, + boost::optional<Timestamp> ts); + + // Push a catalogId for 'from' and 'to' for a rename operation at given Timestamp. Timestamp + // needs to be larger than other entries for these namespaces. boost::none for timestamp turns + // this operation into a no-op. + void _pushCatalogIdForRename(const NamespaceString& from, + const NamespaceString& to, + boost::optional<Timestamp> ts); + + // Helper to calculate if a namespace needs to be marked for cleanup for a set of timestamped + // catalogIds + void _markNamespaceForCatalogIdCleanupIfNeeded(const NamespaceString& nss, + const std::vector<TimestampedCatalogId>& ids); + + /** * When present, indicates that the catalog is in closed state, and contains a map from UUID * to pre-close NSS. See also onCloseCatalog. */ @@ -647,6 +704,15 @@ private: NamespaceCollectionMap _collections; UncommittedViewsSet _uncommittedViews; + // CatalogId mappings for all known namespaces for the CollectionCatalog. The vector is sorted + // on timestamp. + absl::flat_hash_map<NamespaceString, std::vector<TimestampedCatalogId>> _catalogIds; + // Set of namespaces that need cleanup when the oldest timestamp advances sufficiently. 
+ absl::flat_hash_set<NamespaceString> _catalogIdChanges; + // Point at which the oldest timestamp need to advance for there to be any catalogId namespace + // that can be cleaned up + Timestamp _lowestCatalogIdTimestampForCleanup = Timestamp::max(); + // Map of database names to their corresponding views and other associated state. ViewsForDatabaseMap _viewsForDatabase; diff --git a/src/mongo/db/catalog/collection_catalog_bm.cpp b/src/mongo/db/catalog/collection_catalog_bm.cpp index 3ff0e3375bd..d04f2115f55 100644 --- a/src/mongo/db/catalog/collection_catalog_bm.cpp +++ b/src/mongo/db/catalog/collection_catalog_bm.cpp @@ -78,7 +78,10 @@ void createCollections(OperationContext* opCtx, int numCollections) { for (auto i = 0; i < numCollections; i++) { const NamespaceString nss("collection_catalog_bm", std::to_string(i)); CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(opCtx, UUID::gen(), std::make_shared<CollectionMock>(nss)); + catalog.registerCollection(opCtx, + UUID::gen(), + std::make_shared<CollectionMock>(nss), + /*ts=*/boost::none); }); } } diff --git a/src/mongo/db/catalog/collection_catalog_test.cpp b/src/mongo/db/catalog/collection_catalog_test.cpp index bba37ae83b6..59f31db4ed7 100644 --- a/src/mongo/db/catalog/collection_catalog_test.cpp +++ b/src/mongo/db/catalog/collection_catalog_test.cpp @@ -74,7 +74,7 @@ public: std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(colUUID, nss); col = CollectionPtr(collection.get(), CollectionPtr::NoYieldTag{}); // Register dummy collection in catalog. - catalog.registerCollection(opCtx.get(), colUUID, collection); + catalog.registerCollection(opCtx.get(), colUUID, collection, boost::none); // Validate that kNumCollectionReferencesStored is correct, add one reference for the one we // hold in this function. 
@@ -112,15 +112,16 @@ public: dbMap["foo"].insert(std::make_pair(fooUuid, fooColl.get())); dbMap["bar"].insert(std::make_pair(barUuid, barColl.get())); - catalog.registerCollection(opCtx.get(), fooUuid, fooColl); - catalog.registerCollection(opCtx.get(), barUuid, barColl); + catalog.registerCollection(opCtx.get(), fooUuid, fooColl, boost::none); + catalog.registerCollection(opCtx.get(), barUuid, barColl, boost::none); } } void tearDown() { for (auto& it : dbMap) { for (auto& kv : it.second) { - catalog.deregisterCollection(opCtx.get(), kv.first, /*isDropPending=*/false); + catalog.deregisterCollection( + opCtx.get(), kv.first, /*isDropPending=*/false, boost::none); } } } @@ -177,7 +178,7 @@ public: std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); auto uuid = collection->uuid(); - catalog.registerCollection(opCtx.get(), uuid, std::move(collection)); + catalog.registerCollection(opCtx.get(), uuid, std::move(collection), boost::none); } int numEntries = 0; @@ -209,7 +210,7 @@ public: } for (auto&& uuid : collectionsToDeregister) { - catalog.deregisterCollection(opCtx.get(), uuid, /*isDropPending=*/false); + catalog.deregisterCollection(opCtx.get(), uuid, /*isDropPending=*/false, boost::none); } int numEntries = 0; @@ -273,7 +274,7 @@ TEST_F(CollectionCatalogResourceTest, LookupMissingCollectionResource) { TEST_F(CollectionCatalogResourceTest, RemoveCollection) { const NamespaceString collNs = NamespaceString(boost::none, "resourceDb.coll1"); auto coll = catalog.lookupCollectionByNamespace(opCtx.get(), NamespaceString(collNs)); - catalog.deregisterCollection(opCtx.get(), coll->uuid(), /*isDropPending=*/false); + catalog.deregisterCollection(opCtx.get(), coll->uuid(), /*isDropPending=*/false, boost::none); auto rid = ResourceId(RESOURCE_COLLECTION, collNs); ASSERT(!ResourceCatalog::get(getServiceContext()).name(rid)); } @@ -295,7 +296,7 @@ TEST_F(CollectionCatalogIterationTest, GetUUIDWontRepositionEvenIfEntryIsDropped auto it = 
catalog.begin(opCtx.get(), DatabaseName(boost::none, "bar")); auto collsIt = collsIterator("bar"); auto uuid = collsIt->first; - catalog.deregisterCollection(opCtx.get(), uuid, /*isDropPending=*/false); + catalog.deregisterCollection(opCtx.get(), uuid, /*isDropPending=*/false, boost::none); dropColl("bar", uuid); ASSERT_EQUALS(uuid, it.uuid()); @@ -329,7 +330,7 @@ TEST_F(CollectionCatalogTest, InsertAfterLookup) { // Ensure that looking up non-existing UUIDs doesn't affect later registration of those UUIDs. ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), newUUID) == nullptr); ASSERT_EQUALS(catalog.lookupNSSByUUID(opCtx.get(), newUUID), boost::none); - catalog.registerCollection(opCtx.get(), newUUID, std::move(newCollShared)); + catalog.registerCollection(opCtx.get(), newUUID, std::move(newCollShared), boost::none); ASSERT_EQUALS(catalog.lookupCollectionByUUID(opCtx.get(), newUUID), newCol); ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), nss); } @@ -358,7 +359,7 @@ TEST_F(CollectionCatalogTest, OnDropCollection) { yieldableColl.yield(); ASSERT_FALSE(yieldableColl); - catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false); + catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false, boost::none); // Ensure the lookup returns a null pointer upon removing the colUUID entry. 
ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), colUUID) == nullptr); @@ -372,7 +373,7 @@ TEST_F(CollectionCatalogTest, RenameCollection) { NamespaceString oldNss(nss.db(), "oldcol"); std::shared_ptr<Collection> collShared = std::make_shared<CollectionMock>(uuid, oldNss); auto collection = collShared.get(); - catalog.registerCollection(opCtx.get(), uuid, std::move(collShared)); + catalog.registerCollection(opCtx.get(), uuid, std::move(collShared), boost::none); auto yieldableColl = catalog.lookupCollectionByUUID(opCtx.get(), uuid); ASSERT(yieldableColl); ASSERT_EQUALS(yieldableColl, collection); @@ -412,7 +413,7 @@ TEST_F(CollectionCatalogTest, LookupNSSByUUIDForClosedCatalogReturnsOldNSSIfDrop catalog.onCloseCatalog(); } - catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false); + catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false, boost::none); ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), colUUID) == nullptr); ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), nss); @@ -438,7 +439,7 @@ TEST_F(CollectionCatalogTest, LookupNSSByUUIDForClosedCatalogReturnsNewlyCreated ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), newUUID) == nullptr); ASSERT_EQUALS(catalog.lookupNSSByUUID(opCtx.get(), newUUID), boost::none); - catalog.registerCollection(opCtx.get(), newUUID, std::move(newCollShared)); + catalog.registerCollection(opCtx.get(), newUUID, std::move(newCollShared), boost::none); ASSERT_EQUALS(catalog.lookupCollectionByUUID(opCtx.get(), newUUID), newCol); ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), nss); @@ -462,10 +463,10 @@ TEST_F(CollectionCatalogTest, LookupNSSByUUIDForClosedCatalogReturnsFreshestNSS) catalog.onCloseCatalog(); } - catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false); + catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false, boost::none); ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), colUUID) == nullptr); 
ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), nss); - catalog.registerCollection(opCtx.get(), colUUID, std::move(newCollShared)); + catalog.registerCollection(opCtx.get(), colUUID, std::move(newCollShared), boost::none); ASSERT_EQUALS(catalog.lookupCollectionByUUID(opCtx.get(), colUUID), newCol); ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), newNss); @@ -506,7 +507,7 @@ TEST_F(CollectionCatalogTest, GetAllCollectionNamesAndGetAllDbNames) { for (auto& nss : nsss) { std::shared_ptr<Collection> newColl = std::make_shared<CollectionMock>(nss); auto uuid = UUID::gen(); - catalog.registerCollection(opCtx.get(), uuid, std::move(newColl)); + catalog.registerCollection(opCtx.get(), uuid, std::move(newColl), boost::none); } std::vector<NamespaceString> dCollList = {d1Coll, d2Coll, d3Coll}; @@ -561,7 +562,7 @@ TEST_F(CollectionCatalogTest, GetAllCollectionNamesAndGetAllDbNamesWithUncommitt for (auto& nss : nsss) { std::shared_ptr<Collection> newColl = std::make_shared<CollectionMock>(nss); auto uuid = UUID::gen(); - catalog.registerCollection(opCtx.get(), uuid, std::move(newColl)); + catalog.registerCollection(opCtx.get(), uuid, std::move(newColl), boost::none); } // One dbName with only an invisible collection does not appear in dbNames. 
Use const_cast to @@ -772,6 +773,10 @@ public: opCtx = makeOperationContext(); } + std::shared_ptr<const CollectionCatalog> catalog() { + return CollectionCatalog::get(opCtx.get()); + } + void createCollection(OperationContext* opCtx, const NamespaceString& nss, Timestamp timestamp) { @@ -832,6 +837,33 @@ public: wuow.commit(); } + void renameCollection(OperationContext* opCtx, + const NamespaceString& from, + const NamespaceString& to, + Timestamp timestamp) { + invariant(from.db() == to.db()); + + opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + opCtx->recoveryUnit()->abandonSnapshot(); + + if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { + opCtx->recoveryUnit()->clearCommitTimestamp(); + } + opCtx->recoveryUnit()->setCommitTimestamp(timestamp); + + Lock::DBLock dbLk(opCtx, from.db(), MODE_IX); + Lock::CollectionLock fromLk(opCtx, from, MODE_X); + Lock::CollectionLock toLk(opCtx, to, MODE_X); + + CollectionWriter collection(opCtx, from); + + WriteUnitOfWork wuow(opCtx); + ASSERT_OK(collection.getWritableCollection(opCtx)->rename(opCtx, to, false)); + CollectionCatalog::get(opCtx)->onCollectionRename( + opCtx, collection.getWritableCollection(opCtx), from); + wuow.commit(); + } + void createIndex(OperationContext* opCtx, const NamespaceString& nss, BSONObj indexSpec, @@ -1579,5 +1611,365 @@ TEST_F(CollectionCatalogTimestampTest, OpenNewCollectionAndIndexesWithReaper) { } } +TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCreate) { + RAIIServerParameterControllerForTest featureFlagController( + "featureFlagPointInTimeCatalogLookups", true); + + NamespaceString nss("a.b"); + + // Create collection and extract the catalogId + createCollection(opCtx.get(), nss, Timestamp(1, 2)); + RecordId rid = catalog()->lookupCollectionByNamespace(opCtx.get(), nss)->getCatalogId(); + + // Lookup without timestamp returns latest catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, boost::none), rid); + // Lookup before 
create returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 1)), boost::none); + // Lookup at create returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 2)), rid); + // Lookup after create returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 3)), rid); +} + +TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingDrop) { + RAIIServerParameterControllerForTest featureFlagController( + "featureFlagPointInTimeCatalogLookups", true); + + NamespaceString nss("a.b"); + + // Create and drop collection. We have a time window where the namespace exists + createCollection(opCtx.get(), nss, Timestamp(1, 5)); + RecordId rid = catalog()->lookupCollectionByNamespace(opCtx.get(), nss)->getCatalogId(); + dropCollection(opCtx.get(), nss, Timestamp(1, 10)); + + // Lookup without timestamp returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, boost::none), boost::none); + // Lookup before create returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 4)), boost::none); + // Lookup at create returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), rid); + // Lookup after create returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 6)), rid); + // Lookup at drop returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 10)), boost::none); + // Lookup after drop returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 20)), boost::none); +} + +TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingRename) { + RAIIServerParameterControllerForTest featureFlagController( + "featureFlagPointInTimeCatalogLookups", true); + + NamespaceString from("a.b"); + NamespaceString to("a.c"); + + // Create and rename collection. 
We have two windows where the namespace exists but for + // different namespaces + createCollection(opCtx.get(), from, Timestamp(1, 5)); + RecordId rid = catalog()->lookupCollectionByNamespace(opCtx.get(), from)->getCatalogId(); + renameCollection(opCtx.get(), from, to, Timestamp(1, 10)); + + // Lookup without timestamp on 'from' returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, boost::none), boost::none); + // Lookup before create returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 4)), boost::none); + // Lookup at create returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 5)), rid); + // Lookup after create returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 6)), rid); + // Lookup at rename on 'from' returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 10)), boost::none); + // Lookup after rename on 'from' returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 20)), boost::none); + + // Lookup without timestamp on 'to' returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(to, boost::none), rid); + // Lookup before rename on 'to' returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(to, Timestamp(1, 9)), boost::none); + // Lookup at rename on 'to' returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(to, Timestamp(1, 10)), rid); + // Lookup after rename on 'to' returns catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(to, Timestamp(1, 20)), rid); +} + +TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingDropCreate) { + RAIIServerParameterControllerForTest featureFlagController( + "featureFlagPointInTimeCatalogLookups", true); + + NamespaceString nss("a.b"); + + // Create, drop and recreate collection on the same namespace. We have different catalogId. 
+ createCollection(opCtx.get(), nss, Timestamp(1, 5)); + RecordId rid1 = catalog()->lookupCollectionByNamespace(opCtx.get(), nss)->getCatalogId(); + dropCollection(opCtx.get(), nss, Timestamp(1, 10)); + createCollection(opCtx.get(), nss, Timestamp(1, 15)); + RecordId rid2 = catalog()->lookupCollectionByNamespace(opCtx.get(), nss)->getCatalogId(); + + // Lookup without timestamp returns latest catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, boost::none), rid2); + // Lookup before first create returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 4)), boost::none); + // Lookup at first create returns first catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), rid1); + // Lookup after first create returns first catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 6)), rid1); + // Lookup at drop returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 10)), boost::none); + // Lookup after drop returns none + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 13)), boost::none); + // Lookup at second create returns second catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), rid2); + // Lookup after second create returns second catalogId + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 20)), rid2); +} + +TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupEqDrop) { + RAIIServerParameterControllerForTest featureFlagController( + "featureFlagPointInTimeCatalogLookups", true); + + NamespaceString nss("a.b"); + + // Create collection and verify we have nothing to cleanup + createCollection(opCtx.get(), nss, Timestamp(1, 5)); + ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1))); + + // Drop collection and verify we have nothing to cleanup as long as the oldest timestamp is + // before the drop + dropCollection(opCtx.get(), nss, Timestamp(1, 10)); + 
ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1))); + ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5))); + ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10))); + + // Create new collection and nothing changed with answers to needsCleanupForOldestTimestamp. + createCollection(opCtx.get(), nss, Timestamp(1, 15)); + ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1))); + ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5))); + ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 7))); + ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10))); + + // We can lookup the old catalogId before we advance the oldest timestamp and cleanup + ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none); + + // Cleanup at drop timestamp + CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) { + c.cleanupForOldestTimestampAdvanced(Timestamp(1, 10)); + }); + // After cleanup, we cannot find the old catalogId anymore. 
Also verify that we don't need
+    // any more cleanup
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10)));
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupGtDrop) {
+    RAIIServerParameterControllerForTest featureFlagController(
+        "featureFlagPointInTimeCatalogLookups", true);
+
+    NamespaceString nss("a.b");
+
+    // Create collection and verify we have nothing to cleanup
+    createCollection(opCtx.get(), nss, Timestamp(1, 5));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+
+    // Drop collection and verify we have nothing to cleanup as long as the oldest timestamp is
+    // before the drop
+    dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+    ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10)));
+
+    // Create new collection and nothing changed with answers to needsCleanupForOldestTimestamp.
+    createCollection(opCtx.get(), nss, Timestamp(1, 15));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 7)));
+    ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 12)));
+
+    // We can lookup the old catalogId before we advance the oldest timestamp and cleanup
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+
+    // Cleanup after the drop timestamp
+    CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+        c.cleanupForOldestTimestampAdvanced(Timestamp(1, 12));
+    });
+
+    // After cleanup, we cannot find the old catalogId anymore. 
Also verify that we don't need
+    // any more cleanup
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 12)));
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupGtRecreate) {
+    RAIIServerParameterControllerForTest featureFlagController(
+        "featureFlagPointInTimeCatalogLookups", true);
+
+    NamespaceString nss("a.b");
+
+    // Create collection and verify we have nothing to cleanup
+    createCollection(opCtx.get(), nss, Timestamp(1, 5));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+
+    // Drop collection and verify we have nothing to cleanup as long as the oldest timestamp is
+    // before the drop
+    dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+    ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10)));
+
+    // Create new collection and nothing changed with answers to needsCleanupForOldestTimestamp.
+    createCollection(opCtx.get(), nss, Timestamp(1, 15));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 7)));
+    ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 20)));
+
+    // We can lookup the old catalogId before we advance the oldest timestamp and cleanup
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+
+    // Cleanup after the recreate timestamp
+    CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+        c.cleanupForOldestTimestampAdvanced(Timestamp(1, 20));
+    });
+
+    // After cleanup, we cannot find the old catalogId anymore. 
Also verify that we don't need
+    // any more cleanup
+    ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 20)));
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupMultiple) {
+    RAIIServerParameterControllerForTest featureFlagController(
+        "featureFlagPointInTimeCatalogLookups", true);
+
+    NamespaceString nss("a.b");
+
+    // Create and drop multiple collections on the same namespace
+    createCollection(opCtx.get(), nss, Timestamp(1, 5));
+    dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+    createCollection(opCtx.get(), nss, Timestamp(1, 15));
+    dropCollection(opCtx.get(), nss, Timestamp(1, 20));
+    createCollection(opCtx.get(), nss, Timestamp(1, 25));
+    dropCollection(opCtx.get(), nss, Timestamp(1, 30));
+    createCollection(opCtx.get(), nss, Timestamp(1, 35));
+    dropCollection(opCtx.get(), nss, Timestamp(1, 40));
+
+    // Lookup can find all four collections
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+    // Cleanup oldest
+    CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+        c.cleanupForOldestTimestampAdvanced(Timestamp(1, 10));
+    });
+
+    // Lookup can find the three remaining collections
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+    // Cleanup
+    CollectionCatalog::write(opCtx.get(), 
[&](CollectionCatalog& c) {
+        c.cleanupForOldestTimestampAdvanced(Timestamp(1, 21));
+    });
+
+    // Lookup can find the two remaining collections
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+    // Cleanup
+    CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+        c.cleanupForOldestTimestampAdvanced(Timestamp(1, 32));
+    });
+
+    // Lookup can find the last remaining collection
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+    // Cleanup
+    CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+        c.cleanupForOldestTimestampAdvanced(Timestamp(1, 50));
+    });
+
+    // Lookup can find no collections
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupMultipleSingleCall) {
+    RAIIServerParameterControllerForTest featureFlagController(
+        "featureFlagPointInTimeCatalogLookups", true);
+
+    NamespaceString nss("a.b");
+
+    // Create and drop multiple collections on the same namespace
+    createCollection(opCtx.get(), nss, Timestamp(1, 5));
+    dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+    createCollection(opCtx.get(), nss, Timestamp(1, 15));
+    
dropCollection(opCtx.get(), nss, Timestamp(1, 20));
+    createCollection(opCtx.get(), nss, Timestamp(1, 25));
+    dropCollection(opCtx.get(), nss, Timestamp(1, 30));
+    createCollection(opCtx.get(), nss, Timestamp(1, 35));
+    dropCollection(opCtx.get(), nss, Timestamp(1, 40));
+
+    // Lookup can find all four collections
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+    ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+    // Cleanup all
+    CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+        c.cleanupForOldestTimestampAdvanced(Timestamp(1, 50));
+    });
+
+    // Lookup can find no collections
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+    ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingRollback) {
+    RAIIServerParameterControllerForTest featureFlagController(
+        "featureFlagPointInTimeCatalogLookups", true);
+
+    NamespaceString a("b.a");
+    NamespaceString b("b.b");
+    NamespaceString c("b.c");
+    NamespaceString d("b.d");
+    NamespaceString e("b.e");
+
+    // Create and drop collections on multiple namespaces
+    createCollection(opCtx.get(), a, Timestamp(1, 1));
+    dropCollection(opCtx.get(), a, Timestamp(1, 2));
+    createCollection(opCtx.get(), a, Timestamp(1, 3));
+    createCollection(opCtx.get(), b, Timestamp(1, 5));
+    createCollection(opCtx.get(), c, Timestamp(1, 7));
+    createCollection(opCtx.get(), d, Timestamp(1, 8));
+    createCollection(opCtx.get(), e, Timestamp(1, 9));
+    dropCollection(opCtx.get(), b, Timestamp(1, 10));
+
+    // Rollback to 
Timestamp(1, 8) + CollectionCatalog::write( + opCtx.get(), [&](CollectionCatalog& c) { c.cleanupForCatalogReopen(Timestamp(1, 8)); }); + + ASSERT_NE(catalog()->lookupCatalogIdByNSS(a, boost::none), boost::none); + ASSERT_NE(catalog()->lookupCatalogIdByNSS(b, boost::none), boost::none); + ASSERT_NE(catalog()->lookupCatalogIdByNSS(c, boost::none), boost::none); + ASSERT_NE(catalog()->lookupCatalogIdByNSS(d, boost::none), boost::none); + ASSERT_EQ(catalog()->lookupCatalogIdByNSS(e, boost::none), boost::none); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/catalog/collection_writer_test.cpp b/src/mongo/db/catalog/collection_writer_test.cpp index a5aad697d59..121afe9b89d 100644 --- a/src/mongo/db/catalog/collection_writer_test.cpp +++ b/src/mongo/db/catalog/collection_writer_test.cpp @@ -58,7 +58,8 @@ protected: std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(kNss); CollectionCatalog::write(getServiceContext(), [&](CollectionCatalog& catalog) { - catalog.registerCollection(operationContext(), UUID::gen(), std::move(collection)); + catalog.registerCollection( + operationContext(), UUID::gen(), std::move(collection), /*ts=*/boost::none); }); } @@ -255,7 +256,8 @@ public: catalog.registerCollection(operationContext(), UUID::gen(), std::make_shared<CollectionMock>( - NamespaceString("many", fmt::format("coll{}", i)))); + NamespaceString("many", fmt::format("coll{}", i))), + /*ts=*/boost::none); } }); } diff --git a/src/mongo/db/catalog/uncommitted_catalog_updates.cpp b/src/mongo/db/catalog/uncommitted_catalog_updates.cpp index ba987b36d35..69d405626b7 100644 --- a/src/mongo/db/catalog/uncommitted_catalog_updates.cpp +++ b/src/mongo/db/catalog/uncommitted_catalog_updates.cpp @@ -117,12 +117,13 @@ void UncommittedCatalogUpdates::_createCollection(OperationContext* opCtx, // This will throw when registering a namespace which is already in use. 
CollectionCatalog::write(opCtx, [&, coll = createdColl](CollectionCatalog& catalog) { - catalog.registerCollection(opCtx, uuid, coll); + catalog.registerCollection(opCtx, uuid, coll, /*ts=*/boost::none); }); opCtx->recoveryUnit()->onRollback([opCtx, uuid]() { CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) { - catalog.deregisterCollection(opCtx, uuid, /*isDropPending=*/false); + catalog.deregisterCollection( + opCtx, uuid, /*isDropPending=*/false, /*ts=*/boost::none); }); }); }); diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 96f0715cb11..6debe506089 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -479,7 +479,8 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndResumeAfter // Need to put the collection in the collection catalog so the resume token is valid. std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(expCtx->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(expCtx->opCtx, testUuid(), std::move(collection)); + catalog.registerCollection( + expCtx->opCtx, testUuid(), std::move(collection), /*ts=*/boost::none); }); ASSERT_THROWS_CODE(DSChangeStream::createFromBson( @@ -502,7 +503,7 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAfterAndResumeAfterOptions) { // Need to put the collection in the collection catalog so the resume token is validcollection std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(opCtx, testUuid(), std::move(collection)); + catalog.registerCollection(opCtx, testUuid(), std::move(collection), /*ts=*/boost::none); }); ASSERT_THROWS_CODE( @@ -530,7 +531,7 @@ TEST_F(ChangeStreamStageTest, 
ShouldRejectBothStartAtOperationTimeAndStartAfterO // Need to put the collection in the collection catalog so the resume token is valid. std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(opCtx, testUuid(), std::move(collection)); + catalog.registerCollection(opCtx, testUuid(), std::move(collection), /*ts=*/boost::none); }); ASSERT_THROWS_CODE(DSChangeStream::createFromBson( @@ -553,7 +554,7 @@ TEST_F(ChangeStreamStageTest, ShouldRejectResumeAfterWithResumeTokenMissingUUID) // Need to put the collection in the collection catalog so the resume token is valid. std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(opCtx, testUuid(), std::move(collection)); + catalog.registerCollection(opCtx, testUuid(), std::move(collection), /*ts=*/boost::none); }); ASSERT_THROWS_CODE( @@ -2708,7 +2709,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2FieldIn std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none); }); @@ -2751,7 +2753,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldUseO2FieldInOplog) { std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none); }); @@ -2794,7 +2797,8 @@ TEST_F(ChangeStreamStageTest, 
ResumeAfterFailsIfResumeTokenDoesNotContainUUID) { std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none); }); // Create a resume token from only the timestamp. @@ -2855,7 +2859,8 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) { // Need to put the collection in the collection catalog so the resume token is valid. std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(expCtx->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, testUuid(), std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, testUuid(), std::move(collection), /*ts=*/boost::none); }); const auto resumeTokenInvalidate = @@ -3317,7 +3322,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2Field std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none); }); BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2); @@ -3355,7 +3361,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldUseO2FieldInOplog) { std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none); }); BSONObj docKey = BSON("_id" << 1); @@ -3393,7 
+3400,8 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromInvalidateShouldFail) { // Need to put the collection in the collection catalog so the resume token is valid. std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(expCtx->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, testUuid(), std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, testUuid(), std::move(collection), /*ts=*/boost::none); }); const auto resumeTokenInvalidate = @@ -3417,7 +3425,8 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) { std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none); }); // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry. @@ -3451,7 +3460,8 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss); CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + catalog.registerCollection( + getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none); }); // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry. 
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp index 95c9557c7f8..67b4f9d020a 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp @@ -150,8 +150,10 @@ TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromSplitVector_Man auto uuid = UUID::gen(); CollectionCatalog::write(getServiceContext(), [&](CollectionCatalog& catalog) { - catalog.registerCollection( - operationContext(), uuid, std::make_shared<CollectionMock>(kNamespace)); + catalog.registerCollection(operationContext(), + uuid, + std::make_shared<CollectionMock>(kNamespace), + /*ts=*/boost::none); }); auto future = launchAsync([&] { diff --git a/src/mongo/db/storage/kv/durable_catalog_test.cpp b/src/mongo/db/storage/kv/durable_catalog_test.cpp index c240f2dd9f3..5f371ee9b57 100644 --- a/src/mongo/db/storage/kv/durable_catalog_test.cpp +++ b/src/mongo/db/storage/kv/durable_catalog_test.cpp @@ -111,8 +111,10 @@ public: getCatalog()->getMetaData(operationContext(), catalogId), std::move(coll.second)); CollectionCatalog::write(operationContext(), [&](CollectionCatalog& catalog) { - catalog.registerCollection( - operationContext(), options.uuid.value(), std::move(collection)); + catalog.registerCollection(operationContext(), + options.uuid.value(), + std::move(collection), + /*ts=*/boost::none); }); wuow.commit(); diff --git a/src/mongo/db/storage/kv/storage_engine_test.cpp b/src/mongo/db/storage/kv/storage_engine_test.cpp index 2a143839964..c3f5ae88e31 100644 --- a/src/mongo/db/storage/kv/storage_engine_test.cpp +++ b/src/mongo/db/storage/kv/storage_engine_test.cpp @@ -110,7 +110,8 @@ TEST_F(StorageEngineTest, LoadCatalogDropsOrphansAfterUncleanShutdown) { { Lock::GlobalWrite writeLock(opCtx.get(), Date_t::max(), Lock::InterruptBehavior::kThrow); 
_storageEngine->closeCatalog(opCtx.get()); - _storageEngine->loadCatalog(opCtx.get(), StorageEngine::LastShutdownState::kUnclean); + _storageEngine->loadCatalog( + opCtx.get(), boost::none, StorageEngine::LastShutdownState::kUnclean); } ASSERT(!identExists(opCtx.get(), swCollInfo.getValue().ident)); @@ -408,7 +409,8 @@ TEST_F(StorageEngineRepairTest, LoadCatalogRecoversOrphans) { { Lock::GlobalWrite writeLock(opCtx.get(), Date_t::max(), Lock::InterruptBehavior::kThrow); _storageEngine->closeCatalog(opCtx.get()); - _storageEngine->loadCatalog(opCtx.get(), StorageEngine::LastShutdownState::kClean); + _storageEngine->loadCatalog( + opCtx.get(), boost::none, StorageEngine::LastShutdownState::kClean); } ASSERT(identExists(opCtx.get(), swCollInfo.getValue().ident)); @@ -464,7 +466,8 @@ TEST_F(StorageEngineRepairTest, LoadCatalogRecoversOrphansInCatalog) { // When in a repair context, loadCatalog() recreates catalog entries for orphaned idents. { Lock::GlobalWrite writeLock(opCtx.get(), Date_t::max(), Lock::InterruptBehavior::kThrow); - _storageEngine->loadCatalog(opCtx.get(), StorageEngine::LastShutdownState::kClean); + _storageEngine->loadCatalog( + opCtx.get(), boost::none, StorageEngine::LastShutdownState::kClean); } auto identNs = swCollInfo.getValue().ident; std::replace(identNs.begin(), identNs.end(), '-', '_'); @@ -500,7 +503,8 @@ TEST_F(StorageEngineTest, LoadCatalogDropsOrphans) { // orphaned idents. { Lock::GlobalWrite writeLock(opCtx.get(), Date_t::max(), Lock::InterruptBehavior::kThrow); - _storageEngine->loadCatalog(opCtx.get(), StorageEngine::LastShutdownState::kClean); + _storageEngine->loadCatalog( + opCtx.get(), boost::none, StorageEngine::LastShutdownState::kClean); } // reconcileCatalogAndIdents() drops orphaned idents. 
auto reconcileResult = unittest::assertGet(reconcile(opCtx.get())); diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index a67a04e7aa7..d3d6dbc5ee2 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -226,7 +226,9 @@ public: * caller. For example, on starting from a previous unclean shutdown, we may try to recover * orphaned idents, which are known to the storage engine but not referenced in the catalog. */ - virtual void loadCatalog(OperationContext* opCtx, LastShutdownState lastShutdownState) = 0; + virtual void loadCatalog(OperationContext* opCtx, + boost::optional<Timestamp> stableTs, + LastShutdownState lastShutdownState) = 0; virtual void closeCatalog(OperationContext* opCtx) = 0; /** diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp index 2743c014c96..a73de55a3fa 100644 --- a/src/mongo/db/storage/storage_engine_impl.cpp +++ b/src/mongo/db/storage/storage_engine_impl.cpp @@ -93,6 +93,19 @@ StorageEngineImpl::StorageEngineImpl(OperationContext* opCtx, [serviceContext = opCtx->getServiceContext()](Timestamp timestamp) { HistoricalIdentTracker::get(serviceContext).removeEntriesOlderThan(timestamp); }), + _collectionCatalogCleanupTimestampListener( + TimestampMonitor::TimestampType::kOldest, + [serviceContext = opCtx->getServiceContext()](Timestamp timestamp) { + // The global lock is held by the timestamp monitor while callbacks are executed, so + // there can be no batched CollectionCatalog writer and we are thus safe to write + // using the service context. 
+ if (CollectionCatalog::get(serviceContext)
 ->needsCleanupForOldestTimestamp(timestamp)) { + CollectionCatalog::write(serviceContext, [timestamp](CollectionCatalog& catalog) { + catalog.cleanupForOldestTimestampAdvanced(timestamp); + }); + } + }), _supportsCappedCollections(_engine->supportsCappedCollections()) { uassert(28601, "Storage engine does not support --directoryperdb", @@ -114,11 +127,14 @@ StorageEngineImpl::StorageEngineImpl(OperationContext* opCtx, invariant(!opCtx->lockState()->isLocked() || opCtx->lockState()->isW()); Lock::GlobalWrite globalLk(opCtx); loadCatalog(opCtx, + _engine->getRecoveryTimestamp(), _options.lockFileCreatedByUncleanShutdown ? LastShutdownState::kUnclean : LastShutdownState::kClean); } -void StorageEngineImpl::loadCatalog(OperationContext* opCtx, LastShutdownState lastShutdownState) { +void StorageEngineImpl::loadCatalog(OperationContext* opCtx, + boost::optional<Timestamp> stableTs, + LastShutdownState lastShutdownState) { bool catalogExists = _engine->hasIdent(opCtx, kCatalogInfo); if (_options.forRepair && catalogExists) { auto repairObserver = StorageRepairObserver::get(getGlobalServiceContext()); @@ -344,7 +360,10 @@ void StorageEngineImpl::loadCatalog(OperationContext* opCtx, LastShutdownState l } Timestamp minVisibleTs = Timestamp::min(); - Timestamp minValidTs = minVisibleTs; + // Use the stable timestamp as minValid. We know for a fact that the collection exists at + // this point and is in sync. If we use an earlier timestamp than replication rollback we + // may be out-of-order for the collection catalog managing this namespace. + Timestamp minValidTs = stableTs ? *stableTs : Timestamp::min(); // If there's no recovery timestamp, every collection is available. 
if (boost::optional<Timestamp> recoveryTs = _engine->getRecoveryTimestamp()) { // Otherwise choose a minimum visible timestamp that's at least as large as the true @@ -353,10 +372,6 @@ void StorageEngineImpl::loadCatalog(OperationContext* opCtx, LastShutdownState l // `oldestTimestamp` and conservatively choose the `recoveryTimestamp` for everything // else. minVisibleTs = recoveryTs.value(); - // Minimum valid timestamp is always set to the recovery timestamp. Even if the - // collection exists at the oldest timestamp we do not know if it would be in sync with - // the durable catalog due to collMod or index changes. - minValidTs = minVisibleTs; if (existedAtOldestTs.find(entry.catalogId) != existedAtOldestTs.end()) { // Collections found at the `oldestTimestamp` on startup can have their minimum // visible timestamp pulled back to that value. @@ -410,10 +425,10 @@ void StorageEngineImpl::_initCollection(OperationContext* opCtx, auto collectionFactory = Collection::Factory::get(getGlobalServiceContext()); auto collection = collectionFactory->make(opCtx, nss, catalogId, md, std::move(rs)); collection->setMinimumVisibleSnapshot(minVisibleTs); - collection->setMinimumValidSnapshot(minValidTs); CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(opCtx, md->options.uuid.value(), std::move(collection)); + catalog.registerCollection( + opCtx, md->options.uuid.value(), std::move(collection), /*commitTime*/ minValidTs); }); } @@ -840,6 +855,7 @@ void StorageEngineImpl::startTimestampMonitor() { _timestampMonitor->addListener(&_minOfCheckpointAndOldestTimestampListener); _timestampMonitor->addListener(&_historicalIdentTimestampListener); + _timestampMonitor->addListener(&_collectionCatalogCleanupTimestampListener); } void StorageEngineImpl::notifyStartupComplete() { @@ -1019,7 +1035,8 @@ Status StorageEngineImpl::repairRecordStore(OperationContext* opCtx, // After repairing, re-initialize the collection with a valid RecordStore. 
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) { auto uuid = catalog.lookupUUIDByNSS(opCtx, nss).value(); - catalog.deregisterCollection(opCtx, uuid, /*isDropPending=*/false); + catalog.deregisterCollection( + opCtx, uuid, /*isDropPending=*/false, /*commitTime*/ boost::none); }); // When repairing a record store, keep the existing behavior of not installing a minimum visible diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h index f95b86f32a7..6a431abe7df 100644 --- a/src/mongo/db/storage/storage_engine_impl.h +++ b/src/mongo/db/storage/storage_engine_impl.h @@ -350,7 +350,9 @@ public: /** * When loading after an unclean shutdown, this performs cleanup on the DurableCatalogImpl. */ - void loadCatalog(OperationContext* opCtx, LastShutdownState lastShutdownState) final; + void loadCatalog(OperationContext* opCtx, + boost::optional<Timestamp> stableTs, + LastShutdownState lastShutdownState) final; void closeCatalog(OperationContext* opCtx) final; @@ -454,6 +456,9 @@ private: // checkpoint timestamp. TimestampMonitor::TimestampListener _historicalIdentTimestampListener; + // Listener for cleanup of CollectionCatalog when oldest timestamp advances. 
+ TimestampMonitor::TimestampListener _collectionCatalogCleanupTimestampListener; + const bool _supportsCappedCollections; std::unique_ptr<RecordStore> _catalogRecordStore; diff --git a/src/mongo/db/storage/storage_engine_mock.h b/src/mongo/db/storage/storage_engine_mock.h index 66665f9edec..37542e2ac8d 100644 --- a/src/mongo/db/storage/storage_engine_mock.h +++ b/src/mongo/db/storage/storage_engine_mock.h @@ -53,7 +53,9 @@ public: bool isEphemeral() const final { return true; } - void loadCatalog(OperationContext* opCtx, LastShutdownState lastShutdownState) final {} + void loadCatalog(OperationContext* opCtx, + boost::optional<Timestamp> stableTs, + LastShutdownState lastShutdownState) final {} void closeCatalog(OperationContext* opCtx) final {} Status closeDatabase(OperationContext* opCtx, const DatabaseName& dbName) final { return Status::OK(); diff --git a/src/mongo/db/storage/storage_engine_test_fixture.h b/src/mongo/db/storage/storage_engine_test_fixture.h index 42e748bc588..364d9467b0a 100644 --- a/src/mongo/db/storage/storage_engine_test_fixture.h +++ b/src/mongo/db/storage/storage_engine_test_fixture.h @@ -76,7 +76,8 @@ public: _storageEngine->getCatalog()->getMetaData(opCtx, catalogId), std::move(rs)); CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) { - catalog.registerCollection(opCtx, options.uuid.get(), std::move(coll)); + catalog.registerCollection( + opCtx, options.uuid.get(), std::move(coll), /*ts=*/boost::none); }); return {{_storageEngine->getCatalog()->getEntry(catalogId)}}; |