diff options
Diffstat (limited to 'src/mongo/s/catalog_cache.cpp')
-rw-r--r-- | src/mongo/s/catalog_cache.cpp | 762 |
1 file changed, 282 insertions, 480 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 19846e62b48..d9c2500f2d3 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -55,6 +55,7 @@ #include "mongo/util/timer.h" namespace mongo { + const OperationContext::Decoration<bool> operationShouldBlockBehindCatalogCacheRefresh = OperationContext::declareDecoration<bool>(); @@ -68,81 +69,8 @@ namespace { const int kMaxInconsistentRoutingInfoRefreshAttempts = 3; const int kDatabaseCacheSize = 10000; -/** - * Returns whether two shard versions have a matching epoch. - */ -bool shardVersionsHaveMatchingEpoch(boost::optional<ChunkVersion> wanted, - const ChunkVersion& received) { - return wanted && wanted->epoch() == received.epoch(); -}; - -/** - * Given an (optional) initial routing table and a set of changed chunks returned by the catalog - * cache loader, produces a new routing table with the changes applied. - * - * If the collection is no longer sharded returns nullptr. If the epoch has changed, expects that - * the 'collectionChunksList' contains the full contents of the chunks collection for that namespace - * so that the routing table can be built from scratch. - * - * Throws ConflictingOperationInProgress if the chunk metadata was found to be inconsistent (not - * containing all the necessary chunks, contains overlaps or chunks' epoch values are not the same - * as that of the collection). Since this situation may be transient, due to the collection being - * dropped or having its shard key refined concurrently, the caller must retry the reload up to some - * configurable number of attempts. 
- */ -std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo( - OperationContext* opCtx, - const NamespaceString& nss, - std::shared_ptr<RoutingTableHistory> existingRoutingInfo, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) { - if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) { - return nullptr; - } - const auto collectionAndChunks = uassertStatusOK(std::move(swCollectionAndChangedChunks)); - - auto chunkManager = [&] { - // If we have routing info already and it's for the same collection epoch, we're updating. - // Otherwise, we're making a whole new routing table. - if (existingRoutingInfo && - existingRoutingInfo->getVersion().epoch() == collectionAndChunks.epoch) { - if (collectionAndChunks.changedChunks.size() == 1 && - collectionAndChunks.changedChunks[0].getVersion() == - existingRoutingInfo->getVersion()) - return existingRoutingInfo; - - return std::make_shared<RoutingTableHistory>( - existingRoutingInfo->makeUpdated(std::move(collectionAndChunks.reshardingFields), - collectionAndChunks.changedChunks)); - } - - auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> { - if (!collectionAndChunks.defaultCollation.isEmpty()) { - // The collation should have been validated upon collection creation - return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) - ->makeFromBSON(collectionAndChunks.defaultCollation)); - } - return nullptr; - }(); - - return std::make_shared<RoutingTableHistory>( - RoutingTableHistory::makeNew(nss, - collectionAndChunks.uuid, - KeyPattern(collectionAndChunks.shardKeyPattern), - std::move(defaultCollator), - collectionAndChunks.shardKeyIsUnique, - collectionAndChunks.epoch, - std::move(collectionAndChunks.reshardingFields), - collectionAndChunks.changedChunks)); - }(); - - std::set<ShardId> shardIds; - chunkManager->getAllShardIds(&shardIds); - for (const auto& shardId : shardIds) { - 
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); - } - return chunkManager; -} +const int kCollectionCacheSize = 10000; } // namespace @@ -155,7 +83,8 @@ CatalogCache::CatalogCache(ServiceContext* const service, CatalogCacheLoader& ca options.maxThreads = 6; return options; }())), - _databaseCache(service, *_executor, _cacheLoader) { + _databaseCache(service, *_executor, _cacheLoader), + _collectionCache(service, *_executor, _cacheLoader) { _executor->startup(); } @@ -190,111 +119,89 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx } } -StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx, - const NamespaceString& nss) { - return _getCollectionRoutingInfo(opCtx, nss).statusWithInfo; -} - -CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoWithForcedRefresh( - OperationContext* opCtx, const NamespaceString& nss) { - setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); - _createOrGetCollectionEntryAndMarkAsNeedsRefresh(nss); - return _getCollectionRoutingInfo(opCtx, nss); -} - -CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfo(OperationContext* opCtx, - const NamespaceString& nss) { - return _getCollectionRoutingInfoAt(opCtx, nss, boost::none); -} - - -StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx, - const NamespaceString& nss, - Timestamp atClusterTime) { - return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).statusWithInfo; -} - -CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt( +StatusWith<ChunkManager> CatalogCache::_getCollectionRoutingInfoAt( OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) { - invariant(!opCtx->lockState() || !opCtx->lockState()->isLocked(), - "Do not hold a lock while refreshing the catalog cache. 
Doing so would potentially " - "hold the lock during a network call, and can lead to a deadlock as described in " - "SERVER-37398."); - // This default value can cause a single unnecessary extra refresh if this thread did do the - // refresh but the refresh failed, or if the database or collection was not found, but only if - // the caller is getCollectionRoutingInfoWithRefresh with the parameter - // forceRefreshFromThisThread set to true - RefreshAction refreshActionTaken(RefreshAction::kDidNotPerformRefresh); - while (true) { + invariant( + !opCtx->lockState() || !opCtx->lockState()->isLocked(), + "Do not hold a lock while refreshing the catalog cache. Doing so would potentially hold " + "the lock during a network call, and can lead to a deadlock as described in SERVER-37398."); + + try { const auto swDbInfo = getDatabase(opCtx, nss.db()); + if (!swDbInfo.isOK()) { if (swDbInfo == ErrorCodes::NamespaceNotFound) { LOGV2_FOR_CATALOG_REFRESH( - 4947102, + 4947103, 2, "Invalidating cached collection entry because its database has been dropped", "namespace"_attr = nss); - purgeCollection(nss); + invalidateCollectionEntry_LINEARIZABLE(nss); } - return {swDbInfo.getStatus(), refreshActionTaken}; + return swDbInfo.getStatus(); } const auto dbInfo = std::move(swDbInfo.getValue()); - stdx::unique_lock<Latch> ul(_mutex); - - auto collEntry = _createOrGetCollectionEntry(ul, nss); + const auto cacheConsistency = gEnableFinerGrainedCatalogCacheRefresh && + !operationShouldBlockBehindCatalogCacheRefresh(opCtx) + ? CacheCausalConsistency::kLatestCached + : CacheCausalConsistency::kLatestKnown; - if (collEntry->needsRefresh && - (!gEnableFinerGrainedCatalogCacheRefresh || collEntry->epochHasChanged || - operationShouldBlockBehindCatalogCacheRefresh(opCtx))) { + auto collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency); - operationBlockedBehindCatalogCacheRefresh(opCtx) = true; + // If the entry is in the cache return inmediately. 
+ if (collEntryFuture.isReady()) { + setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false); + return ChunkManager(dbInfo.primaryId(), + dbInfo.databaseVersion(), + collEntryFuture.get(opCtx), + atClusterTime); + } - auto refreshNotification = collEntry->refreshCompletionNotification; - if (!refreshNotification) { - refreshNotification = (collEntry->refreshCompletionNotification = - std::make_shared<Notification<Status>>()); - _scheduleCollectionRefresh(ul, opCtx->getServiceContext(), collEntry, nss, 1); - refreshActionTaken = RefreshAction::kPerformedRefresh; - } + operationBlockedBehindCatalogCacheRefresh(opCtx) = true; - // Wait on the notification outside of the mutex - ul.unlock(); + size_t acquireTries = 0; + Timer t; - auto refreshStatus = [&]() { - Timer t; - ON_BLOCK_EXIT([&] { _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); }); + while (true) { + try { + auto collEntry = collEntryFuture.get(opCtx); + _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); - try { - const Milliseconds kReportingInterval{250}; - while (!refreshNotification->waitFor(opCtx, kReportingInterval)) { - _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); - t.reset(); - } + setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false); - return refreshNotification->get(opCtx); - } catch (const DBException& ex) { + return ChunkManager(dbInfo.primaryId(), + dbInfo.databaseVersion(), + std::move(collEntry), + atClusterTime); + } catch (ExceptionFor<ErrorCodes::ConflictingOperationInProgress>& ex) { + _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); + acquireTries++; + if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) { return ex.toStatus(); } - }(); - - if (!refreshStatus.isOK()) { - return {refreshStatus, refreshActionTaken}; } - // Once the refresh is complete, loop around to get the latest value - continue; + collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency); + t.reset(); } - - return 
{ChunkManager(dbInfo.primaryId(), - dbInfo.databaseVersion(), - collEntry->routingInfo, - atClusterTime), - refreshActionTaken}; + } catch (const DBException& ex) { + return ex.toStatus(); } } +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) { + return _getCollectionRoutingInfoAt(opCtx, nss, boost::none); +} + +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx, + const NamespaceString& nss, + Timestamp atClusterTime) { + return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime); +} + StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationContext* opCtx, StringData dbName) { // TODO SERVER-49724: Make ReadThroughCache support StringData keys @@ -303,32 +210,20 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon } StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoWithRefresh( - OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread) { - auto refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss); - // We want to ensure that we don't join an in-progress refresh because that - // could violate causal consistency for this client. We don't need to actually perform the - // refresh ourselves but we do need the refresh to begin *after* this function is - // called, so calling it twice is enough regardless of what happens the - // second time. See SERVER-33954 for reasoning. 
- if (forceRefreshFromThisThread && - refreshResult.actionTaken == RefreshAction::kDidNotPerformRefresh) { - refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss); - } - return refreshResult.statusWithInfo; + OperationContext* opCtx, const NamespaceString& nss) { + _collectionCache.invalidate(nss); + setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); + return getCollectionRoutingInfo(opCtx, nss); } StatusWith<ChunkManager> CatalogCache::getShardedCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss) { - auto swRoutingInfo = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo; - if (!swRoutingInfo.isOK()) - return swRoutingInfo; - - auto cri(std::move(swRoutingInfo.getValue())); - if (!cri.isSharded()) + auto routingInfoStatus = getCollectionRoutingInfoWithRefresh(opCtx, nss); + if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().isSharded()) { return {ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << nss.ns() << " is not sharded."}; - - return cri; + } + return routingInfoStatus; } void CatalogCache::onStaleDatabaseVersion(const StringData dbName, @@ -350,48 +245,49 @@ void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationCon if (gEnableFinerGrainedCatalogCacheRefresh) { operationShouldBlockBehindCatalogCacheRefresh(opCtx) = shouldBlock; } -}; +} void CatalogCache::invalidateShardOrEntireCollectionEntryForShardedCollection( - OperationContext* opCtx, const NamespaceString& nss, - boost::optional<ChunkVersion> wantedVersion, - const ChunkVersion& receivedVersion, - ShardId shardId) { - if (shardVersionsHaveMatchingEpoch(wantedVersion, receivedVersion)) { - _createOrGetCollectionEntryAndMarkShardStale(nss, shardId); - } else { - _createOrGetCollectionEntryAndMarkEpochStale(nss); + const boost::optional<ChunkVersion>& wantedVersion, + const ShardId& shardId) { + _stats.countStaleConfigErrors.addAndFetch(1); + + auto collectionEntry = 
_collectionCache.peekLatestCached(nss); + if (collectionEntry && collectionEntry->optRt) { + collectionEntry->optRt->setShardStale(shardId); } -}; -void CatalogCache::onEpochChange(const NamespaceString& nss) { - _createOrGetCollectionEntryAndMarkEpochStale(nss); -}; + if (wantedVersion) { + _collectionCache.advanceTimeInStore( + nss, ComparableChunkVersion::makeComparableChunkVersion(*wantedVersion)); + } else { + _collectionCache.advanceTimeInStore( + nss, ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh()); + } +} void CatalogCache::checkEpochOrThrow(const NamespaceString& nss, - ChunkVersion targetCollectionVersion, - const ShardId& shardId) const { - stdx::lock_guard<Latch> lg(_mutex); - const auto itDb = _collectionsByDb.find(nss.db()); + const ChunkVersion& targetCollectionVersion, + const ShardId& shardId) { uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId), str::stream() << "could not act as router for " << nss.ns() << ", no entry for database " << nss.db(), - itDb != _collectionsByDb.end()); + _databaseCache.peekLatestCached(nss.db().toString())); - auto itColl = itDb->second.find(nss.ns()); + auto collectionValueHandle = _collectionCache.peekLatestCached(nss); uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId), str::stream() << "could not act as router for " << nss.ns() << ", no entry for collection.", - itColl != itDb->second.end()); + collectionValueHandle); uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId), str::stream() << "could not act as router for " << nss.ns() << ", wanted " << targetCollectionVersion.toString() << ", but found the collection was unsharded", - itColl->second->routingInfo); + collectionValueHandle->optRt); - auto foundVersion = itColl->second->routingInfo->getVersion(); + auto foundVersion = collectionValueHandle->optRt->getVersion(); uassert(StaleConfigInfo(nss, targetCollectionVersion, foundVersion, shardId), str::stream() << "could 
not act as router for " << nss.ns() << ", wanted " << targetCollectionVersion.toString() << ", but found " @@ -399,11 +295,6 @@ void CatalogCache::checkEpochOrThrow(const NamespaceString& nss, foundVersion.epoch() == targetCollectionVersion.epoch()); } -void CatalogCache::invalidateShardForShardedCollection(const NamespaceString& nss, - const ShardId& staleShardId) { - _createOrGetCollectionEntryAndMarkShardStale(nss, staleShardId); -} - void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) { LOGV2_DEBUG(4997600, 1, @@ -413,32 +304,24 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) { _databaseCache.invalidateCachedValueIf( [&](const DatabaseType& dbt) { return dbt.getPrimary() == shardId; }); - stdx::lock_guard<Latch> lg(_mutex); - // Invalidate collections which contain data on this shard. - for (const auto& [db, collInfoMap] : _collectionsByDb) { - for (const auto& [collNs, collRoutingInfoEntry] : collInfoMap) { - if (!collRoutingInfoEntry->needsRefresh && collRoutingInfoEntry->routingInfo) { - // The set of shards on which this collection contains chunks. 
- std::set<ShardId> shardsOwningDataForCollection; - collRoutingInfoEntry->routingInfo->getAllShardIds(&shardsOwningDataForCollection); - - if (shardsOwningDataForCollection.find(shardId) != - shardsOwningDataForCollection.end()) { - LOGV2_DEBUG(22647, - 3, - "Invalidating cached collection {namespace} that has data " - "on shard {shardId}", - "Invalidating cached collection", - "namespace"_attr = collNs, - "shardId"_attr = shardId); - - collRoutingInfoEntry->needsRefresh = true; - collRoutingInfoEntry->routingInfo->setShardStale(shardId); - } - } - } - } + _collectionCache.invalidateCachedValueIf([&](const OptionalRoutingTableHistory& ort) { + if (!ort.optRt) + return false; + const auto& rt = *ort.optRt; + + std::set<ShardId> shardIds; + rt.getAllShardIds(&shardIds); + + LOGV2_DEBUG(22647, + 3, + "Invalidating cached collection {namespace} that has data " + "on shard {shardId}", + "Invalidating cached collection", + "namespace"_attr = rt.nss(), + "shardId"_attr = shardId); + return shardIds.find(shardId) != shardIds.end(); + }); LOGV2(22648, "Finished invalidating databases and collections with data on shard: {shardId}", @@ -446,46 +329,28 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) { "shardId"_attr = shardId); } -void CatalogCache::purgeCollection(const NamespaceString& nss) { - stdx::lock_guard<Latch> lg(_mutex); - - auto itDb = _collectionsByDb.find(nss.db()); - if (itDb == _collectionsByDb.end()) { - return; - } - - itDb->second.erase(nss.ns()); -} - void CatalogCache::purgeDatabase(StringData dbName) { _databaseCache.invalidate(dbName.toString()); - stdx::lock_guard<Latch> lg(_mutex); - _collectionsByDb.erase(dbName); + _collectionCache.invalidateKeyIf( + [&](const NamespaceString& nss) { return nss.db() == dbName; }); } void CatalogCache::purgeAllDatabases() { _databaseCache.invalidateAll(); - stdx::lock_guard<Latch> lg(_mutex); - _collectionsByDb.clear(); + _collectionCache.invalidateAll(); } void 
CatalogCache::report(BSONObjBuilder* builder) const { BSONObjBuilder cacheStatsBuilder(builder->subobjStart("catalogCache")); - size_t numDatabaseEntries; - size_t numCollectionEntries{0}; - { - numDatabaseEntries = _databaseCache.getCacheInfo().size(); - stdx::lock_guard<Latch> ul(_mutex); - for (const auto& entry : _collectionsByDb) { - numCollectionEntries += entry.second.size(); - } - } + const size_t numDatabaseEntries = _databaseCache.getCacheInfo().size(); + const size_t numCollectionEntries = _collectionCache.getCacheInfo().size(); cacheStatsBuilder.append("numDatabaseEntries", static_cast<long long>(numDatabaseEntries)); cacheStatsBuilder.append("numCollectionEntries", static_cast<long long>(numCollectionEntries)); _stats.report(&cacheStatsBuilder); + _collectionCache.reportStats(&cacheStatsBuilder); } void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx, @@ -519,188 +384,8 @@ void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opC } } -void CatalogCache::_scheduleCollectionRefresh(WithLock lk, - ServiceContext* service, - std::shared_ptr<CollectionRoutingInfoEntry> collEntry, - NamespaceString const& nss, - int refreshAttempt) { - const auto existingRoutingInfo = collEntry->routingInfo; - - // If we have an existing chunk manager, the refresh is considered "incremental", regardless of - // how many chunks are in the differential - const bool isIncremental(existingRoutingInfo); - - if (isIncremental) { - _stats.numActiveIncrementalRefreshes.addAndFetch(1); - _stats.countIncrementalRefreshesStarted.addAndFetch(1); - } else { - _stats.numActiveFullRefreshes.addAndFetch(1); - _stats.countFullRefreshesStarted.addAndFetch(1); - } - - // Invoked when one iteration of getChunksSince has completed, whether with success or error - const auto onRefreshCompleted = [this, t = Timer(), nss, isIncremental, existingRoutingInfo]( - const Status& status, - RoutingTableHistory* routingInfoAfterRefresh) { - if 
(isIncremental) { - _stats.numActiveIncrementalRefreshes.subtractAndFetch(1); - } else { - _stats.numActiveFullRefreshes.subtractAndFetch(1); - } - - if (!status.isOK()) { - _stats.countFailedRefreshes.addAndFetch(1); - - LOGV2_OPTIONS(24103, - {logv2::LogComponent::kShardingCatalogRefresh}, - "Error refreshing cached collection {namespace}; Took {duration} and " - "failed due to {error}", - "Error refreshing cached collection", - "namespace"_attr = nss, - "duration"_attr = Milliseconds(t.millis()), - "error"_attr = redact(status)); - } else if (routingInfoAfterRefresh) { - const int logLevel = - (!existingRoutingInfo || - (existingRoutingInfo && - routingInfoAfterRefresh->getVersion() != existingRoutingInfo->getVersion())) - ? 0 - : 1; - LOGV2_FOR_CATALOG_REFRESH( - 24104, - logLevel, - "Refreshed cached collection {namespace} to version {newVersion} from version " - "{oldVersion}. Took {duration}", - "Refreshed cached collection", - "namespace"_attr = nss, - "newVersion"_attr = routingInfoAfterRefresh->getVersion(), - "oldVersion"_attr = - (existingRoutingInfo - ? 
(" from version " + existingRoutingInfo->getVersion().toString()) - : ""), - "duration"_attr = Milliseconds(t.millis())); - } else { - LOGV2_OPTIONS(24105, - {logv2::LogComponent::kShardingCatalogRefresh}, - "Collection {namespace} was found to be unsharded after refresh that " - "took {duration}", - "Collection has found to be unsharded after refresh", - "namespace"_attr = nss, - "duration"_attr = Milliseconds(t.millis())); - } - }; - - // Invoked if getChunksSince resulted in error or threw an exception - const auto onRefreshFailed = - [ this, service, collEntry, nss, refreshAttempt, - onRefreshCompleted ](WithLock lk, const Status& status) noexcept { - onRefreshCompleted(status, nullptr); - - // It is possible that the metadata is being changed concurrently, so retry the - // refresh again - if (status == ErrorCodes::ConflictingOperationInProgress && - refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) { - _scheduleCollectionRefresh(lk, service, collEntry, nss, refreshAttempt + 1); - } else { - // Leave needsRefresh to true so that any subsequent get attempts will kick off - // another round of refresh - collEntry->refreshCompletionNotification->set(status); - collEntry->refreshCompletionNotification = nullptr; - } - }; - - const auto refreshCallback = - [ this, service, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ]( - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - - ThreadClient tc("CatalogCache::collectionRefresh", service); - auto opCtx = tc->makeOperationContext(); - - std::shared_ptr<RoutingTableHistory> newRoutingInfo; - try { - newRoutingInfo = refreshCollectionRoutingInfo( - opCtx.get(), nss, std::move(existingRoutingInfo), std::move(swCollAndChunks)); - - onRefreshCompleted(Status::OK(), newRoutingInfo.get()); - } catch (const DBException& ex) { - stdx::lock_guard<Latch> lg(_mutex); - onRefreshFailed(lg, ex.toStatus()); - return; - } - - stdx::lock_guard<Latch> lg(_mutex); 
- - collEntry->epochHasChanged = false; - collEntry->needsRefresh = false; - collEntry->refreshCompletionNotification->set(Status::OK()); - collEntry->refreshCompletionNotification = nullptr; - - setOperationShouldBlockBehindCatalogCacheRefresh(opCtx.get(), false); - - // TODO(SERVER-49876): remove clang-tidy NOLINT comments. - if (existingRoutingInfo && newRoutingInfo && // NOLINT(bugprone-use-after-move) - existingRoutingInfo->getVersion() == // NOLINT(bugprone-use-after-move) - newRoutingInfo->getVersion()) { // NOLINT(bugprone-use-after-move) - // If the routingInfo hasn't changed, we need to manually reset stale shards. - newRoutingInfo->setAllShardsRefreshed(); - } - - collEntry->routingInfo = std::move(newRoutingInfo); - }; - - const ChunkVersion startingCollectionVersion = - (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED()); - - LOGV2_FOR_CATALOG_REFRESH( - 24106, - 1, - "Refreshing cached collection {namespace} with version {currentCollectionVersion}", - "namespace"_attr = nss, - "currentCollectionVersion"_attr = startingCollectionVersion); - - _cacheLoader.getChunksSince(nss, startingCollectionVersion) - .thenRunOn(_executor) - .getAsync(refreshCallback); - - // The routing info for this collection shouldn't change, as other threads may try to use the - // CatalogCache while we are waiting for the refresh to complete. 
- invariant(collEntry->routingInfo.get() == existingRoutingInfo.get()); -} - -void CatalogCache::_createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss) { - stdx::lock_guard<Latch> lg(_mutex); - auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss); - collRoutingInfoEntry->needsRefresh = true; - collRoutingInfoEntry->epochHasChanged = true; -} - -void CatalogCache::_createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss, - const ShardId& staleShardId) { - stdx::lock_guard<Latch> lg(_mutex); - auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss); - collRoutingInfoEntry->needsRefresh = true; - if (collRoutingInfoEntry->routingInfo) { - collRoutingInfoEntry->routingInfo->setShardStale(staleShardId); - } -} - -void CatalogCache::_createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss) { - stdx::lock_guard<Latch> lg(_mutex); - auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss); - collRoutingInfoEntry->needsRefresh = true; -} - -std::shared_ptr<CatalogCache::CollectionRoutingInfoEntry> CatalogCache::_createOrGetCollectionEntry( - WithLock wl, const NamespaceString& nss) { - auto& collectionsForDb = _collectionsByDb[nss.db()]; - if (!collectionsForDb.contains(nss.ns())) { - // TODO SERVER-46199: ensure collections cache size is capped - // currently no routine except for dropDatabase is removing cached collection entries and - // the cache for a specific DB can grow indefinitely. 
- collectionsForDb[nss.ns()] = std::make_shared<CollectionRoutingInfoEntry>(); - } - - return collectionsForDb[nss.ns()]; +void CatalogCache::invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss) { + _collectionCache.invalidate(nss); } void CatalogCache::Stats::report(BSONObjBuilder* builder) const { @@ -708,14 +393,6 @@ void CatalogCache::Stats::report(BSONObjBuilder* builder) const { builder->append("totalRefreshWaitTimeMicros", totalRefreshWaitTimeMicros.load()); - builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load()); - builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load()); - - builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load()); - builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load()); - - builder->append("countFailedRefreshes", countFailedRefreshes.load()); - if (isMongos()) { BSONObjBuilder operationsBlockedByRefreshBuilder( builder->subobjStart("operationsBlockedByRefresh")); @@ -756,7 +433,6 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa OperationContext* opCtx, const std::string& dbName, const ComparableDatabaseVersion& previousDbVersion) { - // TODO (SERVER-34164): Track and increment stats for database refreshes LOGV2_FOR_CATALOG_REFRESH(24102, 2, "Refreshing cached database entry", "db"_attr = dbName); @@ -788,73 +464,199 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa } } -AtomicWord<uint64_t> ComparableDatabaseVersion::_localSequenceNumSource{1ULL}; +CatalogCache::CollectionCache::CollectionCache(ServiceContext* service, + ThreadPoolInterface& threadPool, + CatalogCacheLoader& catalogCacheLoader) + : ReadThroughCache(_mutex, + service, + threadPool, + [this](OperationContext* opCtx, + const NamespaceString& nss, + const ValueHandle& collectionHistory, + const ComparableChunkVersion& previousChunkVersion) { + return _lookupCollection( + opCtx, nss, 
collectionHistory, previousChunkVersion); + }, + kCollectionCacheSize), + _catalogCacheLoader(catalogCacheLoader) {} -ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion( - const DatabaseVersion& version) { - return ComparableDatabaseVersion(version, _localSequenceNumSource.fetchAndAdd(1)); +void CatalogCache::CollectionCache::reportStats(BSONObjBuilder* builder) const { + _stats.report(builder); } -const DatabaseVersion& ComparableDatabaseVersion::getVersion() const { - return _dbVersion; +void CatalogCache::CollectionCache::_updateRefreshesStats(const bool isIncremental, + const bool add) { + if (add) { + if (isIncremental) { + _stats.numActiveIncrementalRefreshes.addAndFetch(1); + _stats.countIncrementalRefreshesStarted.addAndFetch(1); + } else { + _stats.numActiveFullRefreshes.addAndFetch(1); + _stats.countFullRefreshesStarted.addAndFetch(1); + } + } else { + if (isIncremental) { + _stats.numActiveIncrementalRefreshes.subtractAndFetch(1); + } else { + _stats.numActiveFullRefreshes.subtractAndFetch(1); + } + } } -uint64_t ComparableDatabaseVersion::getLocalSequenceNum() const { - return _localSequenceNum; -} +void CatalogCache::CollectionCache::Stats::report(BSONObjBuilder* builder) const { + builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load()); + builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load()); -BSONObj ComparableDatabaseVersion::toBSON() const { - BSONObjBuilder builder; - _dbVersion.getUuid().appendToBuilder(&builder, "uuid"); - builder.append("lastMod", _dbVersion.getLastMod()); - builder.append("localSequenceNum", std::to_string(_localSequenceNum)); - return builder.obj(); -} + builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load()); + builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load()); -std::string ComparableDatabaseVersion::toString() const { - return toBSON().toString(); + 
builder->append("countFailedRefreshes", countFailedRefreshes.load()); } +CatalogCache::CollectionCache::LookupResult CatalogCache::CollectionCache::_lookupCollection( + OperationContext* opCtx, + const NamespaceString& nss, + const RoutingTableHistoryValueHandle& existingHistory, + const ComparableChunkVersion& previousVersion) { + const bool isIncremental(existingHistory && existingHistory->optRt); + _updateRefreshesStats(isIncremental, true); -CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard) - : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {} + Timer t{}; + try { + auto lookupVersion = + isIncremental ? existingHistory->optRt->getVersion() : ChunkVersion::UNSHARDED(); -const ShardId& CachedDatabaseInfo::primaryId() const { - return _dbt.getPrimary(); + LOGV2_FOR_CATALOG_REFRESH(4619900, + 1, + "Refreshing cached collection", + "namespace"_attr = nss, + "currentVersion"_attr = previousVersion); + + auto collectionAndChunks = _catalogCacheLoader.getChunksSince(nss, lookupVersion).get(); + + auto newRoutingHistory = [&] { + // If we have routing info already and it's for the same collection epoch, we're + // updating. Otherwise, we're making a whole new routing table. 
+ if (isIncremental && + existingHistory->optRt->getVersion().epoch() == collectionAndChunks.epoch) { + return existingHistory->optRt->makeUpdated(collectionAndChunks.reshardingFields, + collectionAndChunks.changedChunks); + } + + auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> { + if (!collectionAndChunks.defaultCollation.isEmpty()) { + // The collation should have been validated upon collection creation + return uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext()) + ->makeFromBSON(collectionAndChunks.defaultCollation)); + } + return nullptr; + }(); + + return RoutingTableHistory::makeNew(nss, + collectionAndChunks.uuid, + KeyPattern(collectionAndChunks.shardKeyPattern), + std::move(defaultCollator), + collectionAndChunks.shardKeyIsUnique, + collectionAndChunks.epoch, + std::move(collectionAndChunks.reshardingFields), + collectionAndChunks.changedChunks); + }(); + + newRoutingHistory.setAllShardsRefreshed(); + + // Check that the shards all match with what is on the config server + std::set<ShardId> shardIds; + newRoutingHistory.getAllShardIds(&shardIds); + for (const auto& shardId : shardIds) { + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); + } + + const auto newVersion = + ComparableChunkVersion::makeComparableChunkVersion(newRoutingHistory.getVersion()); + + LOGV2_FOR_CATALOG_REFRESH(4619901, + isIncremental || newVersion != previousVersion ? 
0 : 1, + "Refreshed cached collection", + "namespace"_attr = nss, + "newVersion"_attr = newVersion, + "oldVersion"_attr = previousVersion, + "duration"_attr = Milliseconds(t.millis())); + _updateRefreshesStats(isIncremental, false); + + return LookupResult(OptionalRoutingTableHistory(std::move(newRoutingHistory)), newVersion); + } catch (const DBException& ex) { + _stats.countFailedRefreshes.addAndFetch(1); + _updateRefreshesStats(isIncremental, false); + + if (ex.code() == ErrorCodes::NamespaceNotFound) { + LOGV2_FOR_CATALOG_REFRESH(4619902, + 0, + "Collection has found to be unsharded after refresh", + "namespace"_attr = nss, + "duration"_attr = Milliseconds(t.millis())); + + return LookupResult( + OptionalRoutingTableHistory(), + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED())); + } + + LOGV2_FOR_CATALOG_REFRESH(4619903, + 0, + "Error refreshing cached collection", + "namespace"_attr = nss, + "duration"_attr = Milliseconds(t.millis()), + "error"_attr = redact(ex)); + + throw; + } } -bool CachedDatabaseInfo::shardingEnabled() const { - return _dbt.getSharded(); +AtomicWord<uint64_t> ComparableDatabaseVersion::_uuidDisambiguatingSequenceNumSource{1ULL}; + +ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion( + const DatabaseVersion& version) { + return ComparableDatabaseVersion(version, _uuidDisambiguatingSequenceNumSource.fetchAndAdd(1)); } -DatabaseVersion CachedDatabaseInfo::databaseVersion() const { - return _dbt.getVersion(); +std::string ComparableDatabaseVersion::toString() const { + return str::stream() << (_dbVersion ? 
_dbVersion->toBSON().toString() : "NONE") << "|" + << _uuidDisambiguatingSequenceNum; } -AtomicWord<uint64_t> ComparableChunkVersion::_localSequenceNumSource{1ULL}; +bool ComparableDatabaseVersion::operator==(const ComparableDatabaseVersion& other) const { + if (!_dbVersion && !other._dbVersion) + return true; // Default constructed value + if (_dbVersion.is_initialized() != other._dbVersion.is_initialized()) + return false; // One side is default constructed value -ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion( - const ChunkVersion& version) { - return ComparableChunkVersion(version, _localSequenceNumSource.fetchAndAdd(1)); + return sameUuid(other) && (_dbVersion->getLastMod() == other._dbVersion->getLastMod()); } -const ChunkVersion& ComparableChunkVersion::getVersion() const { - return _chunkVersion; +bool ComparableDatabaseVersion::operator<(const ComparableDatabaseVersion& other) const { + if (!_dbVersion && !other._dbVersion) + return false; // Default constructed value + + if (_dbVersion && other._dbVersion && sameUuid(other)) { + return _dbVersion->getLastMod() < other._dbVersion->getLastMod(); + } else { + return _uuidDisambiguatingSequenceNum < other._uuidDisambiguatingSequenceNum; + } } -uint64_t ComparableChunkVersion::getLocalSequenceNum() const { - return _localSequenceNum; +CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard) + : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {} + +const ShardId& CachedDatabaseInfo::primaryId() const { + return _dbt.getPrimary(); } -BSONObj ComparableChunkVersion::toBSON() const { - BSONObjBuilder builder; - _chunkVersion.appendToCommand(&builder); - builder.append("localSequenceNum", std::to_string(_localSequenceNum)); - return builder.obj(); +bool CachedDatabaseInfo::shardingEnabled() const { + return _dbt.getSharded(); } -std::string ComparableChunkVersion::toString() const { - return toBSON().toString(); +DatabaseVersion 
CachedDatabaseInfo::databaseVersion() const { + return _dbt.getVersion(); } } // namespace mongo |