summaryrefslogtreecommitdiff
path: root/src/mongo/db/catalog/collection_catalog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/catalog/collection_catalog.cpp')
-rw-r--r--src/mongo/db/catalog/collection_catalog.cpp280
1 files changed, 258 insertions, 22 deletions
diff --git a/src/mongo/db/catalog/collection_catalog.cpp b/src/mongo/db/catalog/collection_catalog.cpp
index 26c630537a8..3a7e255f464 100644
--- a/src/mongo/db/catalog/collection_catalog.cpp
+++ b/src/mongo/db/catalog/collection_catalog.cpp
@@ -154,22 +154,29 @@ public:
break;
}
case UncommittedCatalogUpdates::Entry::Action::kRenamedCollection: {
- writeJobs.push_back([opCtx, &from = entry.nss, &to = entry.renameTo](
- CollectionCatalog& catalog) {
+ writeJobs.push_back([opCtx,
+ &from = entry.nss,
+ &to = entry.renameTo,
+ commitTime](CollectionCatalog& catalog) {
+ // We just need to do modifications on 'from' here. 'to' is taken care
+ // of by a separate kWritableCollection entry.
catalog._collections.erase(from);
auto& resourceCatalog = ResourceCatalog::get(opCtx->getServiceContext());
resourceCatalog.remove({RESOURCE_COLLECTION, from}, from);
resourceCatalog.add({RESOURCE_COLLECTION, to}, to);
+
+ catalog._pushCatalogIdForRename(from, to, commitTime);
});
break;
}
case UncommittedCatalogUpdates::Entry::Action::kDroppedCollection: {
- writeJobs.push_back(
- [opCtx, uuid = *entry.uuid(), isDropPending = *entry.isDropPending](
- CollectionCatalog& catalog) {
- catalog.deregisterCollection(opCtx, uuid, isDropPending);
- });
+ writeJobs.push_back([opCtx,
+ uuid = *entry.uuid(),
+ isDropPending = *entry.isDropPending,
+ commitTime](CollectionCatalog& catalog) {
+ catalog.deregisterCollection(opCtx, uuid, isDropPending, commitTime);
+ });
break;
}
case UncommittedCatalogUpdates::Entry::Action::kRecreatedCollection: {
@@ -177,17 +184,12 @@ public:
collection = entry.collection,
uuid = *entry.externalUUID,
commitTime](CollectionCatalog& catalog) {
- if (commitTime) {
- collection->setMinimumValidSnapshot(commitTime.value());
- }
- catalog.registerCollection(opCtx, uuid, std::move(collection));
+ catalog.registerCollection(opCtx, uuid, std::move(collection), commitTime);
});
// Fallthrough to the createCollection case to finish committing the collection.
[[fallthrough]];
}
case UncommittedCatalogUpdates::Entry::Action::kCreatedCollection: {
- auto collPtr = entry.collection.get();
-
// By this point, we may or may not have reserved an oplog slot for the
// collection creation.
// For example, multi-document transactions will only reserve the oplog slot at
@@ -197,11 +199,15 @@ public:
// we must update the minVisibleTimestamp with the appropriate value. This is
// fine because the collection should not be visible in the catalog until we
// call setCommitted(true).
- if (commitTime) {
- collPtr->setMinimumVisibleSnapshot(commitTime.value());
- collPtr->setMinimumValidSnapshot(commitTime.value());
- }
- collPtr->setCommitted(true);
+ writeJobs.push_back([coll = entry.collection.get(),
+ commitTime](CollectionCatalog& catalog) {
+ if (commitTime) {
+ coll->setMinimumVisibleSnapshot(commitTime.value());
+ coll->setMinimumValidSnapshot(commitTime.value());
+ }
+ catalog._pushCatalogIdForNSS(coll->ns(), coll->getCatalogId(), commitTime);
+ coll->setCommitted(true);
+ });
break;
}
case UncommittedCatalogUpdates::Entry::Action::kReplacedViewsForDatabase: {
@@ -1122,6 +1128,25 @@ boost::optional<UUID> CollectionCatalog::lookupUUIDByNSS(OperationContext* opCtx
return boost::none;
}
+boost::optional<RecordId> CollectionCatalog::lookupCatalogIdByNSS(
+ const NamespaceString& nss, boost::optional<Timestamp> ts) const {
+ if (auto it = _catalogIds.find(nss); it != _catalogIds.end()) {
+ const auto& range = it->second;
+ if (!ts) {
+ return range.back().id;
+ }
+
+ auto rangeIt = std::upper_bound(
+ range.begin(), range.end(), *ts, [](const auto& ts, const auto& entry) {
+ return ts < entry.ts;
+ });
+ if (rangeIt != range.begin()) {
+ return (--rangeIt)->id;
+ }
+ }
+ return boost::none;
+}
+
void CollectionCatalog::iterateViews(OperationContext* opCtx,
const DatabaseName& dbName,
ViewIteratorCallback callback,
@@ -1305,7 +1330,8 @@ CollectionCatalog::ViewCatalogSet CollectionCatalog::getViewCatalogDbNames(
void CollectionCatalog::registerCollection(OperationContext* opCtx,
const UUID& uuid,
- std::shared_ptr<Collection> coll) {
+ std::shared_ptr<Collection> coll,
+ boost::optional<Timestamp> commitTime) {
auto nss = coll->ns();
_ensureNamespaceDoesNotExist(opCtx, nss, NamespaceType::kAll);
@@ -1326,6 +1352,12 @@ void CollectionCatalog::registerCollection(OperationContext* opCtx,
_collections[nss] = coll;
_orderedCollections[dbIdPair] = coll;
+ if (commitTime && !commitTime->isNull()) {
+ coll->setMinimumValidSnapshot(commitTime.value());
+ _pushCatalogIdForNSS(nss, coll->getCatalogId(), commitTime);
+ }
+
+
if (!nss.isOnInternalDb() && !nss.isSystem()) {
_stats.userCollections += 1;
if (coll->isCapped()) {
@@ -1345,9 +1377,11 @@ void CollectionCatalog::registerCollection(OperationContext* opCtx,
resourceCatalog.add({RESOURCE_COLLECTION, nss}, nss);
}
-std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(OperationContext* opCtx,
- const UUID& uuid,
- bool isDropPending) {
+std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(
+ OperationContext* opCtx,
+ const UUID& uuid,
+ bool isDropPending,
+ boost::optional<Timestamp> commitTime) {
invariant(_catalog.find(uuid) != _catalog.end());
auto coll = std::move(_catalog[uuid]);
@@ -1374,6 +1408,11 @@ std::shared_ptr<Collection> CollectionCatalog::deregisterCollection(OperationCon
_collections.erase(ns);
_catalog.erase(uuid);
+ // Push drop unless this is a rollback of a create
+ if (coll->isCommitted()) {
+ _pushCatalogIdForNSS(ns, boost::none, commitTime);
+ }
+
if (!ns.isOnInternalDb() && !ns.isSystem()) {
_stats.userCollections -= 1;
if (coll->isCapped()) {
@@ -1446,6 +1485,110 @@ void CollectionCatalog::_ensureNamespaceDoesNotExist(OperationContext* opCtx,
}
}
+void CollectionCatalog::_pushCatalogIdForNSS(const NamespaceString& nss,
+ boost::optional<RecordId> catalogId,
+ boost::optional<Timestamp> ts) {
+ // TODO SERVER-68674: Remove feature flag check.
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) {
+ // No-op.
+ return;
+ }
+
+ auto& ids = _catalogIds[nss];
+
+ if (!ts) {
+        // Make sure untimestamped writes have a single entry in the mapping. If we're mixing
+        // timestamped with untimestamped writes (such as in repair), ignore the untimestamped
+        // writes, as an untimestamped deregister will correspond with an untimestamped register.
+        // We should leave the mapping as-is in this case.
+ if (ids.empty() && catalogId) {
+ // This namespace was added due to an untimestamped write, add an entry with min
+ // timestamp
+ ids.push_back(TimestampedCatalogId{catalogId, Timestamp::min()});
+ } else if (ids.size() == 1 && !catalogId) {
+ // This namespace was removed due to an untimestamped write, clear entries.
+ ids.clear();
+ }
+ return;
+ }
+
+    // Re-write latest entry if the timestamps match (multiple changes occurred in this transaction)
+ if (!ids.empty() && ids.back().ts == *ts) {
+ ids.back().id = catalogId;
+ return;
+ }
+
+ // Otherwise, push new entry at the end. Timestamp is always increasing
+ invariant(ids.empty() || ids.back().ts < *ts);
+ // If the catalogId is the same as last entry, there's nothing we need to do. This can happen
+ // when the catalog is reopened.
+ if (!ids.empty() && ids.back().id == catalogId) {
+ return;
+ }
+
+ ids.push_back(TimestampedCatalogId{catalogId, *ts});
+ _markNamespaceForCatalogIdCleanupIfNeeded(nss, ids);
+}
+
+void CollectionCatalog::_pushCatalogIdForRename(const NamespaceString& from,
+ const NamespaceString& to,
+ boost::optional<Timestamp> ts) {
+ // TODO SERVER-68674: Remove feature flag check.
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) {
+ // No-op.
+ return;
+ }
+
+ if (!ts)
+ return;
+
+ // Get 'toIds' first, it may need to instantiate in the container which invalidates all
+ // references.
+ auto& toIds = _catalogIds[to];
+ auto& fromIds = _catalogIds.at(from);
+ invariant(!fromIds.empty());
+
+    // Re-write latest entry if the timestamps match (multiple changes occurred in this
+    // transaction), otherwise push at end
+ if (!toIds.empty() && toIds.back().ts == *ts) {
+ toIds.back().id = fromIds.back().id;
+ } else {
+ invariant(toIds.empty() || toIds.back().ts < *ts);
+ toIds.push_back(TimestampedCatalogId{fromIds.back().id, *ts});
+ _markNamespaceForCatalogIdCleanupIfNeeded(to, toIds);
+ }
+
+    // Re-write latest entry if the timestamps match (multiple changes occurred in this
+    // transaction), otherwise push at end
+ if (!fromIds.empty() && fromIds.back().ts == *ts) {
+ fromIds.back().id = boost::none;
+ } else {
+ invariant(fromIds.empty() || fromIds.back().ts < *ts);
+ fromIds.push_back(TimestampedCatalogId{boost::none, *ts});
+ _markNamespaceForCatalogIdCleanupIfNeeded(from, fromIds);
+ }
+}
+
+void CollectionCatalog::_markNamespaceForCatalogIdCleanupIfNeeded(
+ const NamespaceString& nss, const std::vector<TimestampedCatalogId>& ids) {
+
+ auto markForCleanup = [this, &nss](Timestamp ts) {
+ _catalogIdChanges.insert(nss);
+ if (ts < _lowestCatalogIdTimestampForCleanup) {
+ _lowestCatalogIdTimestampForCleanup = ts;
+ }
+ };
+
+ // Cleanup may occur if we have more than one entry for the namespace or if the only entry is a
+ // drop. Use the first entry as lowest cleanup time if we have a drop and the second otherwise
+ // (as the first is needed until that time is reached)
+ if (!ids.empty() && ids.front().id == boost::none) {
+ markForCleanup(ids.front().ts);
+ } else if (ids.size() > 1) {
+ markForCleanup(ids.at(1).ts);
+ }
+}
+
void CollectionCatalog::deregisterAllCollectionsAndViews(ServiceContext* svcCtx) {
LOGV2(20282, "Deregistering all the collections");
for (auto& entry : _catalog) {
@@ -1538,6 +1681,99 @@ CollectionCatalog::iterator CollectionCatalog::end(OperationContext* opCtx) cons
return iterator(opCtx, _orderedCollections.end(), *this);
}
+bool CollectionCatalog::needsCleanupForOldestTimestamp(Timestamp oldest) const {
+ // TODO SERVER-68674: Remove feature flag check.
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) {
+ // No-op.
+ return false;
+ }
+
+ return _lowestCatalogIdTimestampForCleanup <= oldest;
+}
+
+void CollectionCatalog::cleanupForOldestTimestampAdvanced(Timestamp oldest) {
+ Timestamp nextLowestCleanupTimestamp = Timestamp::max();
+ // Helper to calculate the smallest entry that needs to be kept and its timestamp
+ auto assignLowestCleanupTimestamp = [&nextLowestCleanupTimestamp](const auto& range) {
+ auto it = range.begin();
+ // Drops can be cleaned up right away, otherwise the second entry is cleanup time.
+ if (it->id.has_value()) {
+ ++it;
+ }
+ nextLowestCleanupTimestamp = std::min(nextLowestCleanupTimestamp, it->ts);
+ };
+
+    // Iterate over all namespaces that are marked as needing cleanup
+ for (auto it = _catalogIdChanges.begin(), end = _catalogIdChanges.end(); it != end;) {
+ auto& range = _catalogIds[*it];
+
+ // Binary search for next larger timestamp
+ auto rangeIt = std::upper_bound(
+ range.begin(), range.end(), oldest, [](const auto& ts, const auto& entry) {
+ return ts < entry.ts;
+ });
+
+ // Continue if there is nothing to cleanup for this timestamp yet
+ if (rangeIt == range.begin()) {
+ assignLowestCleanupTimestamp(range);
+ ++it;
+ continue;
+ }
+
+ // The iterator is positioned to the closest entry that has a larger timestamp, decrement to
+ // get a lower or equal timestamp
+ --rangeIt;
+
+ // If we are positioned on a drop it can be removed
+ if (!rangeIt->id.has_value()) {
+ ++rangeIt;
+ }
+
+ // Erase range
+ range.erase(range.begin(), rangeIt);
+
+ // If the range is now empty or we need to keep the last item we can unmark this namespace
+ // for needing changes.
+ if (range.size() <= 1) {
+ _catalogIdChanges.erase(it++);
+ continue;
+ }
+
+ // More changes are needed for this namespace, keep it in the set and keep track of lowest
+ // timestamp.
+ assignLowestCleanupTimestamp(range);
+ ++it;
+ }
+
+ _lowestCatalogIdTimestampForCleanup = nextLowestCleanupTimestamp;
+}
+
+void CollectionCatalog::cleanupForCatalogReopen(Timestamp stable) {
+ _catalogIdChanges.clear();
+ _lowestCatalogIdTimestampForCleanup = Timestamp::max();
+
+ for (auto it = _catalogIds.begin(); it != _catalogIds.end();) {
+ auto& ids = it->second;
+
+ // Remove all larger timestamps in this range
+ ids.erase(std::upper_bound(ids.begin(),
+ ids.end(),
+ stable,
+ [](Timestamp ts, const auto& entry) { return ts < entry.ts; }),
+ ids.end());
+
+ // Remove namespace if there are no entries left
+ if (ids.empty()) {
+ _catalogIds.erase(it++);
+ continue;
+ }
+
+ // Calculate when this namespace needs to be cleaned up next
+ _markNamespaceForCatalogIdCleanupIfNeeded(it->first, ids);
+ ++it;
+ }
+}
+
void CollectionCatalog::invariantHasExclusiveAccessToCollection(OperationContext* opCtx,
const NamespaceString& nss) {
auto& uncommittedCatalogUpdates = UncommittedCatalogUpdates::get(opCtx);