summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenrik Edin <henrik.edin@mongodb.com>2022-09-26 12:18:37 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-26 13:18:13 +0000
commit62dae95dc6c02786dce1c6c97bae488ac95b9601 (patch)
tree481772b29b4364f096c20203a89bd8eaa4564750
parentce002ed993b51bdcd1b02fdae0d2ae9c71a932de (diff)
downloadmongo-62dae95dc6c02786dce1c6c97bae488ac95b9601.tar.gz
SERVER-68265 Maintain historic catalogId mapping in CollectionCatalog
Mapping of namespace to catalogId is maintained for timestamps back to the oldest timestamp. Used to resolve the catalogId for a namespace in a way that is resilient to create, drop and rename operations.
-rw-r--r--src/mongo/db/catalog/catalog_control.cpp8
-rw-r--r--src/mongo/db/catalog/collection_catalog.cpp280
-rw-r--r--src/mongo/db/catalog/collection_catalog.h74
-rw-r--r--src/mongo/db/catalog/collection_catalog_bm.cpp5
-rw-r--r--src/mongo/db/catalog/collection_catalog_test.cpp426
-rw-r--r--src/mongo/db/catalog/collection_writer_test.cpp6
-rw-r--r--src/mongo/db/catalog/uncommitted_catalog_updates.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp36
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp6
-rw-r--r--src/mongo/db/storage/kv/durable_catalog_test.cpp6
-rw-r--r--src/mongo/db/storage/kv/storage_engine_test.cpp12
-rw-r--r--src/mongo/db/storage/storage_engine.h4
-rw-r--r--src/mongo/db/storage/storage_engine_impl.cpp35
-rw-r--r--src/mongo/db/storage/storage_engine_impl.h7
-rw-r--r--src/mongo/db/storage/storage_engine_mock.h4
-rw-r--r--src/mongo/db/storage/storage_engine_test_fixture.h3
16 files changed, 834 insertions, 83 deletions
diff --git a/src/mongo/db/catalog/catalog_control.cpp b/src/mongo/db/catalog/catalog_control.cpp
index 7f4dc6167e8..95774bbe155 100644
--- a/src/mongo/db/catalog/catalog_control.cpp
+++ b/src/mongo/db/catalog/catalog_control.cpp
@@ -243,9 +243,15 @@ void openCatalog(OperationContext* opCtx,
// Load the catalog in the storage engine.
LOGV2(20273, "openCatalog: loading storage engine catalog");
auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
+
+ // Remove catalogId mappings for timestamps larger than 'stableTimestamp'.
+ CollectionCatalog::write(opCtx, [stableTimestamp](CollectionCatalog& catalog) {
+ catalog.cleanupForCatalogReopen(stableTimestamp);
+ });
+
// Ignore orphaned idents because this function is used during rollback and not at
// startup recovery, when we may try to recover orphaned idents.
- storageEngine->loadCatalog(opCtx, StorageEngine::LastShutdownState::kClean);
+ storageEngine->loadCatalog(opCtx, stableTimestamp, StorageEngine::LastShutdownState::kClean);
LOGV2(20274, "openCatalog: reconciling catalog and idents");
auto reconcileResult = fassert(
diff --git a/src/mongo/db/catalog/collection_catalog.cpp b/src/mongo/db/catalog/collection_catalog.cpp
index 26c630537a8..3a7e255f464 100644
--- a/src/mongo/db/catalog/collection_catalog.cpp
+++ b/src/mongo/db/catalog/collection_catalog.cpp
@@ -154,22 +154,29 @@ public:
break;
}
case UncommittedCatalogUpdates::Entry::Action::kRenamedCollection: {
- writeJobs.push_back([opCtx, &from = entry.nss, &to = entry.renameTo](
- CollectionCatalog& catalog) {
+ writeJobs.push_back([opCtx,
+ &from = entry.nss,
+ &to = entry.renameTo,
+ commitTime](CollectionCatalog& catalog) {
+ // We just need to do modifications on 'from' here. 'to' is taken care
+ // of by a separate kWritableCollection entry.
catalog._collections.erase(from);
auto& resourceCatalog = ResourceCatalog::get(opCtx->getServiceContext());
resourceCatalog.remove({RESOURCE_COLLECTION, from}, from);
resourceCatalog.add({RESOURCE_COLLECTION, to}, to);
+
+ catalog._pushCatalogIdForRename(from, to, commitTime);
});
break;
}
case UncommittedCatalogUpdates::Entry::Action::kDroppedCollection: {
- writeJobs.push_back(
- [opCtx, uuid = *entry.uuid(), isDropPending = *entry.isDropPending](
- CollectionCatalog& catalog) {
- catalog.deregisterCollection(opCtx, uuid, isDropPending);
- });
+ writeJobs.push_back([opCtx,
+ uuid = *entry.uuid(),
+ isDropPending = *entry.isDropPending,
+ commitTime](CollectionCatalog& catalog) {
+ catalog.deregisterCollection(opCtx, uuid, isDropPending, commitTime);
+ });
break;
}
case UncommittedCatalogUpdates::Entry::Action::kRecreatedCollection: {
@@ -177,17 +184,12 @@ public:
collection = entry.collection,
uuid = *entry.externalUUID,
commitTime](CollectionCatalog& catalog) {
- if (commitTime) {
- collection->setMinimumValidSnapshot(commitTime.value());
- }
- catalog.registerCollection(opCtx, uuid, std::move(collection));
+ catalog.registerCollection(opCtx, uuid, std::move(collection), commitTime);
});
// Fallthrough to the createCollection case to finish committing the collection.
[[fallthrough]];
}
case UncommittedCatalogUpdates::Entry::Action::kCreatedCollection: {
- auto collPtr = entry.collection.get();
-
// By this point, we may or may not have reserved an oplog slot for the
// collection creation.
// For example, multi-document transactions will only reserve the oplog slot at
@@ -197,11 +199,15 @@ public:
// we must update the minVisibleTimestamp with the appropriate value. This is
// fine because the collection should not be visible in the catalog until we
// call setCommitted(true).
- if (commitTime) {
- collPtr->setMinimumVisibleSnapshot(commitTime.value());
- collPtr->setMinimumValidSnapshot(commitTime.value());
- }
- collPtr->setCommitted(true);
+ writeJobs.push_back([coll = entry.collection.get(),
+ commitTime](CollectionCatalog& catalog) {
+ if (commitTime) {
+ coll->setMinimumVisibleSnapshot(commitTime.value());
+ coll->setMinimumValidSnapshot(commitTime.value());
+ }
+ catalog._pushCatalogIdForNSS(coll->ns(), coll->getCatalogId(), commitTime);
+ coll->setCommitted(true);
+ });
break;
}
case UncommittedCatalogUpdates::Entry::Action::kReplacedViewsForDatabase: {
@@ -1122,6 +1128,25 @@ boost::optional<UUID> CollectionCatalog::lookupUUIDByNSS(OperationContext* opCtx
return boost::none;
}
+boost::optional<RecordId> CollectionCatalog::lookupCatalogIdByNSS(
+ const NamespaceString& nss, boost::optional<Timestamp> ts) const {
+ if (auto it = _catalogIds.find(nss); it != _catalogIds.end()) {
+ const auto& range = it->second;
+ if (!ts) {
+ return range.back().id;
+ }
+
+ auto rangeIt = std::upper_bound(
+ range.begin(), range.end(), *ts, [](const auto& ts, const auto& entry) {
+ return ts < entry.ts;
+ });
+ if (rangeIt != range.begin()) {
+ return (--rangeIt)->id;
+ }
+ }
+ return boost::none;
+}
+
void CollectionCatalog::iterateViews(OperationContext* opCtx,
const DatabaseName& dbName,
ViewIteratorCallback callback,
@@ -1305,7 +1330,8 @@ CollectionCatalog::ViewCatalogSet CollectionCatalog::getViewCatalogDbNames(
void CollectionCatalog::registerCollection(OperationContext* opCtx,
const UUID& uuid,
- std::shared_ptr<Collection> coll) {
+ std::shared_ptr<Collection> coll,
+ boost::optional<Timestamp> commitTime) {
auto nss = coll->ns();
_ensureNamespaceDoesNotExist(opCtx, nss, NamespaceType::kAll);
@@ -1326,6 +1352,12 @@ void CollectionCatalog::registerCollection(OperationContext* opCtx,
_collections[nss] = coll;
_orderedCollections[dbIdPair] = coll;
+ if (commitTime && !commitTime->isNull()) {
+ coll->setMinimumValidSnapshot(commitTime.value());
+ _pushCatalogIdForNSS(nss, coll->getCatalogId(), commitTime);
+ }
+
+
if (!nss.isOnInternalDb() && !nss.isSystem()) {
_stats.userCollections += 1;
if (coll->isCapped()) {
@@ -1345,9 +1377,11 @@ void CollectionCatalog::registerCollection(OperationContext* opCtx,
resourceCatalog.add({RESOURCE_COLLECTION, nss}, nss);
}
-std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(OperationContext* opCtx,
- const UUID& uuid,
- bool isDropPending) {
+std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(
+ OperationContext* opCtx,
+ const UUID& uuid,
+ bool isDropPending,
+ boost::optional<Timestamp> commitTime) {
invariant(_catalog.find(uuid) != _catalog.end());
auto coll = std::move(_catalog[uuid]);
@@ -1374,6 +1408,11 @@ std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(OperationCon
_collections.erase(ns);
_catalog.erase(uuid);
+ // Push drop unless this is a rollback of a create
+ if (coll->isCommitted()) {
+ _pushCatalogIdForNSS(ns, boost::none, commitTime);
+ }
+
if (!ns.isOnInternalDb() && !ns.isSystem()) {
_stats.userCollections -= 1;
if (coll->isCapped()) {
@@ -1446,6 +1485,110 @@ void CollectionCatalog::_ensureNamespaceDoesNotExist(OperationContext* opCtx,
}
}
+void CollectionCatalog::_pushCatalogIdForNSS(const NamespaceString& nss,
+ boost::optional<RecordId> catalogId,
+ boost::optional<Timestamp> ts) {
+ // TODO SERVER-68674: Remove feature flag check.
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) {
+ // No-op.
+ return;
+ }
+
+ auto& ids = _catalogIds[nss];
+
+ if (!ts) {
+ // Make sure untimestamped writes have a single entry in the mapping. If we're mixing
+ // timestamped with untimestamped writes (such as during repair), ignore the untimestamped
+ // writes, as an untimestamped deregister will correspond with an untimestamped register.
+ // We should leave the mapping as-is in this case.
+ if (ids.empty() && catalogId) {
+ // This namespace was added due to an untimestamped write, add an entry with min
+ // timestamp
+ ids.push_back(TimestampedCatalogId{catalogId, Timestamp::min()});
+ } else if (ids.size() == 1 && !catalogId) {
+ // This namespace was removed due to an untimestamped write, clear entries.
+ ids.clear();
+ }
+ return;
+ }
+
+ // Re-write the latest entry if the timestamps match (multiple changes occurred in this transaction)
+ if (!ids.empty() && ids.back().ts == *ts) {
+ ids.back().id = catalogId;
+ return;
+ }
+
+ // Otherwise, push new entry at the end. Timestamp is always increasing
+ invariant(ids.empty() || ids.back().ts < *ts);
+ // If the catalogId is the same as last entry, there's nothing we need to do. This can happen
+ // when the catalog is reopened.
+ if (!ids.empty() && ids.back().id == catalogId) {
+ return;
+ }
+
+ ids.push_back(TimestampedCatalogId{catalogId, *ts});
+ _markNamespaceForCatalogIdCleanupIfNeeded(nss, ids);
+}
+
+void CollectionCatalog::_pushCatalogIdForRename(const NamespaceString& from,
+ const NamespaceString& to,
+ boost::optional<Timestamp> ts) {
+ // TODO SERVER-68674: Remove feature flag check.
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) {
+ // No-op.
+ return;
+ }
+
+ if (!ts)
+ return;
+
+ // Get 'toIds' first, it may need to instantiate in the container which invalidates all
+ // references.
+ auto& toIds = _catalogIds[to];
+ auto& fromIds = _catalogIds.at(from);
+ invariant(!fromIds.empty());
+
+ // Re-write the latest entry if the timestamps match (multiple changes occurred in this
+ // transaction), otherwise push at the end
+ if (!toIds.empty() && toIds.back().ts == *ts) {
+ toIds.back().id = fromIds.back().id;
+ } else {
+ invariant(toIds.empty() || toIds.back().ts < *ts);
+ toIds.push_back(TimestampedCatalogId{fromIds.back().id, *ts});
+ _markNamespaceForCatalogIdCleanupIfNeeded(to, toIds);
+ }
+
+ // Re-write the latest entry if the timestamps match (multiple changes occurred in this
+ // transaction), otherwise push at the end
+ if (!fromIds.empty() && fromIds.back().ts == *ts) {
+ fromIds.back().id = boost::none;
+ } else {
+ invariant(fromIds.empty() || fromIds.back().ts < *ts);
+ fromIds.push_back(TimestampedCatalogId{boost::none, *ts});
+ _markNamespaceForCatalogIdCleanupIfNeeded(from, fromIds);
+ }
+}
+
+void CollectionCatalog::_markNamespaceForCatalogIdCleanupIfNeeded(
+ const NamespaceString& nss, const std::vector<TimestampedCatalogId>& ids) {
+
+ auto markForCleanup = [this, &nss](Timestamp ts) {
+ _catalogIdChanges.insert(nss);
+ if (ts < _lowestCatalogIdTimestampForCleanup) {
+ _lowestCatalogIdTimestampForCleanup = ts;
+ }
+ };
+
+ // Cleanup may occur if we have more than one entry for the namespace or if the only entry is a
+ // drop. Use the first entry as lowest cleanup time if we have a drop and the second otherwise
+ // (as the first is needed until that time is reached)
+ if (!ids.empty() && ids.front().id == boost::none) {
+ markForCleanup(ids.front().ts);
+ } else if (ids.size() > 1) {
+ markForCleanup(ids.at(1).ts);
+ }
+}
+
void CollectionCatalog::deregisterAllCollectionsAndViews(ServiceContext* svcCtx) {
LOGV2(20282, "Deregistering all the collections");
for (auto& entry : _catalog) {
@@ -1538,6 +1681,99 @@ CollectionCatalog::iterator CollectionCatalog::end(OperationContext* opCtx) cons
return iterator(opCtx, _orderedCollections.end(), *this);
}
+bool CollectionCatalog::needsCleanupForOldestTimestamp(Timestamp oldest) const {
+ // TODO SERVER-68674: Remove feature flag check.
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) {
+ // No-op.
+ return false;
+ }
+
+ return _lowestCatalogIdTimestampForCleanup <= oldest;
+}
+
+void CollectionCatalog::cleanupForOldestTimestampAdvanced(Timestamp oldest) {
+ Timestamp nextLowestCleanupTimestamp = Timestamp::max();
+ // Helper to calculate the smallest entry that needs to be kept and its timestamp
+ auto assignLowestCleanupTimestamp = [&nextLowestCleanupTimestamp](const auto& range) {
+ auto it = range.begin();
+ // Drops can be cleaned up right away, otherwise the second entry is cleanup time.
+ if (it->id.has_value()) {
+ ++it;
+ }
+ nextLowestCleanupTimestamp = std::min(nextLowestCleanupTimestamp, it->ts);
+ };
+
+ // Iterate over all namespaces that are marked as needing cleanup
+ for (auto it = _catalogIdChanges.begin(), end = _catalogIdChanges.end(); it != end;) {
+ auto& range = _catalogIds[*it];
+
+ // Binary search for next larger timestamp
+ auto rangeIt = std::upper_bound(
+ range.begin(), range.end(), oldest, [](const auto& ts, const auto& entry) {
+ return ts < entry.ts;
+ });
+
+ // Continue if there is nothing to clean up for this timestamp yet
+ if (rangeIt == range.begin()) {
+ assignLowestCleanupTimestamp(range);
+ ++it;
+ continue;
+ }
+
+ // The iterator is positioned to the closest entry that has a larger timestamp, decrement to
+ // get a lower or equal timestamp
+ --rangeIt;
+
+ // If we are positioned on a drop it can be removed
+ if (!rangeIt->id.has_value()) {
+ ++rangeIt;
+ }
+
+ // Erase range
+ range.erase(range.begin(), rangeIt);
+
+ // If the range is now empty or we need to keep the last item we can unmark this namespace
+ // for needing changes.
+ if (range.size() <= 1) {
+ _catalogIdChanges.erase(it++);
+ continue;
+ }
+
+ // More changes are needed for this namespace, keep it in the set and keep track of lowest
+ // timestamp.
+ assignLowestCleanupTimestamp(range);
+ ++it;
+ }
+
+ _lowestCatalogIdTimestampForCleanup = nextLowestCleanupTimestamp;
+}
+
+void CollectionCatalog::cleanupForCatalogReopen(Timestamp stable) {
+ _catalogIdChanges.clear();
+ _lowestCatalogIdTimestampForCleanup = Timestamp::max();
+
+ for (auto it = _catalogIds.begin(); it != _catalogIds.end();) {
+ auto& ids = it->second;
+
+ // Remove all larger timestamps in this range
+ ids.erase(std::upper_bound(ids.begin(),
+ ids.end(),
+ stable,
+ [](Timestamp ts, const auto& entry) { return ts < entry.ts; }),
+ ids.end());
+
+ // Remove namespace if there are no entries left
+ if (ids.empty()) {
+ _catalogIds.erase(it++);
+ continue;
+ }
+
+ // Calculate when this namespace needs to be cleaned up next
+ _markNamespaceForCatalogIdCleanupIfNeeded(it->first, ids);
+ ++it;
+ }
+}
+
void CollectionCatalog::invariantHasExclusiveAccessToCollection(OperationContext* opCtx,
const NamespaceString& nss) {
auto& uncommittedCatalogUpdates = UncommittedCatalogUpdates::get(opCtx);
diff --git a/src/mongo/db/catalog/collection_catalog.h b/src/mongo/db/catalog/collection_catalog.h
index 80d73fb11fb..a3bd18e51df 100644
--- a/src/mongo/db/catalog/collection_catalog.h
+++ b/src/mongo/db/catalog/collection_catalog.h
@@ -38,7 +38,7 @@
#include "mongo/db/database_name.h"
#include "mongo/db/profile_filter.h"
#include "mongo/db/service_context.h"
-// TODO SERVER-68265: remove include.
+// TODO SERVER-69372: remove include.
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/views/view.h"
#include "mongo/stdx/unordered_map.h"
@@ -210,7 +210,7 @@ public:
/**
* Returns the collection instance representative of 'entry' at the provided read timestamp.
*
- * TODO SERVER-68265:
+ * TODO SERVER-69372:
* - Use NamespaceString instead of DurableCatalog::Entry.
* - Remove DurableCatalog dependency.
*
@@ -288,7 +288,8 @@ public:
*/
void registerCollection(OperationContext* opCtx,
const UUID& uuid,
- std::shared_ptr<Collection> collection);
+ std::shared_ptr<Collection> collection,
+ boost::optional<Timestamp> commitTime);
/**
* Deregister the collection.
@@ -297,7 +298,8 @@ public:
*/
std::shared_ptr<Collection> deregisterCollection(OperationContext* opCtx,
const UUID& uuid,
- bool isDropPending);
+ bool isDropPending,
+ boost::optional<Timestamp> commitTime);
/**
* Create a temporary record of an uncommitted view namespace to aid in detecting a simultaneous
@@ -401,6 +403,17 @@ public:
const NamespaceString& nss) const;
/**
+ * Returns the CatalogId for a given 'nss' at timestamp 'ts'.
+ *
+ * Timestamp must be in the range [oldest_timestamp, now)
+ * If 'ts' is boost::none the latest CatalogId is returned.
+ *
+ * Returns boost::none if no namespace exists at the timestamp or if 'ts' is out of range.
+ */
+ boost::optional<RecordId> lookupCatalogIdByNSS(
+ const NamespaceString& nss, boost::optional<Timestamp> ts = boost::none) const;
+
+ /**
* Iterates through the views in the catalog associated with database `dbName`, applying
* 'callback' to each view. If the 'callback' returns false, the iterator exits early.
*
@@ -566,6 +579,23 @@ public:
iterator end(OperationContext* opCtx) const;
/**
+ * Checks if 'cleanupForOldestTimestampAdvanced' should be called when the oldest timestamp
+ * advanced. Used to avoid a potentially expensive call to 'cleanupForOldestTimestampAdvanced'
+ * if no write is needed.
+ */
+ bool needsCleanupForOldestTimestamp(Timestamp oldest) const;
+
+ /**
+ * Cleans up internal structures when the oldest timestamp advances
+ */
+ void cleanupForOldestTimestampAdvanced(Timestamp oldest);
+
+ /**
+ * Cleans up internal structures after catalog reopen
+ */
+ void cleanupForCatalogReopen(Timestamp stable);
+
+ /**
* Ensures we have a MODE_X lock on a collection or MODE_IX lock for newly created collections.
*/
static void invariantHasExclusiveAccessToCollection(OperationContext* opCtx,
@@ -628,6 +658,33 @@ private:
NamespaceType type) const;
/**
+ * CatalogId with Timestamp
+ */
+ struct TimestampedCatalogId {
+ boost::optional<RecordId> id;
+ Timestamp ts;
+ };
+
+ // Push a catalogId for the namespace at the given Timestamp. The Timestamp needs to be larger
+ // than other entries for this namespace. boost::none for catalogId represents a drop;
+ // boost::none for the timestamp turns this operation into a no-op.
+ void _pushCatalogIdForNSS(const NamespaceString& nss,
+ boost::optional<RecordId> catalogId,
+ boost::optional<Timestamp> ts);
+
+ // Push a catalogId for 'from' and 'to' for a rename operation at given Timestamp. Timestamp
+ // needs to be larger than other entries for these namespaces. boost::none for timestamp turns
+ // this operation into a no-op.
+ void _pushCatalogIdForRename(const NamespaceString& from,
+ const NamespaceString& to,
+ boost::optional<Timestamp> ts);
+
+ // Helper to calculate if a namespace needs to be marked for cleanup for a set of timestamped
+ // catalogIds
+ void _markNamespaceForCatalogIdCleanupIfNeeded(const NamespaceString& nss,
+ const std::vector<TimestampedCatalogId>& ids);
+
+ /**
* When present, indicates that the catalog is in closed state, and contains a map from UUID
* to pre-close NSS. See also onCloseCatalog.
*/
@@ -647,6 +704,15 @@ private:
NamespaceCollectionMap _collections;
UncommittedViewsSet _uncommittedViews;
+ // CatalogId mappings for all known namespaces for the CollectionCatalog. The vector is sorted
+ // on timestamp.
+ absl::flat_hash_map<NamespaceString, std::vector<TimestampedCatalogId>> _catalogIds;
+ // Set of namespaces that need cleanup when the oldest timestamp advances sufficiently.
+ absl::flat_hash_set<NamespaceString> _catalogIdChanges;
+ // Point to which the oldest timestamp needs to advance for there to be any catalogId
+ // namespace that can be cleaned up
+ Timestamp _lowestCatalogIdTimestampForCleanup = Timestamp::max();
+
// Map of database names to their corresponding views and other associated state.
ViewsForDatabaseMap _viewsForDatabase;
diff --git a/src/mongo/db/catalog/collection_catalog_bm.cpp b/src/mongo/db/catalog/collection_catalog_bm.cpp
index 3ff0e3375bd..d04f2115f55 100644
--- a/src/mongo/db/catalog/collection_catalog_bm.cpp
+++ b/src/mongo/db/catalog/collection_catalog_bm.cpp
@@ -78,7 +78,10 @@ void createCollections(OperationContext* opCtx, int numCollections) {
for (auto i = 0; i < numCollections; i++) {
const NamespaceString nss("collection_catalog_bm", std::to_string(i));
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(opCtx, UUID::gen(), std::make_shared<CollectionMock>(nss));
+ catalog.registerCollection(opCtx,
+ UUID::gen(),
+ std::make_shared<CollectionMock>(nss),
+ /*ts=*/boost::none);
});
}
}
diff --git a/src/mongo/db/catalog/collection_catalog_test.cpp b/src/mongo/db/catalog/collection_catalog_test.cpp
index bba37ae83b6..59f31db4ed7 100644
--- a/src/mongo/db/catalog/collection_catalog_test.cpp
+++ b/src/mongo/db/catalog/collection_catalog_test.cpp
@@ -74,7 +74,7 @@ public:
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(colUUID, nss);
col = CollectionPtr(collection.get(), CollectionPtr::NoYieldTag{});
// Register dummy collection in catalog.
- catalog.registerCollection(opCtx.get(), colUUID, collection);
+ catalog.registerCollection(opCtx.get(), colUUID, collection, boost::none);
// Validate that kNumCollectionReferencesStored is correct, add one reference for the one we
// hold in this function.
@@ -112,15 +112,16 @@ public:
dbMap["foo"].insert(std::make_pair(fooUuid, fooColl.get()));
dbMap["bar"].insert(std::make_pair(barUuid, barColl.get()));
- catalog.registerCollection(opCtx.get(), fooUuid, fooColl);
- catalog.registerCollection(opCtx.get(), barUuid, barColl);
+ catalog.registerCollection(opCtx.get(), fooUuid, fooColl, boost::none);
+ catalog.registerCollection(opCtx.get(), barUuid, barColl, boost::none);
}
}
void tearDown() {
for (auto& it : dbMap) {
for (auto& kv : it.second) {
- catalog.deregisterCollection(opCtx.get(), kv.first, /*isDropPending=*/false);
+ catalog.deregisterCollection(
+ opCtx.get(), kv.first, /*isDropPending=*/false, boost::none);
}
}
}
@@ -177,7 +178,7 @@ public:
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
auto uuid = collection->uuid();
- catalog.registerCollection(opCtx.get(), uuid, std::move(collection));
+ catalog.registerCollection(opCtx.get(), uuid, std::move(collection), boost::none);
}
int numEntries = 0;
@@ -209,7 +210,7 @@ public:
}
for (auto&& uuid : collectionsToDeregister) {
- catalog.deregisterCollection(opCtx.get(), uuid, /*isDropPending=*/false);
+ catalog.deregisterCollection(opCtx.get(), uuid, /*isDropPending=*/false, boost::none);
}
int numEntries = 0;
@@ -273,7 +274,7 @@ TEST_F(CollectionCatalogResourceTest, LookupMissingCollectionResource) {
TEST_F(CollectionCatalogResourceTest, RemoveCollection) {
const NamespaceString collNs = NamespaceString(boost::none, "resourceDb.coll1");
auto coll = catalog.lookupCollectionByNamespace(opCtx.get(), NamespaceString(collNs));
- catalog.deregisterCollection(opCtx.get(), coll->uuid(), /*isDropPending=*/false);
+ catalog.deregisterCollection(opCtx.get(), coll->uuid(), /*isDropPending=*/false, boost::none);
auto rid = ResourceId(RESOURCE_COLLECTION, collNs);
ASSERT(!ResourceCatalog::get(getServiceContext()).name(rid));
}
@@ -295,7 +296,7 @@ TEST_F(CollectionCatalogIterationTest, GetUUIDWontRepositionEvenIfEntryIsDropped
auto it = catalog.begin(opCtx.get(), DatabaseName(boost::none, "bar"));
auto collsIt = collsIterator("bar");
auto uuid = collsIt->first;
- catalog.deregisterCollection(opCtx.get(), uuid, /*isDropPending=*/false);
+ catalog.deregisterCollection(opCtx.get(), uuid, /*isDropPending=*/false, boost::none);
dropColl("bar", uuid);
ASSERT_EQUALS(uuid, it.uuid());
@@ -329,7 +330,7 @@ TEST_F(CollectionCatalogTest, InsertAfterLookup) {
// Ensure that looking up non-existing UUIDs doesn't affect later registration of those UUIDs.
ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), newUUID) == nullptr);
ASSERT_EQUALS(catalog.lookupNSSByUUID(opCtx.get(), newUUID), boost::none);
- catalog.registerCollection(opCtx.get(), newUUID, std::move(newCollShared));
+ catalog.registerCollection(opCtx.get(), newUUID, std::move(newCollShared), boost::none);
ASSERT_EQUALS(catalog.lookupCollectionByUUID(opCtx.get(), newUUID), newCol);
ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), nss);
}
@@ -358,7 +359,7 @@ TEST_F(CollectionCatalogTest, OnDropCollection) {
yieldableColl.yield();
ASSERT_FALSE(yieldableColl);
- catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false);
+ catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false, boost::none);
// Ensure the lookup returns a null pointer upon removing the colUUID entry.
ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), colUUID) == nullptr);
@@ -372,7 +373,7 @@ TEST_F(CollectionCatalogTest, RenameCollection) {
NamespaceString oldNss(nss.db(), "oldcol");
std::shared_ptr<Collection> collShared = std::make_shared<CollectionMock>(uuid, oldNss);
auto collection = collShared.get();
- catalog.registerCollection(opCtx.get(), uuid, std::move(collShared));
+ catalog.registerCollection(opCtx.get(), uuid, std::move(collShared), boost::none);
auto yieldableColl = catalog.lookupCollectionByUUID(opCtx.get(), uuid);
ASSERT(yieldableColl);
ASSERT_EQUALS(yieldableColl, collection);
@@ -412,7 +413,7 @@ TEST_F(CollectionCatalogTest, LookupNSSByUUIDForClosedCatalogReturnsOldNSSIfDrop
catalog.onCloseCatalog();
}
- catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false);
+ catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false, boost::none);
ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), colUUID) == nullptr);
ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), nss);
@@ -438,7 +439,7 @@ TEST_F(CollectionCatalogTest, LookupNSSByUUIDForClosedCatalogReturnsNewlyCreated
ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), newUUID) == nullptr);
ASSERT_EQUALS(catalog.lookupNSSByUUID(opCtx.get(), newUUID), boost::none);
- catalog.registerCollection(opCtx.get(), newUUID, std::move(newCollShared));
+ catalog.registerCollection(opCtx.get(), newUUID, std::move(newCollShared), boost::none);
ASSERT_EQUALS(catalog.lookupCollectionByUUID(opCtx.get(), newUUID), newCol);
ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), nss);
@@ -462,10 +463,10 @@ TEST_F(CollectionCatalogTest, LookupNSSByUUIDForClosedCatalogReturnsFreshestNSS)
catalog.onCloseCatalog();
}
- catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false);
+ catalog.deregisterCollection(opCtx.get(), colUUID, /*isDropPending=*/false, boost::none);
ASSERT(catalog.lookupCollectionByUUID(opCtx.get(), colUUID) == nullptr);
ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), nss);
- catalog.registerCollection(opCtx.get(), colUUID, std::move(newCollShared));
+ catalog.registerCollection(opCtx.get(), colUUID, std::move(newCollShared), boost::none);
ASSERT_EQUALS(catalog.lookupCollectionByUUID(opCtx.get(), colUUID), newCol);
ASSERT_EQUALS(*catalog.lookupNSSByUUID(opCtx.get(), colUUID), newNss);
@@ -506,7 +507,7 @@ TEST_F(CollectionCatalogTest, GetAllCollectionNamesAndGetAllDbNames) {
for (auto& nss : nsss) {
std::shared_ptr<Collection> newColl = std::make_shared<CollectionMock>(nss);
auto uuid = UUID::gen();
- catalog.registerCollection(opCtx.get(), uuid, std::move(newColl));
+ catalog.registerCollection(opCtx.get(), uuid, std::move(newColl), boost::none);
}
std::vector<NamespaceString> dCollList = {d1Coll, d2Coll, d3Coll};
@@ -561,7 +562,7 @@ TEST_F(CollectionCatalogTest, GetAllCollectionNamesAndGetAllDbNamesWithUncommitt
for (auto& nss : nsss) {
std::shared_ptr<Collection> newColl = std::make_shared<CollectionMock>(nss);
auto uuid = UUID::gen();
- catalog.registerCollection(opCtx.get(), uuid, std::move(newColl));
+ catalog.registerCollection(opCtx.get(), uuid, std::move(newColl), boost::none);
}
// One dbName with only an invisible collection does not appear in dbNames. Use const_cast to
@@ -772,6 +773,10 @@ public:
opCtx = makeOperationContext();
}
+ std::shared_ptr<const CollectionCatalog> catalog() {
+ return CollectionCatalog::get(opCtx.get());
+ }
+
void createCollection(OperationContext* opCtx,
const NamespaceString& nss,
Timestamp timestamp) {
@@ -832,6 +837,33 @@ public:
wuow.commit();
}
+ void renameCollection(OperationContext* opCtx,
+ const NamespaceString& from,
+ const NamespaceString& to,
+ Timestamp timestamp) {
+ invariant(from.db() == to.db());
+
+ opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ opCtx->recoveryUnit()->abandonSnapshot();
+
+ if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
+ opCtx->recoveryUnit()->clearCommitTimestamp();
+ }
+ opCtx->recoveryUnit()->setCommitTimestamp(timestamp);
+
+ Lock::DBLock dbLk(opCtx, from.db(), MODE_IX);
+ Lock::CollectionLock fromLk(opCtx, from, MODE_X);
+ Lock::CollectionLock toLk(opCtx, to, MODE_X);
+
+ CollectionWriter collection(opCtx, from);
+
+ WriteUnitOfWork wuow(opCtx);
+ ASSERT_OK(collection.getWritableCollection(opCtx)->rename(opCtx, to, false));
+ CollectionCatalog::get(opCtx)->onCollectionRename(
+ opCtx, collection.getWritableCollection(opCtx), from);
+ wuow.commit();
+ }
+
void createIndex(OperationContext* opCtx,
const NamespaceString& nss,
BSONObj indexSpec,
@@ -1579,5 +1611,365 @@ TEST_F(CollectionCatalogTimestampTest, OpenNewCollectionAndIndexesWithReaper) {
}
}
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCreate) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString nss("a.b");
+
+ // Create collection and extract the catalogId
+ createCollection(opCtx.get(), nss, Timestamp(1, 2));
+ RecordId rid = catalog()->lookupCollectionByNamespace(opCtx.get(), nss)->getCatalogId();
+
+ // Lookup without timestamp returns latest catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, boost::none), rid);
+ // Lookup before create returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 1)), boost::none);
+ // Lookup at create returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 2)), rid);
+ // Lookup after create returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 3)), rid);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingDrop) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString nss("a.b");
+
+ // Create and drop collection. We have a time window where the namespace exists
+ createCollection(opCtx.get(), nss, Timestamp(1, 5));
+ RecordId rid = catalog()->lookupCollectionByNamespace(opCtx.get(), nss)->getCatalogId();
+ dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+
+ // Lookup without timestamp returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, boost::none), boost::none);
+ // Lookup before create returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 4)), boost::none);
+ // Lookup at create returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), rid);
+ // Lookup after create returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 6)), rid);
+ // Lookup at drop returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 10)), boost::none);
+ // Lookup after drop returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 20)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingRename) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString from("a.b");
+ NamespaceString to("a.c");
+
+ // Create and rename the collection. This produces two time windows in which a collection
+ // exists, each under a different namespace
+ createCollection(opCtx.get(), from, Timestamp(1, 5));
+ RecordId rid = catalog()->lookupCollectionByNamespace(opCtx.get(), from)->getCatalogId();
+ renameCollection(opCtx.get(), from, to, Timestamp(1, 10));
+
+ // Lookup without timestamp on 'from' returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, boost::none), boost::none);
+ // Lookup before create returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 4)), boost::none);
+ // Lookup at create returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 5)), rid);
+ // Lookup after create returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 6)), rid);
+ // Lookup at rename on 'from' returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 10)), boost::none);
+ // Lookup after rename on 'from' returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(from, Timestamp(1, 20)), boost::none);
+
+ // Lookup without timestamp on 'to' returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(to, boost::none), rid);
+ // Lookup before rename on 'to' returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(to, Timestamp(1, 9)), boost::none);
+ // Lookup at rename on 'to' returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(to, Timestamp(1, 10)), rid);
+ // Lookup after rename on 'to' returns catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(to, Timestamp(1, 20)), rid);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingDropCreate) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString nss("a.b");
+
+ // Create, drop and recreate a collection on the same namespace. Each incarnation gets a different catalogId.
+ createCollection(opCtx.get(), nss, Timestamp(1, 5));
+ RecordId rid1 = catalog()->lookupCollectionByNamespace(opCtx.get(), nss)->getCatalogId();
+ dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+ createCollection(opCtx.get(), nss, Timestamp(1, 15));
+ RecordId rid2 = catalog()->lookupCollectionByNamespace(opCtx.get(), nss)->getCatalogId();
+
+ // Lookup without timestamp returns latest catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, boost::none), rid2);
+ // Lookup before first create returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 4)), boost::none);
+ // Lookup at first create returns first catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), rid1);
+ // Lookup after first create returns first catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 6)), rid1);
+ // Lookup at drop returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 10)), boost::none);
+ // Lookup after drop returns none
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 13)), boost::none);
+ // Lookup at second create returns second catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), rid2);
+ // Lookup after second create returns second catalogId
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 20)), rid2);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupEqDrop) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString nss("a.b");
+
+ // Create collection and verify we have nothing to cleanup
+ createCollection(opCtx.get(), nss, Timestamp(1, 5));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+
+ // Drop collection and verify we have nothing to cleanup as long as the oldest timestamp is
+ // before the drop
+ dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+ ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10)));
+
+ // Create a new collection; the answers from needsCleanupForOldestTimestamp are unchanged.
+ createCollection(opCtx.get(), nss, Timestamp(1, 15));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 7)));
+ ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10)));
+
+ // We can lookup the old catalogId before we advance the oldest timestamp and cleanup
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+
+ // Cleanup at drop timestamp
+ CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+ c.cleanupForOldestTimestampAdvanced(Timestamp(1, 10));
+ });
+ // After cleanup, we cannot find the old catalogId anymore. Also verify that no further
+ // cleanup is needed
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10)));
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupGtDrop) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString nss("a.b");
+
+ // Create collection and verify we have nothing to cleanup
+ createCollection(opCtx.get(), nss, Timestamp(1, 5));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+
+ // Drop collection and verify we have nothing to cleanup as long as the oldest timestamp is
+ // before the drop
+ dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+ ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10)));
+
+ // Create a new collection; the answers from needsCleanupForOldestTimestamp are unchanged.
+ createCollection(opCtx.get(), nss, Timestamp(1, 15));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 7)));
+ ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 12)));
+
+ // We can lookup the old catalogId before we advance the oldest timestamp and cleanup
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+
+ // Cleanup after the drop timestamp
+ CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+ c.cleanupForOldestTimestampAdvanced(Timestamp(1, 12));
+ });
+
+ // After cleanup, we cannot find the old catalogId anymore. Also verify that no further
+ // cleanup is needed
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 12)));
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupGtRecreate) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString nss("a.b");
+
+ // Create collection and verify we have nothing to cleanup
+ createCollection(opCtx.get(), nss, Timestamp(1, 5));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+
+ // Drop collection and verify we have nothing to cleanup as long as the oldest timestamp is
+ // before the drop
+ dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+ ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 10)));
+
+ // Create a new collection; the answers from needsCleanupForOldestTimestamp are unchanged.
+ createCollection(opCtx.get(), nss, Timestamp(1, 15));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 1)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 5)));
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 7)));
+ ASSERT_TRUE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 20)));
+
+ // We can lookup the old catalogId before we advance the oldest timestamp and cleanup
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+
+ // Cleanup after the recreate timestamp
+ CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+ c.cleanupForOldestTimestampAdvanced(Timestamp(1, 20));
+ });
+
+ // After cleanup, we cannot find the old catalogId anymore. Also verify that no further
+ // cleanup is needed
+ ASSERT_FALSE(catalog()->needsCleanupForOldestTimestamp(Timestamp(1, 20)));
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupMultiple) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString nss("a.b");
+
+ // Create and drop the collection multiple times on the same namespace
+ createCollection(opCtx.get(), nss, Timestamp(1, 5));
+ dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+ createCollection(opCtx.get(), nss, Timestamp(1, 15));
+ dropCollection(opCtx.get(), nss, Timestamp(1, 20));
+ createCollection(opCtx.get(), nss, Timestamp(1, 25));
+ dropCollection(opCtx.get(), nss, Timestamp(1, 30));
+ createCollection(opCtx.get(), nss, Timestamp(1, 35));
+ dropCollection(opCtx.get(), nss, Timestamp(1, 40));
+
+ // Lookup can find all four collections
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+ // Cleanup oldest
+ CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+ c.cleanupForOldestTimestampAdvanced(Timestamp(1, 10));
+ });
+
+ // Lookup can find the three remaining collections
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+ // Cleanup
+ CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+ c.cleanupForOldestTimestampAdvanced(Timestamp(1, 21));
+ });
+
+ // Lookup can find the two remaining collections
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+ // Cleanup
+ CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+ c.cleanupForOldestTimestampAdvanced(Timestamp(1, 32));
+ });
+
+ // Lookup can find only the last remaining collection
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+ // Cleanup
+ CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+ c.cleanupForOldestTimestampAdvanced(Timestamp(1, 50));
+ });
+
+ // Lookup can find no collections
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingCleanupMultipleSingleCall) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString nss("a.b");
+
+ // Create and drop the collection multiple times on the same namespace
+ createCollection(opCtx.get(), nss, Timestamp(1, 5));
+ dropCollection(opCtx.get(), nss, Timestamp(1, 10));
+ createCollection(opCtx.get(), nss, Timestamp(1, 15));
+ dropCollection(opCtx.get(), nss, Timestamp(1, 20));
+ createCollection(opCtx.get(), nss, Timestamp(1, 25));
+ dropCollection(opCtx.get(), nss, Timestamp(1, 30));
+ createCollection(opCtx.get(), nss, Timestamp(1, 35));
+ dropCollection(opCtx.get(), nss, Timestamp(1, 40));
+
+ // Lookup can find all four collections
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+
+ // Cleanup all
+ CollectionCatalog::write(opCtx.get(), [&](CollectionCatalog& c) {
+ c.cleanupForOldestTimestampAdvanced(Timestamp(1, 50));
+ });
+
+ // Lookup can find no collections
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 5)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 15)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 25)), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(nss, Timestamp(1, 35)), boost::none);
+}
+
+TEST_F(CollectionCatalogTimestampTest, CatalogIdMappingRollback) {
+ RAIIServerParameterControllerForTest featureFlagController(
+ "featureFlagPointInTimeCatalogLookups", true);
+
+ NamespaceString a("b.a");
+ NamespaceString b("b.b");
+ NamespaceString c("b.c");
+ NamespaceString d("b.d");
+ NamespaceString e("b.e");
+
+ // Create and drop collections across multiple namespaces
+ createCollection(opCtx.get(), a, Timestamp(1, 1));
+ dropCollection(opCtx.get(), a, Timestamp(1, 2));
+ createCollection(opCtx.get(), a, Timestamp(1, 3));
+ createCollection(opCtx.get(), b, Timestamp(1, 5));
+ createCollection(opCtx.get(), c, Timestamp(1, 7));
+ createCollection(opCtx.get(), d, Timestamp(1, 8));
+ createCollection(opCtx.get(), e, Timestamp(1, 9));
+ dropCollection(opCtx.get(), b, Timestamp(1, 10));
+
+ // Rollback to Timestamp(1, 8)
+ CollectionCatalog::write(
+ opCtx.get(), [&](CollectionCatalog& c) { c.cleanupForCatalogReopen(Timestamp(1, 8)); });
+
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(a, boost::none), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(b, boost::none), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(c, boost::none), boost::none);
+ ASSERT_NE(catalog()->lookupCatalogIdByNSS(d, boost::none), boost::none);
+ ASSERT_EQ(catalog()->lookupCatalogIdByNSS(e, boost::none), boost::none);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/catalog/collection_writer_test.cpp b/src/mongo/db/catalog/collection_writer_test.cpp
index a5aad697d59..121afe9b89d 100644
--- a/src/mongo/db/catalog/collection_writer_test.cpp
+++ b/src/mongo/db/catalog/collection_writer_test.cpp
@@ -58,7 +58,8 @@ protected:
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(kNss);
CollectionCatalog::write(getServiceContext(), [&](CollectionCatalog& catalog) {
- catalog.registerCollection(operationContext(), UUID::gen(), std::move(collection));
+ catalog.registerCollection(
+ operationContext(), UUID::gen(), std::move(collection), /*ts=*/boost::none);
});
}
@@ -255,7 +256,8 @@ public:
catalog.registerCollection(operationContext(),
UUID::gen(),
std::make_shared<CollectionMock>(
- NamespaceString("many", fmt::format("coll{}", i))));
+ NamespaceString("many", fmt::format("coll{}", i))),
+ /*ts=*/boost::none);
}
});
}
diff --git a/src/mongo/db/catalog/uncommitted_catalog_updates.cpp b/src/mongo/db/catalog/uncommitted_catalog_updates.cpp
index ba987b36d35..69d405626b7 100644
--- a/src/mongo/db/catalog/uncommitted_catalog_updates.cpp
+++ b/src/mongo/db/catalog/uncommitted_catalog_updates.cpp
@@ -117,12 +117,13 @@ void UncommittedCatalogUpdates::_createCollection(OperationContext* opCtx,
// This will throw when registering a namespace which is already in use.
CollectionCatalog::write(opCtx, [&, coll = createdColl](CollectionCatalog& catalog) {
- catalog.registerCollection(opCtx, uuid, coll);
+ catalog.registerCollection(opCtx, uuid, coll, /*ts=*/boost::none);
});
opCtx->recoveryUnit()->onRollback([opCtx, uuid]() {
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) {
- catalog.deregisterCollection(opCtx, uuid, /*isDropPending=*/false);
+ catalog.deregisterCollection(
+ opCtx, uuid, /*isDropPending=*/false, /*ts=*/boost::none);
});
});
});
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 96f0715cb11..6debe506089 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -479,7 +479,8 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndResumeAfter
// Need to put the collection in the collection catalog so the resume token is valid.
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(expCtx->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(expCtx->opCtx, testUuid(), std::move(collection));
+ catalog.registerCollection(
+ expCtx->opCtx, testUuid(), std::move(collection), /*ts=*/boost::none);
});
ASSERT_THROWS_CODE(DSChangeStream::createFromBson(
@@ -502,7 +503,7 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAfterAndResumeAfterOptions) {
// Need to put the collection in the collection catalog so the resume token is validcollection
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(opCtx, testUuid(), std::move(collection));
+ catalog.registerCollection(opCtx, testUuid(), std::move(collection), /*ts=*/boost::none);
});
ASSERT_THROWS_CODE(
@@ -530,7 +531,7 @@ TEST_F(ChangeStreamStageTest, ShouldRejectBothStartAtOperationTimeAndStartAfterO
// Need to put the collection in the collection catalog so the resume token is valid.
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(opCtx, testUuid(), std::move(collection));
+ catalog.registerCollection(opCtx, testUuid(), std::move(collection), /*ts=*/boost::none);
});
ASSERT_THROWS_CODE(DSChangeStream::createFromBson(
@@ -553,7 +554,7 @@ TEST_F(ChangeStreamStageTest, ShouldRejectResumeAfterWithResumeTokenMissingUUID)
// Need to put the collection in the collection catalog so the resume token is valid.
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(opCtx, testUuid(), std::move(collection));
+ catalog.registerCollection(opCtx, testUuid(), std::move(collection), /*ts=*/boost::none);
});
ASSERT_THROWS_CODE(
@@ -2708,7 +2709,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2FieldIn
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none);
});
@@ -2751,7 +2753,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldUseO2FieldInOplog) {
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none);
});
@@ -2794,7 +2797,8 @@ TEST_F(ChangeStreamStageTest, ResumeAfterFailsIfResumeTokenDoesNotContainUUID) {
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none);
});
// Create a resume token from only the timestamp.
@@ -2855,7 +2859,8 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
// Need to put the collection in the collection catalog so the resume token is valid.
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(expCtx->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, testUuid(), std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, testUuid(), std::move(collection), /*ts=*/boost::none);
});
const auto resumeTokenInvalidate =
@@ -3317,7 +3322,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyWhenNoO2Field
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none);
});
BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2);
@@ -3355,7 +3361,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldUseO2FieldInOplog) {
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none);
});
BSONObj docKey = BSON("_id" << 1);
@@ -3393,7 +3400,8 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
// Need to put the collection in the collection catalog so the resume token is valid.
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(expCtx->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, testUuid(), std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, testUuid(), std::move(collection), /*ts=*/boost::none);
});
const auto resumeTokenInvalidate =
@@ -3417,7 +3425,8 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) {
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none);
});
// Create a resume token from only the timestamp, similar to a 'dropDatabase' entry.
@@ -3451,7 +3460,8 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(nss);
CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ catalog.registerCollection(
+ getExpCtx()->opCtx, uuid, std::move(collection), /*ts=*/boost::none);
});
// Create a resume token from only the timestamp, similar to a 'dropDatabase' entry.
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
index 95c9557c7f8..67b4f9d020a 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
@@ -150,8 +150,10 @@ TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromSplitVector_Man
auto uuid = UUID::gen();
CollectionCatalog::write(getServiceContext(), [&](CollectionCatalog& catalog) {
- catalog.registerCollection(
- operationContext(), uuid, std::make_shared<CollectionMock>(kNamespace));
+ catalog.registerCollection(operationContext(),
+ uuid,
+ std::make_shared<CollectionMock>(kNamespace),
+ /*ts=*/boost::none);
});
auto future = launchAsync([&] {
diff --git a/src/mongo/db/storage/kv/durable_catalog_test.cpp b/src/mongo/db/storage/kv/durable_catalog_test.cpp
index c240f2dd9f3..5f371ee9b57 100644
--- a/src/mongo/db/storage/kv/durable_catalog_test.cpp
+++ b/src/mongo/db/storage/kv/durable_catalog_test.cpp
@@ -111,8 +111,10 @@ public:
getCatalog()->getMetaData(operationContext(), catalogId),
std::move(coll.second));
CollectionCatalog::write(operationContext(), [&](CollectionCatalog& catalog) {
- catalog.registerCollection(
- operationContext(), options.uuid.value(), std::move(collection));
+ catalog.registerCollection(operationContext(),
+ options.uuid.value(),
+ std::move(collection),
+ /*ts=*/boost::none);
});
wuow.commit();
diff --git a/src/mongo/db/storage/kv/storage_engine_test.cpp b/src/mongo/db/storage/kv/storage_engine_test.cpp
index 2a143839964..c3f5ae88e31 100644
--- a/src/mongo/db/storage/kv/storage_engine_test.cpp
+++ b/src/mongo/db/storage/kv/storage_engine_test.cpp
@@ -110,7 +110,8 @@ TEST_F(StorageEngineTest, LoadCatalogDropsOrphansAfterUncleanShutdown) {
{
Lock::GlobalWrite writeLock(opCtx.get(), Date_t::max(), Lock::InterruptBehavior::kThrow);
_storageEngine->closeCatalog(opCtx.get());
- _storageEngine->loadCatalog(opCtx.get(), StorageEngine::LastShutdownState::kUnclean);
+ _storageEngine->loadCatalog(
+ opCtx.get(), boost::none, StorageEngine::LastShutdownState::kUnclean);
}
ASSERT(!identExists(opCtx.get(), swCollInfo.getValue().ident));
@@ -408,7 +409,8 @@ TEST_F(StorageEngineRepairTest, LoadCatalogRecoversOrphans) {
{
Lock::GlobalWrite writeLock(opCtx.get(), Date_t::max(), Lock::InterruptBehavior::kThrow);
_storageEngine->closeCatalog(opCtx.get());
- _storageEngine->loadCatalog(opCtx.get(), StorageEngine::LastShutdownState::kClean);
+ _storageEngine->loadCatalog(
+ opCtx.get(), boost::none, StorageEngine::LastShutdownState::kClean);
}
ASSERT(identExists(opCtx.get(), swCollInfo.getValue().ident));
@@ -464,7 +466,8 @@ TEST_F(StorageEngineRepairTest, LoadCatalogRecoversOrphansInCatalog) {
// When in a repair context, loadCatalog() recreates catalog entries for orphaned idents.
{
Lock::GlobalWrite writeLock(opCtx.get(), Date_t::max(), Lock::InterruptBehavior::kThrow);
- _storageEngine->loadCatalog(opCtx.get(), StorageEngine::LastShutdownState::kClean);
+ _storageEngine->loadCatalog(
+ opCtx.get(), boost::none, StorageEngine::LastShutdownState::kClean);
}
auto identNs = swCollInfo.getValue().ident;
std::replace(identNs.begin(), identNs.end(), '-', '_');
@@ -500,7 +503,8 @@ TEST_F(StorageEngineTest, LoadCatalogDropsOrphans) {
// orphaned idents.
{
Lock::GlobalWrite writeLock(opCtx.get(), Date_t::max(), Lock::InterruptBehavior::kThrow);
- _storageEngine->loadCatalog(opCtx.get(), StorageEngine::LastShutdownState::kClean);
+ _storageEngine->loadCatalog(
+ opCtx.get(), boost::none, StorageEngine::LastShutdownState::kClean);
}
// reconcileCatalogAndIdents() drops orphaned idents.
auto reconcileResult = unittest::assertGet(reconcile(opCtx.get()));
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index a67a04e7aa7..d3d6dbc5ee2 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -226,7 +226,9 @@ public:
* caller. For example, on starting from a previous unclean shutdown, we may try to recover
* orphaned idents, which are known to the storage engine but not referenced in the catalog.
*/
- virtual void loadCatalog(OperationContext* opCtx, LastShutdownState lastShutdownState) = 0;
+ virtual void loadCatalog(OperationContext* opCtx,
+ boost::optional<Timestamp> stableTs,
+ LastShutdownState lastShutdownState) = 0;
virtual void closeCatalog(OperationContext* opCtx) = 0;
/**
diff --git a/src/mongo/db/storage/storage_engine_impl.cpp b/src/mongo/db/storage/storage_engine_impl.cpp
index 2743c014c96..a73de55a3fa 100644
--- a/src/mongo/db/storage/storage_engine_impl.cpp
+++ b/src/mongo/db/storage/storage_engine_impl.cpp
@@ -93,6 +93,19 @@ StorageEngineImpl::StorageEngineImpl(OperationContext* opCtx,
[serviceContext = opCtx->getServiceContext()](Timestamp timestamp) {
HistoricalIdentTracker::get(serviceContext).removeEntriesOlderThan(timestamp);
}),
+ _collectionCatalogCleanupTimestampListener(
+ TimestampMonitor::TimestampType::kOldest,
+ [serviceContext = opCtx->getServiceContext()](Timestamp timestamp) {
+ // The global lock is held by the timestamp monitor while callbacks are executed, so
+ // there can be no batched CollectionCatalog writer and we are thus safe to write
+ // using the service context.
+ if (CollectionCatalog::get(serviceContext)
+ ->needsCleanupForOldestTimestamp(timestamp)) {
+ CollectionCatalog::write(serviceContext, [timestamp](CollectionCatalog& catalog) {
+ catalog.cleanupForOldestTimestampAdvanced(timestamp);
+ });
+ }
+ }),
_supportsCappedCollections(_engine->supportsCappedCollections()) {
uassert(28601,
"Storage engine does not support --directoryperdb",
@@ -114,11 +127,14 @@ StorageEngineImpl::StorageEngineImpl(OperationContext* opCtx,
invariant(!opCtx->lockState()->isLocked() || opCtx->lockState()->isW());
Lock::GlobalWrite globalLk(opCtx);
loadCatalog(opCtx,
+ _engine->getRecoveryTimestamp(),
_options.lockFileCreatedByUncleanShutdown ? LastShutdownState::kUnclean
: LastShutdownState::kClean);
}
-void StorageEngineImpl::loadCatalog(OperationContext* opCtx, LastShutdownState lastShutdownState) {
+void StorageEngineImpl::loadCatalog(OperationContext* opCtx,
+ boost::optional<Timestamp> stableTs,
+ LastShutdownState lastShutdownState) {
bool catalogExists = _engine->hasIdent(opCtx, kCatalogInfo);
if (_options.forRepair && catalogExists) {
auto repairObserver = StorageRepairObserver::get(getGlobalServiceContext());
@@ -344,7 +360,10 @@ void StorageEngineImpl::loadCatalog(OperationContext* opCtx, LastShutdownState l
}
Timestamp minVisibleTs = Timestamp::min();
- Timestamp minValidTs = minVisibleTs;
+ // Use the stable timestamp as minValid. We know for a fact that the collection exists at
+ // this point and is in sync. If we use an earlier timestamp than replication rollback we
+ // may be out-of-order for the collection catalog managing this namespace.
+ Timestamp minValidTs = stableTs ? *stableTs : Timestamp::min();
// If there's no recovery timestamp, every collection is available.
if (boost::optional<Timestamp> recoveryTs = _engine->getRecoveryTimestamp()) {
// Otherwise choose a minimum visible timestamp that's at least as large as the true
@@ -353,10 +372,6 @@ void StorageEngineImpl::loadCatalog(OperationContext* opCtx, LastShutdownState l
// `oldestTimestamp` and conservatively choose the `recoveryTimestamp` for everything
// else.
minVisibleTs = recoveryTs.value();
- // Minimum valid timestamp is always set to the recovery timestamp. Even if the
- // collection exists at the oldest timestamp we do not know if it would be in sync with
- // the durable catalog due to collMod or index changes.
- minValidTs = minVisibleTs;
if (existedAtOldestTs.find(entry.catalogId) != existedAtOldestTs.end()) {
// Collections found at the `oldestTimestamp` on startup can have their minimum
// visible timestamp pulled back to that value.
@@ -410,10 +425,10 @@ void StorageEngineImpl::_initCollection(OperationContext* opCtx,
auto collectionFactory = Collection::Factory::get(getGlobalServiceContext());
auto collection = collectionFactory->make(opCtx, nss, catalogId, md, std::move(rs));
collection->setMinimumVisibleSnapshot(minVisibleTs);
- collection->setMinimumValidSnapshot(minValidTs);
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(opCtx, md->options.uuid.value(), std::move(collection));
+ catalog.registerCollection(
+ opCtx, md->options.uuid.value(), std::move(collection), /*commitTime*/ minValidTs);
});
}
@@ -840,6 +855,7 @@ void StorageEngineImpl::startTimestampMonitor() {
_timestampMonitor->addListener(&_minOfCheckpointAndOldestTimestampListener);
_timestampMonitor->addListener(&_historicalIdentTimestampListener);
+ _timestampMonitor->addListener(&_collectionCatalogCleanupTimestampListener);
}
void StorageEngineImpl::notifyStartupComplete() {
@@ -1019,7 +1035,8 @@ Status StorageEngineImpl::repairRecordStore(OperationContext* opCtx,
// After repairing, re-initialize the collection with a valid RecordStore.
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) {
auto uuid = catalog.lookupUUIDByNSS(opCtx, nss).value();
- catalog.deregisterCollection(opCtx, uuid, /*isDropPending=*/false);
+ catalog.deregisterCollection(
+ opCtx, uuid, /*isDropPending=*/false, /*commitTime*/ boost::none);
});
// When repairing a record store, keep the existing behavior of not installing a minimum visible
diff --git a/src/mongo/db/storage/storage_engine_impl.h b/src/mongo/db/storage/storage_engine_impl.h
index f95b86f32a7..6a431abe7df 100644
--- a/src/mongo/db/storage/storage_engine_impl.h
+++ b/src/mongo/db/storage/storage_engine_impl.h
@@ -350,7 +350,9 @@ public:
/**
* When loading after an unclean shutdown, this performs cleanup on the DurableCatalogImpl.
*/
- void loadCatalog(OperationContext* opCtx, LastShutdownState lastShutdownState) final;
+ void loadCatalog(OperationContext* opCtx,
+ boost::optional<Timestamp> stableTs,
+ LastShutdownState lastShutdownState) final;
void closeCatalog(OperationContext* opCtx) final;
@@ -454,6 +456,9 @@ private:
// checkpoint timestamp.
TimestampMonitor::TimestampListener _historicalIdentTimestampListener;
+ // Listener for cleanup of CollectionCatalog when oldest timestamp advances.
+ TimestampMonitor::TimestampListener _collectionCatalogCleanupTimestampListener;
+
const bool _supportsCappedCollections;
std::unique_ptr<RecordStore> _catalogRecordStore;
diff --git a/src/mongo/db/storage/storage_engine_mock.h b/src/mongo/db/storage/storage_engine_mock.h
index 66665f9edec..37542e2ac8d 100644
--- a/src/mongo/db/storage/storage_engine_mock.h
+++ b/src/mongo/db/storage/storage_engine_mock.h
@@ -53,7 +53,9 @@ public:
bool isEphemeral() const final {
return true;
}
- void loadCatalog(OperationContext* opCtx, LastShutdownState lastShutdownState) final {}
+ void loadCatalog(OperationContext* opCtx,
+ boost::optional<Timestamp> stableTs,
+ LastShutdownState lastShutdownState) final {}
void closeCatalog(OperationContext* opCtx) final {}
Status closeDatabase(OperationContext* opCtx, const DatabaseName& dbName) final {
return Status::OK();
diff --git a/src/mongo/db/storage/storage_engine_test_fixture.h b/src/mongo/db/storage/storage_engine_test_fixture.h
index 42e748bc588..364d9467b0a 100644
--- a/src/mongo/db/storage/storage_engine_test_fixture.h
+++ b/src/mongo/db/storage/storage_engine_test_fixture.h
@@ -76,7 +76,8 @@ public:
_storageEngine->getCatalog()->getMetaData(opCtx, catalogId),
std::move(rs));
CollectionCatalog::write(opCtx, [&](CollectionCatalog& catalog) {
- catalog.registerCollection(opCtx, options.uuid.get(), std::move(coll));
+ catalog.registerCollection(
+ opCtx, options.uuid.get(), std::move(coll), /*ts=*/boost::none);
});
return {{_storageEngine->getCatalog()->getEntry(catalogId)}};