summaryrefslogtreecommitdiff
path: root/src/mongo/s/catalog_cache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/catalog_cache.cpp')
-rw-r--r--src/mongo/s/catalog_cache.cpp762
1 files changed, 282 insertions, 480 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 19846e62b48..d9c2500f2d3 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -55,6 +55,7 @@
#include "mongo/util/timer.h"
namespace mongo {
+
const OperationContext::Decoration<bool> operationShouldBlockBehindCatalogCacheRefresh =
OperationContext::declareDecoration<bool>();
@@ -68,81 +69,8 @@ namespace {
const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
const int kDatabaseCacheSize = 10000;
-/**
- * Returns whether two shard versions have a matching epoch.
- */
-bool shardVersionsHaveMatchingEpoch(boost::optional<ChunkVersion> wanted,
- const ChunkVersion& received) {
- return wanted && wanted->epoch() == received.epoch();
-};
-
-/**
- * Given an (optional) initial routing table and a set of changed chunks returned by the catalog
- * cache loader, produces a new routing table with the changes applied.
- *
- * If the collection is no longer sharded returns nullptr. If the epoch has changed, expects that
- * the 'collectionChunksList' contains the full contents of the chunks collection for that namespace
- * so that the routing table can be built from scratch.
- *
- * Throws ConflictingOperationInProgress if the chunk metadata was found to be inconsistent (not
- * containing all the necessary chunks, contains overlaps or chunks' epoch values are not the same
- * as that of the collection). Since this situation may be transient, due to the collection being
- * dropped or having its shard key refined concurrently, the caller must retry the reload up to some
- * configurable number of attempts.
- */
-std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo(
- OperationContext* opCtx,
- const NamespaceString& nss,
- std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) {
- if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
- return nullptr;
- }
- const auto collectionAndChunks = uassertStatusOK(std::move(swCollectionAndChangedChunks));
-
- auto chunkManager = [&] {
- // If we have routing info already and it's for the same collection epoch, we're updating.
- // Otherwise, we're making a whole new routing table.
- if (existingRoutingInfo &&
- existingRoutingInfo->getVersion().epoch() == collectionAndChunks.epoch) {
- if (collectionAndChunks.changedChunks.size() == 1 &&
- collectionAndChunks.changedChunks[0].getVersion() ==
- existingRoutingInfo->getVersion())
- return existingRoutingInfo;
-
- return std::make_shared<RoutingTableHistory>(
- existingRoutingInfo->makeUpdated(std::move(collectionAndChunks.reshardingFields),
- collectionAndChunks.changedChunks));
- }
-
- auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
- if (!collectionAndChunks.defaultCollation.isEmpty()) {
- // The collation should have been validated upon collection creation
- return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(collectionAndChunks.defaultCollation));
- }
- return nullptr;
- }();
-
- return std::make_shared<RoutingTableHistory>(
- RoutingTableHistory::makeNew(nss,
- collectionAndChunks.uuid,
- KeyPattern(collectionAndChunks.shardKeyPattern),
- std::move(defaultCollator),
- collectionAndChunks.shardKeyIsUnique,
- collectionAndChunks.epoch,
- std::move(collectionAndChunks.reshardingFields),
- collectionAndChunks.changedChunks));
- }();
-
- std::set<ShardId> shardIds;
- chunkManager->getAllShardIds(&shardIds);
- for (const auto& shardId : shardIds) {
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- }
- return chunkManager;
-}
+const int kCollectionCacheSize = 10000;
} // namespace
@@ -155,7 +83,8 @@ CatalogCache::CatalogCache(ServiceContext* const service, CatalogCacheLoader& ca
options.maxThreads = 6;
return options;
}())),
- _databaseCache(service, *_executor, _cacheLoader) {
+ _databaseCache(service, *_executor, _cacheLoader),
+ _collectionCache(service, *_executor, _cacheLoader) {
_executor->startup();
}
@@ -190,111 +119,89 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
}
}
-StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss) {
- return _getCollectionRoutingInfo(opCtx, nss).statusWithInfo;
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoWithForcedRefresh(
- OperationContext* opCtx, const NamespaceString& nss) {
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
- _createOrGetCollectionEntryAndMarkAsNeedsRefresh(nss);
- return _getCollectionRoutingInfo(opCtx, nss);
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss) {
- return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
-}
-
-
-StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx,
- const NamespaceString& nss,
- Timestamp atClusterTime) {
- return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).statusWithInfo;
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt(
+StatusWith<ChunkManager> CatalogCache::_getCollectionRoutingInfoAt(
OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) {
- invariant(!opCtx->lockState() || !opCtx->lockState()->isLocked(),
- "Do not hold a lock while refreshing the catalog cache. Doing so would potentially "
- "hold the lock during a network call, and can lead to a deadlock as described in "
- "SERVER-37398.");
- // This default value can cause a single unnecessary extra refresh if this thread did do the
- // refresh but the refresh failed, or if the database or collection was not found, but only if
- // the caller is getCollectionRoutingInfoWithRefresh with the parameter
- // forceRefreshFromThisThread set to true
- RefreshAction refreshActionTaken(RefreshAction::kDidNotPerformRefresh);
- while (true) {
+ invariant(
+ !opCtx->lockState() || !opCtx->lockState()->isLocked(),
+ "Do not hold a lock while refreshing the catalog cache. Doing so would potentially hold "
+ "the lock during a network call, and can lead to a deadlock as described in SERVER-37398.");
+
+ try {
const auto swDbInfo = getDatabase(opCtx, nss.db());
+
if (!swDbInfo.isOK()) {
if (swDbInfo == ErrorCodes::NamespaceNotFound) {
LOGV2_FOR_CATALOG_REFRESH(
- 4947102,
+ 4947103,
2,
"Invalidating cached collection entry because its database has been dropped",
"namespace"_attr = nss);
- purgeCollection(nss);
+ invalidateCollectionEntry_LINEARIZABLE(nss);
}
- return {swDbInfo.getStatus(), refreshActionTaken};
+ return swDbInfo.getStatus();
}
const auto dbInfo = std::move(swDbInfo.getValue());
- stdx::unique_lock<Latch> ul(_mutex);
-
- auto collEntry = _createOrGetCollectionEntry(ul, nss);
+ const auto cacheConsistency = gEnableFinerGrainedCatalogCacheRefresh &&
+ !operationShouldBlockBehindCatalogCacheRefresh(opCtx)
+ ? CacheCausalConsistency::kLatestCached
+ : CacheCausalConsistency::kLatestKnown;
- if (collEntry->needsRefresh &&
- (!gEnableFinerGrainedCatalogCacheRefresh || collEntry->epochHasChanged ||
- operationShouldBlockBehindCatalogCacheRefresh(opCtx))) {
+ auto collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
- operationBlockedBehindCatalogCacheRefresh(opCtx) = true;
+ // If the entry is in the cache return immediately.
+ if (collEntryFuture.isReady()) {
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
+ return ChunkManager(dbInfo.primaryId(),
+ dbInfo.databaseVersion(),
+ collEntryFuture.get(opCtx),
+ atClusterTime);
+ }
- auto refreshNotification = collEntry->refreshCompletionNotification;
- if (!refreshNotification) {
- refreshNotification = (collEntry->refreshCompletionNotification =
- std::make_shared<Notification<Status>>());
- _scheduleCollectionRefresh(ul, opCtx->getServiceContext(), collEntry, nss, 1);
- refreshActionTaken = RefreshAction::kPerformedRefresh;
- }
+ operationBlockedBehindCatalogCacheRefresh(opCtx) = true;
- // Wait on the notification outside of the mutex
- ul.unlock();
+ size_t acquireTries = 0;
+ Timer t;
- auto refreshStatus = [&]() {
- Timer t;
- ON_BLOCK_EXIT([&] { _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); });
+ while (true) {
+ try {
+ auto collEntry = collEntryFuture.get(opCtx);
+ _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- try {
- const Milliseconds kReportingInterval{250};
- while (!refreshNotification->waitFor(opCtx, kReportingInterval)) {
- _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- t.reset();
- }
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
- return refreshNotification->get(opCtx);
- } catch (const DBException& ex) {
+ return ChunkManager(dbInfo.primaryId(),
+ dbInfo.databaseVersion(),
+ std::move(collEntry),
+ atClusterTime);
+ } catch (ExceptionFor<ErrorCodes::ConflictingOperationInProgress>& ex) {
+ _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
+ acquireTries++;
+ if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) {
return ex.toStatus();
}
- }();
-
- if (!refreshStatus.isOK()) {
- return {refreshStatus, refreshActionTaken};
}
- // Once the refresh is complete, loop around to get the latest value
- continue;
+ collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
+ t.reset();
}
-
- return {ChunkManager(dbInfo.primaryId(),
- dbInfo.databaseVersion(),
- collEntry->routingInfo,
- atClusterTime),
- refreshActionTaken};
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
}
+StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
+}
+
+StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Timestamp atClusterTime) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime);
+}
+
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationContext* opCtx,
StringData dbName) {
// TODO SERVER-49724: Make ReadThroughCache support StringData keys
@@ -303,32 +210,20 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon
}
StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoWithRefresh(
- OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread) {
- auto refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss);
- // We want to ensure that we don't join an in-progress refresh because that
- // could violate causal consistency for this client. We don't need to actually perform the
- // refresh ourselves but we do need the refresh to begin *after* this function is
- // called, so calling it twice is enough regardless of what happens the
- // second time. See SERVER-33954 for reasoning.
- if (forceRefreshFromThisThread &&
- refreshResult.actionTaken == RefreshAction::kDidNotPerformRefresh) {
- refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss);
- }
- return refreshResult.statusWithInfo;
+ OperationContext* opCtx, const NamespaceString& nss) {
+ _collectionCache.invalidate(nss);
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
+ return getCollectionRoutingInfo(opCtx, nss);
}
StatusWith<ChunkManager> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
OperationContext* opCtx, const NamespaceString& nss) {
- auto swRoutingInfo = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo;
- if (!swRoutingInfo.isOK())
- return swRoutingInfo;
-
- auto cri(std::move(swRoutingInfo.getValue()));
- if (!cri.isSharded())
+ auto routingInfoStatus = getCollectionRoutingInfoWithRefresh(opCtx, nss);
+ if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().isSharded()) {
return {ErrorCodes::NamespaceNotSharded,
str::stream() << "Collection " << nss.ns() << " is not sharded."};
-
- return cri;
+ }
+ return routingInfoStatus;
}
void CatalogCache::onStaleDatabaseVersion(const StringData dbName,
@@ -350,48 +245,49 @@ void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationCon
if (gEnableFinerGrainedCatalogCacheRefresh) {
operationShouldBlockBehindCatalogCacheRefresh(opCtx) = shouldBlock;
}
-};
+}
void CatalogCache::invalidateShardOrEntireCollectionEntryForShardedCollection(
- OperationContext* opCtx,
const NamespaceString& nss,
- boost::optional<ChunkVersion> wantedVersion,
- const ChunkVersion& receivedVersion,
- ShardId shardId) {
- if (shardVersionsHaveMatchingEpoch(wantedVersion, receivedVersion)) {
- _createOrGetCollectionEntryAndMarkShardStale(nss, shardId);
- } else {
- _createOrGetCollectionEntryAndMarkEpochStale(nss);
+ const boost::optional<ChunkVersion>& wantedVersion,
+ const ShardId& shardId) {
+ _stats.countStaleConfigErrors.addAndFetch(1);
+
+ auto collectionEntry = _collectionCache.peekLatestCached(nss);
+ if (collectionEntry && collectionEntry->optRt) {
+ collectionEntry->optRt->setShardStale(shardId);
}
-};
-void CatalogCache::onEpochChange(const NamespaceString& nss) {
- _createOrGetCollectionEntryAndMarkEpochStale(nss);
-};
+ if (wantedVersion) {
+ _collectionCache.advanceTimeInStore(
+ nss, ComparableChunkVersion::makeComparableChunkVersion(*wantedVersion));
+ } else {
+ _collectionCache.advanceTimeInStore(
+ nss, ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh());
+ }
+}
void CatalogCache::checkEpochOrThrow(const NamespaceString& nss,
- ChunkVersion targetCollectionVersion,
- const ShardId& shardId) const {
- stdx::lock_guard<Latch> lg(_mutex);
- const auto itDb = _collectionsByDb.find(nss.db());
+ const ChunkVersion& targetCollectionVersion,
+ const ShardId& shardId) {
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns()
<< ", no entry for database " << nss.db(),
- itDb != _collectionsByDb.end());
+ _databaseCache.peekLatestCached(nss.db().toString()));
- auto itColl = itDb->second.find(nss.ns());
+ auto collectionValueHandle = _collectionCache.peekLatestCached(nss);
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns()
<< ", no entry for collection.",
- itColl != itDb->second.end());
+ collectionValueHandle);
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns() << ", wanted "
<< targetCollectionVersion.toString()
<< ", but found the collection was unsharded",
- itColl->second->routingInfo);
+ collectionValueHandle->optRt);
- auto foundVersion = itColl->second->routingInfo->getVersion();
+ auto foundVersion = collectionValueHandle->optRt->getVersion();
uassert(StaleConfigInfo(nss, targetCollectionVersion, foundVersion, shardId),
str::stream() << "could not act as router for " << nss.ns() << ", wanted "
<< targetCollectionVersion.toString() << ", but found "
@@ -399,11 +295,6 @@ void CatalogCache::checkEpochOrThrow(const NamespaceString& nss,
foundVersion.epoch() == targetCollectionVersion.epoch());
}
-void CatalogCache::invalidateShardForShardedCollection(const NamespaceString& nss,
- const ShardId& staleShardId) {
- _createOrGetCollectionEntryAndMarkShardStale(nss, staleShardId);
-}
-
void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
LOGV2_DEBUG(4997600,
1,
@@ -413,32 +304,24 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
_databaseCache.invalidateCachedValueIf(
[&](const DatabaseType& dbt) { return dbt.getPrimary() == shardId; });
- stdx::lock_guard<Latch> lg(_mutex);
-
// Invalidate collections which contain data on this shard.
- for (const auto& [db, collInfoMap] : _collectionsByDb) {
- for (const auto& [collNs, collRoutingInfoEntry] : collInfoMap) {
- if (!collRoutingInfoEntry->needsRefresh && collRoutingInfoEntry->routingInfo) {
- // The set of shards on which this collection contains chunks.
- std::set<ShardId> shardsOwningDataForCollection;
- collRoutingInfoEntry->routingInfo->getAllShardIds(&shardsOwningDataForCollection);
-
- if (shardsOwningDataForCollection.find(shardId) !=
- shardsOwningDataForCollection.end()) {
- LOGV2_DEBUG(22647,
- 3,
- "Invalidating cached collection {namespace} that has data "
- "on shard {shardId}",
- "Invalidating cached collection",
- "namespace"_attr = collNs,
- "shardId"_attr = shardId);
-
- collRoutingInfoEntry->needsRefresh = true;
- collRoutingInfoEntry->routingInfo->setShardStale(shardId);
- }
- }
- }
- }
+ _collectionCache.invalidateCachedValueIf([&](const OptionalRoutingTableHistory& ort) {
+ if (!ort.optRt)
+ return false;
+ const auto& rt = *ort.optRt;
+
+ std::set<ShardId> shardIds;
+ rt.getAllShardIds(&shardIds);
+
+ LOGV2_DEBUG(22647,
+ 3,
+ "Invalidating cached collection {namespace} that has data "
+ "on shard {shardId}",
+ "Invalidating cached collection",
+ "namespace"_attr = rt.nss(),
+ "shardId"_attr = shardId);
+ return shardIds.find(shardId) != shardIds.end();
+ });
LOGV2(22648,
"Finished invalidating databases and collections with data on shard: {shardId}",
@@ -446,46 +329,28 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
"shardId"_attr = shardId);
}
-void CatalogCache::purgeCollection(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
-
- auto itDb = _collectionsByDb.find(nss.db());
- if (itDb == _collectionsByDb.end()) {
- return;
- }
-
- itDb->second.erase(nss.ns());
-}
-
void CatalogCache::purgeDatabase(StringData dbName) {
_databaseCache.invalidate(dbName.toString());
- stdx::lock_guard<Latch> lg(_mutex);
- _collectionsByDb.erase(dbName);
+ _collectionCache.invalidateKeyIf(
+ [&](const NamespaceString& nss) { return nss.db() == dbName; });
}
void CatalogCache::purgeAllDatabases() {
_databaseCache.invalidateAll();
- stdx::lock_guard<Latch> lg(_mutex);
- _collectionsByDb.clear();
+ _collectionCache.invalidateAll();
}
void CatalogCache::report(BSONObjBuilder* builder) const {
BSONObjBuilder cacheStatsBuilder(builder->subobjStart("catalogCache"));
- size_t numDatabaseEntries;
- size_t numCollectionEntries{0};
- {
- numDatabaseEntries = _databaseCache.getCacheInfo().size();
- stdx::lock_guard<Latch> ul(_mutex);
- for (const auto& entry : _collectionsByDb) {
- numCollectionEntries += entry.second.size();
- }
- }
+ const size_t numDatabaseEntries = _databaseCache.getCacheInfo().size();
+ const size_t numCollectionEntries = _collectionCache.getCacheInfo().size();
cacheStatsBuilder.append("numDatabaseEntries", static_cast<long long>(numDatabaseEntries));
cacheStatsBuilder.append("numCollectionEntries", static_cast<long long>(numCollectionEntries));
_stats.report(&cacheStatsBuilder);
+ _collectionCache.reportStats(&cacheStatsBuilder);
}
void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx,
@@ -519,188 +384,8 @@ void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opC
}
}
-void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
- ServiceContext* service,
- std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
- NamespaceString const& nss,
- int refreshAttempt) {
- const auto existingRoutingInfo = collEntry->routingInfo;
-
- // If we have an existing chunk manager, the refresh is considered "incremental", regardless of
- // how many chunks are in the differential
- const bool isIncremental(existingRoutingInfo);
-
- if (isIncremental) {
- _stats.numActiveIncrementalRefreshes.addAndFetch(1);
- _stats.countIncrementalRefreshesStarted.addAndFetch(1);
- } else {
- _stats.numActiveFullRefreshes.addAndFetch(1);
- _stats.countFullRefreshesStarted.addAndFetch(1);
- }
-
- // Invoked when one iteration of getChunksSince has completed, whether with success or error
- const auto onRefreshCompleted = [this, t = Timer(), nss, isIncremental, existingRoutingInfo](
- const Status& status,
- RoutingTableHistory* routingInfoAfterRefresh) {
- if (isIncremental) {
- _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
- } else {
- _stats.numActiveFullRefreshes.subtractAndFetch(1);
- }
-
- if (!status.isOK()) {
- _stats.countFailedRefreshes.addAndFetch(1);
-
- LOGV2_OPTIONS(24103,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Error refreshing cached collection {namespace}; Took {duration} and "
- "failed due to {error}",
- "Error refreshing cached collection",
- "namespace"_attr = nss,
- "duration"_attr = Milliseconds(t.millis()),
- "error"_attr = redact(status));
- } else if (routingInfoAfterRefresh) {
- const int logLevel =
- (!existingRoutingInfo ||
- (existingRoutingInfo &&
- routingInfoAfterRefresh->getVersion() != existingRoutingInfo->getVersion()))
- ? 0
- : 1;
- LOGV2_FOR_CATALOG_REFRESH(
- 24104,
- logLevel,
- "Refreshed cached collection {namespace} to version {newVersion} from version "
- "{oldVersion}. Took {duration}",
- "Refreshed cached collection",
- "namespace"_attr = nss,
- "newVersion"_attr = routingInfoAfterRefresh->getVersion(),
- "oldVersion"_attr =
- (existingRoutingInfo
- ? (" from version " + existingRoutingInfo->getVersion().toString())
- : ""),
- "duration"_attr = Milliseconds(t.millis()));
- } else {
- LOGV2_OPTIONS(24105,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Collection {namespace} was found to be unsharded after refresh that "
- "took {duration}",
- "Collection has found to be unsharded after refresh",
- "namespace"_attr = nss,
- "duration"_attr = Milliseconds(t.millis()));
- }
- };
-
- // Invoked if getChunksSince resulted in error or threw an exception
- const auto onRefreshFailed =
- [ this, service, collEntry, nss, refreshAttempt,
- onRefreshCompleted ](WithLock lk, const Status& status) noexcept {
- onRefreshCompleted(status, nullptr);
-
- // It is possible that the metadata is being changed concurrently, so retry the
- // refresh again
- if (status == ErrorCodes::ConflictingOperationInProgress &&
- refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) {
- _scheduleCollectionRefresh(lk, service, collEntry, nss, refreshAttempt + 1);
- } else {
- // Leave needsRefresh to true so that any subsequent get attempts will kick off
- // another round of refresh
- collEntry->refreshCompletionNotification->set(status);
- collEntry->refreshCompletionNotification = nullptr;
- }
- };
-
- const auto refreshCallback =
- [ this, service, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ](
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
-
- ThreadClient tc("CatalogCache::collectionRefresh", service);
- auto opCtx = tc->makeOperationContext();
-
- std::shared_ptr<RoutingTableHistory> newRoutingInfo;
- try {
- newRoutingInfo = refreshCollectionRoutingInfo(
- opCtx.get(), nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
-
- onRefreshCompleted(Status::OK(), newRoutingInfo.get());
- } catch (const DBException& ex) {
- stdx::lock_guard<Latch> lg(_mutex);
- onRefreshFailed(lg, ex.toStatus());
- return;
- }
-
- stdx::lock_guard<Latch> lg(_mutex);
-
- collEntry->epochHasChanged = false;
- collEntry->needsRefresh = false;
- collEntry->refreshCompletionNotification->set(Status::OK());
- collEntry->refreshCompletionNotification = nullptr;
-
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx.get(), false);
-
- // TODO(SERVER-49876): remove clang-tidy NOLINT comments.
- if (existingRoutingInfo && newRoutingInfo && // NOLINT(bugprone-use-after-move)
- existingRoutingInfo->getVersion() == // NOLINT(bugprone-use-after-move)
- newRoutingInfo->getVersion()) { // NOLINT(bugprone-use-after-move)
- // If the routingInfo hasn't changed, we need to manually reset stale shards.
- newRoutingInfo->setAllShardsRefreshed();
- }
-
- collEntry->routingInfo = std::move(newRoutingInfo);
- };
-
- const ChunkVersion startingCollectionVersion =
- (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED());
-
- LOGV2_FOR_CATALOG_REFRESH(
- 24106,
- 1,
- "Refreshing cached collection {namespace} with version {currentCollectionVersion}",
- "namespace"_attr = nss,
- "currentCollectionVersion"_attr = startingCollectionVersion);
-
- _cacheLoader.getChunksSince(nss, startingCollectionVersion)
- .thenRunOn(_executor)
- .getAsync(refreshCallback);
-
- // The routing info for this collection shouldn't change, as other threads may try to use the
- // CatalogCache while we are waiting for the refresh to complete.
- invariant(collEntry->routingInfo.get() == existingRoutingInfo.get());
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
- collRoutingInfoEntry->epochHasChanged = true;
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss,
- const ShardId& staleShardId) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
- if (collRoutingInfoEntry->routingInfo) {
- collRoutingInfoEntry->routingInfo->setShardStale(staleShardId);
- }
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
-}
-
-std::shared_ptr<CatalogCache::CollectionRoutingInfoEntry> CatalogCache::_createOrGetCollectionEntry(
- WithLock wl, const NamespaceString& nss) {
- auto& collectionsForDb = _collectionsByDb[nss.db()];
- if (!collectionsForDb.contains(nss.ns())) {
- // TODO SERVER-46199: ensure collections cache size is capped
- // currently no routine except for dropDatabase is removing cached collection entries and
- // the cache for a specific DB can grow indefinitely.
- collectionsForDb[nss.ns()] = std::make_shared<CollectionRoutingInfoEntry>();
- }
-
- return collectionsForDb[nss.ns()];
+void CatalogCache::invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss) {
+ _collectionCache.invalidate(nss);
}
void CatalogCache::Stats::report(BSONObjBuilder* builder) const {
@@ -708,14 +393,6 @@ void CatalogCache::Stats::report(BSONObjBuilder* builder) const {
builder->append("totalRefreshWaitTimeMicros", totalRefreshWaitTimeMicros.load());
- builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load());
- builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load());
-
- builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load());
- builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load());
-
- builder->append("countFailedRefreshes", countFailedRefreshes.load());
-
if (isMongos()) {
BSONObjBuilder operationsBlockedByRefreshBuilder(
builder->subobjStart("operationsBlockedByRefresh"));
@@ -756,7 +433,6 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa
OperationContext* opCtx,
const std::string& dbName,
const ComparableDatabaseVersion& previousDbVersion) {
-
// TODO (SERVER-34164): Track and increment stats for database refreshes
LOGV2_FOR_CATALOG_REFRESH(24102, 2, "Refreshing cached database entry", "db"_attr = dbName);
@@ -788,73 +464,199 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa
}
}
-AtomicWord<uint64_t> ComparableDatabaseVersion::_localSequenceNumSource{1ULL};
+CatalogCache::CollectionCache::CollectionCache(ServiceContext* service,
+ ThreadPoolInterface& threadPool,
+ CatalogCacheLoader& catalogCacheLoader)
+ : ReadThroughCache(_mutex,
+ service,
+ threadPool,
+ [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ValueHandle& collectionHistory,
+ const ComparableChunkVersion& previousChunkVersion) {
+ return _lookupCollection(
+ opCtx, nss, collectionHistory, previousChunkVersion);
+ },
+ kCollectionCacheSize),
+ _catalogCacheLoader(catalogCacheLoader) {}
-ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion(
- const DatabaseVersion& version) {
- return ComparableDatabaseVersion(version, _localSequenceNumSource.fetchAndAdd(1));
+void CatalogCache::CollectionCache::reportStats(BSONObjBuilder* builder) const {
+ _stats.report(builder);
}
-const DatabaseVersion& ComparableDatabaseVersion::getVersion() const {
- return _dbVersion;
+void CatalogCache::CollectionCache::_updateRefreshesStats(const bool isIncremental,
+ const bool add) {
+ if (add) {
+ if (isIncremental) {
+ _stats.numActiveIncrementalRefreshes.addAndFetch(1);
+ _stats.countIncrementalRefreshesStarted.addAndFetch(1);
+ } else {
+ _stats.numActiveFullRefreshes.addAndFetch(1);
+ _stats.countFullRefreshesStarted.addAndFetch(1);
+ }
+ } else {
+ if (isIncremental) {
+ _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
+ } else {
+ _stats.numActiveFullRefreshes.subtractAndFetch(1);
+ }
+ }
}
-uint64_t ComparableDatabaseVersion::getLocalSequenceNum() const {
- return _localSequenceNum;
-}
+void CatalogCache::CollectionCache::Stats::report(BSONObjBuilder* builder) const {
+ builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load());
+ builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load());
-BSONObj ComparableDatabaseVersion::toBSON() const {
- BSONObjBuilder builder;
- _dbVersion.getUuid().appendToBuilder(&builder, "uuid");
- builder.append("lastMod", _dbVersion.getLastMod());
- builder.append("localSequenceNum", std::to_string(_localSequenceNum));
- return builder.obj();
-}
+ builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load());
+ builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load());
-std::string ComparableDatabaseVersion::toString() const {
- return toBSON().toString();
+ builder->append("countFailedRefreshes", countFailedRefreshes.load());
}
+CatalogCache::CollectionCache::LookupResult CatalogCache::CollectionCache::_lookupCollection(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const RoutingTableHistoryValueHandle& existingHistory,
+ const ComparableChunkVersion& previousVersion) {
+ const bool isIncremental(existingHistory && existingHistory->optRt);
+ _updateRefreshesStats(isIncremental, true);
-CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard)
- : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {}
+ Timer t{};
+ try {
+ auto lookupVersion =
+ isIncremental ? existingHistory->optRt->getVersion() : ChunkVersion::UNSHARDED();
-const ShardId& CachedDatabaseInfo::primaryId() const {
- return _dbt.getPrimary();
+ LOGV2_FOR_CATALOG_REFRESH(4619900,
+ 1,
+ "Refreshing cached collection",
+ "namespace"_attr = nss,
+ "currentVersion"_attr = previousVersion);
+
+ auto collectionAndChunks = _catalogCacheLoader.getChunksSince(nss, lookupVersion).get();
+
+ auto newRoutingHistory = [&] {
+ // If we have routing info already and it's for the same collection epoch, we're
+ // updating. Otherwise, we're making a whole new routing table.
+ if (isIncremental &&
+ existingHistory->optRt->getVersion().epoch() == collectionAndChunks.epoch) {
+ return existingHistory->optRt->makeUpdated(collectionAndChunks.reshardingFields,
+ collectionAndChunks.changedChunks);
+ }
+
+ auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
+ if (!collectionAndChunks.defaultCollation.isEmpty()) {
+ // The collation should have been validated upon collection creation
+ return uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(collectionAndChunks.defaultCollation));
+ }
+ return nullptr;
+ }();
+
+ return RoutingTableHistory::makeNew(nss,
+ collectionAndChunks.uuid,
+ KeyPattern(collectionAndChunks.shardKeyPattern),
+ std::move(defaultCollator),
+ collectionAndChunks.shardKeyIsUnique,
+ collectionAndChunks.epoch,
+ std::move(collectionAndChunks.reshardingFields),
+ collectionAndChunks.changedChunks);
+ }();
+
+ newRoutingHistory.setAllShardsRefreshed();
+
+ // Check that the shards all match with what is on the config server
+ std::set<ShardId> shardIds;
+ newRoutingHistory.getAllShardIds(&shardIds);
+ for (const auto& shardId : shardIds) {
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
+ }
+
+ const auto newVersion =
+ ComparableChunkVersion::makeComparableChunkVersion(newRoutingHistory.getVersion());
+
+ LOGV2_FOR_CATALOG_REFRESH(4619901,
+ isIncremental || newVersion != previousVersion ? 0 : 1,
+ "Refreshed cached collection",
+ "namespace"_attr = nss,
+ "newVersion"_attr = newVersion,
+ "oldVersion"_attr = previousVersion,
+ "duration"_attr = Milliseconds(t.millis()));
+ _updateRefreshesStats(isIncremental, false);
+
+ return LookupResult(OptionalRoutingTableHistory(std::move(newRoutingHistory)), newVersion);
+ } catch (const DBException& ex) {
+ _stats.countFailedRefreshes.addAndFetch(1);
+ _updateRefreshesStats(isIncremental, false);
+
+ if (ex.code() == ErrorCodes::NamespaceNotFound) {
+ LOGV2_FOR_CATALOG_REFRESH(4619902,
+ 0,
+ "Collection has found to be unsharded after refresh",
+ "namespace"_attr = nss,
+ "duration"_attr = Milliseconds(t.millis()));
+
+ return LookupResult(
+ OptionalRoutingTableHistory(),
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()));
+ }
+
+ LOGV2_FOR_CATALOG_REFRESH(4619903,
+ 0,
+ "Error refreshing cached collection",
+ "namespace"_attr = nss,
+ "duration"_attr = Milliseconds(t.millis()),
+ "error"_attr = redact(ex));
+
+ throw;
+ }
}
-bool CachedDatabaseInfo::shardingEnabled() const {
- return _dbt.getSharded();
+AtomicWord<uint64_t> ComparableDatabaseVersion::_uuidDisambiguatingSequenceNumSource{1ULL};
+
+ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion(
+ const DatabaseVersion& version) {
+ return ComparableDatabaseVersion(version, _uuidDisambiguatingSequenceNumSource.fetchAndAdd(1));
}
-DatabaseVersion CachedDatabaseInfo::databaseVersion() const {
- return _dbt.getVersion();
+std::string ComparableDatabaseVersion::toString() const {
+ return str::stream() << (_dbVersion ? _dbVersion->toBSON().toString() : "NONE") << "|"
+ << _uuidDisambiguatingSequenceNum;
}
-AtomicWord<uint64_t> ComparableChunkVersion::_localSequenceNumSource{1ULL};
+bool ComparableDatabaseVersion::operator==(const ComparableDatabaseVersion& other) const {
+ if (!_dbVersion && !other._dbVersion)
+ return true; // Default constructed value
+ if (_dbVersion.is_initialized() != other._dbVersion.is_initialized())
+ return false; // One side is default constructed value
-ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion(
- const ChunkVersion& version) {
- return ComparableChunkVersion(version, _localSequenceNumSource.fetchAndAdd(1));
+ return sameUuid(other) && (_dbVersion->getLastMod() == other._dbVersion->getLastMod());
}
-const ChunkVersion& ComparableChunkVersion::getVersion() const {
- return _chunkVersion;
+bool ComparableDatabaseVersion::operator<(const ComparableDatabaseVersion& other) const {
+ if (!_dbVersion && !other._dbVersion)
+ return false; // Default constructed value
+
+ if (_dbVersion && other._dbVersion && sameUuid(other)) {
+ return _dbVersion->getLastMod() < other._dbVersion->getLastMod();
+ } else {
+ return _uuidDisambiguatingSequenceNum < other._uuidDisambiguatingSequenceNum;
+ }
}
-uint64_t ComparableChunkVersion::getLocalSequenceNum() const {
- return _localSequenceNum;
+CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard)
+ : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {}
+
+const ShardId& CachedDatabaseInfo::primaryId() const {
+ return _dbt.getPrimary();
}
-BSONObj ComparableChunkVersion::toBSON() const {
- BSONObjBuilder builder;
- _chunkVersion.appendToCommand(&builder);
- builder.append("localSequenceNum", std::to_string(_localSequenceNum));
- return builder.obj();
+bool CachedDatabaseInfo::shardingEnabled() const {
+ return _dbt.getSharded();
}
-std::string ComparableChunkVersion::toString() const {
- return toBSON().toString();
+DatabaseVersion CachedDatabaseInfo::databaseVersion() const {
+ return _dbt.getVersion();
}
} // namespace mongo