diff options
Diffstat (limited to 'src/mongo/s')
24 files changed, 927 insertions, 856 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 19846e62b48..d9c2500f2d3 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -55,6 +55,7 @@ #include "mongo/util/timer.h" namespace mongo { + const OperationContext::Decoration<bool> operationShouldBlockBehindCatalogCacheRefresh = OperationContext::declareDecoration<bool>(); @@ -68,81 +69,8 @@ namespace { const int kMaxInconsistentRoutingInfoRefreshAttempts = 3; const int kDatabaseCacheSize = 10000; -/** - * Returns whether two shard versions have a matching epoch. - */ -bool shardVersionsHaveMatchingEpoch(boost::optional<ChunkVersion> wanted, - const ChunkVersion& received) { - return wanted && wanted->epoch() == received.epoch(); -}; - -/** - * Given an (optional) initial routing table and a set of changed chunks returned by the catalog - * cache loader, produces a new routing table with the changes applied. - * - * If the collection is no longer sharded returns nullptr. If the epoch has changed, expects that - * the 'collectionChunksList' contains the full contents of the chunks collection for that namespace - * so that the routing table can be built from scratch. - * - * Throws ConflictingOperationInProgress if the chunk metadata was found to be inconsistent (not - * containing all the necessary chunks, contains overlaps or chunks' epoch values are not the same - * as that of the collection). Since this situation may be transient, due to the collection being - * dropped or having its shard key refined concurrently, the caller must retry the reload up to some - * configurable number of attempts. - */ -std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo( - OperationContext* opCtx, - const NamespaceString& nss, - std::shared_ptr<RoutingTableHistory> existingRoutingInfo, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) { - if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) { - return nullptr; - } - const auto collectionAndChunks = uassertStatusOK(std::move(swCollectionAndChangedChunks)); - - auto chunkManager = [&] { - // If we have routing info already and it's for the same collection epoch, we're updating. - // Otherwise, we're making a whole new routing table. - if (existingRoutingInfo && - existingRoutingInfo->getVersion().epoch() == collectionAndChunks.epoch) { - if (collectionAndChunks.changedChunks.size() == 1 && - collectionAndChunks.changedChunks[0].getVersion() == - existingRoutingInfo->getVersion()) - return existingRoutingInfo; - - return std::make_shared<RoutingTableHistory>( - existingRoutingInfo->makeUpdated(std::move(collectionAndChunks.reshardingFields), - collectionAndChunks.changedChunks)); - } - - auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> { - if (!collectionAndChunks.defaultCollation.isEmpty()) { - // The collation should have been validated upon collection creation - return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) - ->makeFromBSON(collectionAndChunks.defaultCollation)); - } - return nullptr; - }(); - - return std::make_shared<RoutingTableHistory>( - RoutingTableHistory::makeNew(nss, - collectionAndChunks.uuid, - KeyPattern(collectionAndChunks.shardKeyPattern), - std::move(defaultCollator), - collectionAndChunks.shardKeyIsUnique, - collectionAndChunks.epoch, - std::move(collectionAndChunks.reshardingFields), - collectionAndChunks.changedChunks)); - }(); - - std::set<ShardId> shardIds; - chunkManager->getAllShardIds(&shardIds); - for (const auto& shardId : shardIds) { - uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); - } - return chunkManager; -} +const int kCollectionCacheSize = 10000; } // namespace @@ -155,7 +83,8 @@ CatalogCache::CatalogCache(ServiceContext* const service, CatalogCacheLoader& ca options.maxThreads = 6; return options; }())), - _databaseCache(service, *_executor, _cacheLoader) { + _databaseCache(service, *_executor, _cacheLoader), + _collectionCache(service, *_executor, _cacheLoader) { _executor->startup(); } @@ -190,111 +119,89 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx } } -StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx, - const NamespaceString& nss) { - return _getCollectionRoutingInfo(opCtx, nss).statusWithInfo; -} - -CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoWithForcedRefresh( - OperationContext* opCtx, const NamespaceString& nss) { - setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); - _createOrGetCollectionEntryAndMarkAsNeedsRefresh(nss); - return _getCollectionRoutingInfo(opCtx, nss); -} - -CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfo(OperationContext* opCtx, - const NamespaceString& nss) { - return _getCollectionRoutingInfoAt(opCtx, nss, boost::none); -} - - -StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx, - const NamespaceString& nss, - Timestamp atClusterTime) { - return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).statusWithInfo; -} - -CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt( +StatusWith<ChunkManager> CatalogCache::_getCollectionRoutingInfoAt( OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) { - invariant(!opCtx->lockState() || !opCtx->lockState()->isLocked(), - "Do not hold a lock while refreshing the catalog cache. Doing so would potentially " - "hold the lock during a network call, and can lead to a deadlock as described in " - "SERVER-37398."); - // This default value can cause a single unnecessary extra refresh if this thread did do the - // refresh but the refresh failed, or if the database or collection was not found, but only if - // the caller is getCollectionRoutingInfoWithRefresh with the parameter - // forceRefreshFromThisThread set to true - RefreshAction refreshActionTaken(RefreshAction::kDidNotPerformRefresh); - while (true) { + invariant( + !opCtx->lockState() || !opCtx->lockState()->isLocked(), + "Do not hold a lock while refreshing the catalog cache. Doing so would potentially hold " + "the lock during a network call, and can lead to a deadlock as described in SERVER-37398."); + + try { const auto swDbInfo = getDatabase(opCtx, nss.db()); + if (!swDbInfo.isOK()) { if (swDbInfo == ErrorCodes::NamespaceNotFound) { LOGV2_FOR_CATALOG_REFRESH( - 4947102, + 4947103, 2, "Invalidating cached collection entry because its database has been dropped", "namespace"_attr = nss); - purgeCollection(nss); + invalidateCollectionEntry_LINEARIZABLE(nss); } - return {swDbInfo.getStatus(), refreshActionTaken}; + return swDbInfo.getStatus(); } const auto dbInfo = std::move(swDbInfo.getValue()); - stdx::unique_lock<Latch> ul(_mutex); - - auto collEntry = _createOrGetCollectionEntry(ul, nss); + const auto cacheConsistency = gEnableFinerGrainedCatalogCacheRefresh && + !operationShouldBlockBehindCatalogCacheRefresh(opCtx) + ? CacheCausalConsistency::kLatestCached + : CacheCausalConsistency::kLatestKnown; - if (collEntry->needsRefresh && - (!gEnableFinerGrainedCatalogCacheRefresh || collEntry->epochHasChanged || - operationShouldBlockBehindCatalogCacheRefresh(opCtx))) { + auto collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency); - operationBlockedBehindCatalogCacheRefresh(opCtx) = true; + // If the entry is in the cache return inmediately. + if (collEntryFuture.isReady()) { + setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false); + return ChunkManager(dbInfo.primaryId(), + dbInfo.databaseVersion(), + collEntryFuture.get(opCtx), + atClusterTime); + } - auto refreshNotification = collEntry->refreshCompletionNotification; - if (!refreshNotification) { - refreshNotification = (collEntry->refreshCompletionNotification = - std::make_shared<Notification<Status>>()); - _scheduleCollectionRefresh(ul, opCtx->getServiceContext(), collEntry, nss, 1); - refreshActionTaken = RefreshAction::kPerformedRefresh; - } + operationBlockedBehindCatalogCacheRefresh(opCtx) = true; - // Wait on the notification outside of the mutex - ul.unlock(); + size_t acquireTries = 0; + Timer t; - auto refreshStatus = [&]() { - Timer t; - ON_BLOCK_EXIT([&] { _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); }); + while (true) { + try { + auto collEntry = collEntryFuture.get(opCtx); + _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); - try { - const Milliseconds kReportingInterval{250}; - while (!refreshNotification->waitFor(opCtx, kReportingInterval)) { - _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); - t.reset(); - } + setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false); - return refreshNotification->get(opCtx); - } catch (const DBException& ex) { + return ChunkManager(dbInfo.primaryId(), + dbInfo.databaseVersion(), + std::move(collEntry), + atClusterTime); + } catch (ExceptionFor<ErrorCodes::ConflictingOperationInProgress>& ex) { + _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); + acquireTries++; + if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) { return ex.toStatus(); } - }(); - - if (!refreshStatus.isOK()) { - return {refreshStatus, refreshActionTaken}; } - // Once the refresh is complete, loop around to get the latest value - continue; + collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency); + t.reset(); } - - return {ChunkManager(dbInfo.primaryId(), - dbInfo.databaseVersion(), - collEntry->routingInfo, - atClusterTime), - refreshActionTaken}; + } catch (const DBException& ex) { + return ex.toStatus(); } } +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) { + return _getCollectionRoutingInfoAt(opCtx, nss, boost::none); +} + +StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx, + const NamespaceString& nss, + Timestamp atClusterTime) { + return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime); +} + StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationContext* opCtx, StringData dbName) { // TODO SERVER-49724: Make ReadThroughCache support StringData keys @@ -303,32 +210,20 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon } StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoWithRefresh( - OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread) { - auto refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss); - // We want to ensure that we don't join an in-progress refresh because that - // could violate causal consistency for this client. We don't need to actually perform the - // refresh ourselves but we do need the refresh to begin *after* this function is - // called, so calling it twice is enough regardless of what happens the - // second time. See SERVER-33954 for reasoning. - if (forceRefreshFromThisThread && - refreshResult.actionTaken == RefreshAction::kDidNotPerformRefresh) { - refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss); - } - return refreshResult.statusWithInfo; + OperationContext* opCtx, const NamespaceString& nss) { + _collectionCache.invalidate(nss); + setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true); + return getCollectionRoutingInfo(opCtx, nss); } StatusWith<ChunkManager> CatalogCache::getShardedCollectionRoutingInfoWithRefresh( OperationContext* opCtx, const NamespaceString& nss) { - auto swRoutingInfo = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo; - if (!swRoutingInfo.isOK()) - return swRoutingInfo; - - auto cri(std::move(swRoutingInfo.getValue())); - if (!cri.isSharded()) + auto routingInfoStatus = getCollectionRoutingInfoWithRefresh(opCtx, nss); + if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().isSharded()) { return {ErrorCodes::NamespaceNotSharded, str::stream() << "Collection " << nss.ns() << " is not sharded."}; - - return cri; + } + return routingInfoStatus; } void CatalogCache::onStaleDatabaseVersion(const StringData dbName, @@ -350,48 +245,49 @@ void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationCon if (gEnableFinerGrainedCatalogCacheRefresh) { operationShouldBlockBehindCatalogCacheRefresh(opCtx) = shouldBlock; } -}; +} void CatalogCache::invalidateShardOrEntireCollectionEntryForShardedCollection( - OperationContext* opCtx, const NamespaceString& nss, - boost::optional<ChunkVersion> wantedVersion, - const ChunkVersion& receivedVersion, - ShardId shardId) { - if (shardVersionsHaveMatchingEpoch(wantedVersion, receivedVersion)) { - _createOrGetCollectionEntryAndMarkShardStale(nss, shardId); - } else { - _createOrGetCollectionEntryAndMarkEpochStale(nss); + const boost::optional<ChunkVersion>& wantedVersion, + const ShardId& shardId) { + _stats.countStaleConfigErrors.addAndFetch(1); + + auto collectionEntry = _collectionCache.peekLatestCached(nss); + if (collectionEntry && collectionEntry->optRt) { + collectionEntry->optRt->setShardStale(shardId); } -}; -void CatalogCache::onEpochChange(const NamespaceString& nss) { - _createOrGetCollectionEntryAndMarkEpochStale(nss); -}; + if (wantedVersion) { + _collectionCache.advanceTimeInStore( + nss, ComparableChunkVersion::makeComparableChunkVersion(*wantedVersion)); + } else { + _collectionCache.advanceTimeInStore( + nss, ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh()); + } +} void CatalogCache::checkEpochOrThrow(const NamespaceString& nss, - ChunkVersion targetCollectionVersion, - const ShardId& shardId) const { - stdx::lock_guard<Latch> lg(_mutex); - const auto itDb = _collectionsByDb.find(nss.db()); + const ChunkVersion& targetCollectionVersion, + const ShardId& shardId) { uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId), str::stream() << "could not act as router for " << nss.ns() << ", no entry for database " << nss.db(), - itDb != _collectionsByDb.end()); + _databaseCache.peekLatestCached(nss.db().toString())); - auto itColl = itDb->second.find(nss.ns()); + auto collectionValueHandle = _collectionCache.peekLatestCached(nss); uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId), str::stream() << "could not act as router for " << nss.ns() << ", no entry for collection.", - itColl != itDb->second.end()); + collectionValueHandle); uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId), str::stream() << "could not act as router for " << nss.ns() << ", wanted " << targetCollectionVersion.toString() << ", but found the collection was unsharded", - itColl->second->routingInfo); + collectionValueHandle->optRt); - auto foundVersion = itColl->second->routingInfo->getVersion(); + auto foundVersion = collectionValueHandle->optRt->getVersion(); uassert(StaleConfigInfo(nss, targetCollectionVersion, foundVersion, shardId), str::stream() << "could not act as router for " << nss.ns() << ", wanted " << targetCollectionVersion.toString() << ", but found " @@ -399,11 +295,6 @@ void CatalogCache::checkEpochOrThrow(const NamespaceString& nss, foundVersion.epoch() == targetCollectionVersion.epoch()); } -void CatalogCache::invalidateShardForShardedCollection(const NamespaceString& nss, - const ShardId& staleShardId) { - _createOrGetCollectionEntryAndMarkShardStale(nss, staleShardId); -} - void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) { LOGV2_DEBUG(4997600, 1, @@ -413,32 +304,24 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) { _databaseCache.invalidateCachedValueIf( [&](const DatabaseType& dbt) { return dbt.getPrimary() == shardId; }); - stdx::lock_guard<Latch> lg(_mutex); - // Invalidate collections which contain data on this shard. - for (const auto& [db, collInfoMap] : _collectionsByDb) { - for (const auto& [collNs, collRoutingInfoEntry] : collInfoMap) { - if (!collRoutingInfoEntry->needsRefresh && collRoutingInfoEntry->routingInfo) { - // The set of shards on which this collection contains chunks. - std::set<ShardId> shardsOwningDataForCollection; - collRoutingInfoEntry->routingInfo->getAllShardIds(&shardsOwningDataForCollection); - - if (shardsOwningDataForCollection.find(shardId) != - shardsOwningDataForCollection.end()) { - LOGV2_DEBUG(22647, - 3, - "Invalidating cached collection {namespace} that has data " - "on shard {shardId}", - "Invalidating cached collection", - "namespace"_attr = collNs, - "shardId"_attr = shardId); - - collRoutingInfoEntry->needsRefresh = true; - collRoutingInfoEntry->routingInfo->setShardStale(shardId); - } - } - } - } + _collectionCache.invalidateCachedValueIf([&](const OptionalRoutingTableHistory& ort) { + if (!ort.optRt) + return false; + const auto& rt = *ort.optRt; + + std::set<ShardId> shardIds; + rt.getAllShardIds(&shardIds); + + LOGV2_DEBUG(22647, + 3, + "Invalidating cached collection {namespace} that has data " + "on shard {shardId}", + "Invalidating cached collection", + "namespace"_attr = rt.nss(), + "shardId"_attr = shardId); + return shardIds.find(shardId) != shardIds.end(); + }); LOGV2(22648, "Finished invalidating databases and collections with data on shard: {shardId}", @@ -446,46 +329,28 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) { "shardId"_attr = shardId); } -void CatalogCache::purgeCollection(const NamespaceString& nss) { - stdx::lock_guard<Latch> lg(_mutex); - - auto itDb = _collectionsByDb.find(nss.db()); - if (itDb == _collectionsByDb.end()) { - return; - } - - itDb->second.erase(nss.ns()); -} - void CatalogCache::purgeDatabase(StringData dbName) { _databaseCache.invalidate(dbName.toString()); - stdx::lock_guard<Latch> lg(_mutex); - _collectionsByDb.erase(dbName); + _collectionCache.invalidateKeyIf( + [&](const NamespaceString& nss) { return nss.db() == dbName; }); } void CatalogCache::purgeAllDatabases() { _databaseCache.invalidateAll(); - stdx::lock_guard<Latch> lg(_mutex); - _collectionsByDb.clear(); + _collectionCache.invalidateAll(); } void CatalogCache::report(BSONObjBuilder* builder) const { BSONObjBuilder cacheStatsBuilder(builder->subobjStart("catalogCache")); - size_t numDatabaseEntries; - size_t numCollectionEntries{0}; - { - numDatabaseEntries = _databaseCache.getCacheInfo().size(); - stdx::lock_guard<Latch> ul(_mutex); - for (const auto& entry : _collectionsByDb) { - numCollectionEntries += entry.second.size(); - } - } + const size_t numDatabaseEntries = _databaseCache.getCacheInfo().size(); + const size_t numCollectionEntries = _collectionCache.getCacheInfo().size(); cacheStatsBuilder.append("numDatabaseEntries", static_cast<long long>(numDatabaseEntries)); cacheStatsBuilder.append("numCollectionEntries", static_cast<long long>(numCollectionEntries)); _stats.report(&cacheStatsBuilder); + _collectionCache.reportStats(&cacheStatsBuilder); } void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx, @@ -519,188 +384,8 @@ void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opC } } -void CatalogCache::_scheduleCollectionRefresh(WithLock lk, - ServiceContext* service, - std::shared_ptr<CollectionRoutingInfoEntry> collEntry, - NamespaceString const& nss, - int refreshAttempt) { - const auto existingRoutingInfo = collEntry->routingInfo; - - // If we have an existing chunk manager, the refresh is considered "incremental", regardless of - // how many chunks are in the differential - const bool isIncremental(existingRoutingInfo); - - if (isIncremental) { - _stats.numActiveIncrementalRefreshes.addAndFetch(1); - _stats.countIncrementalRefreshesStarted.addAndFetch(1); - } else { - _stats.numActiveFullRefreshes.addAndFetch(1); - _stats.countFullRefreshesStarted.addAndFetch(1); - } - - // Invoked when one iteration of getChunksSince has completed, whether with success or error - const auto onRefreshCompleted = [this, t = Timer(), nss, isIncremental, existingRoutingInfo]( - const Status& status, - RoutingTableHistory* routingInfoAfterRefresh) { - if (isIncremental) { - _stats.numActiveIncrementalRefreshes.subtractAndFetch(1); - } else { - _stats.numActiveFullRefreshes.subtractAndFetch(1); - } - - if (!status.isOK()) { - _stats.countFailedRefreshes.addAndFetch(1); - - LOGV2_OPTIONS(24103, - {logv2::LogComponent::kShardingCatalogRefresh}, - "Error refreshing cached collection {namespace}; Took {duration} and " - "failed due to {error}", - "Error refreshing cached collection", - "namespace"_attr = nss, - "duration"_attr = Milliseconds(t.millis()), - "error"_attr = redact(status)); - } else if (routingInfoAfterRefresh) { - const int logLevel = - (!existingRoutingInfo || - (existingRoutingInfo && - routingInfoAfterRefresh->getVersion() != existingRoutingInfo->getVersion())) - ? 0 - : 1; - LOGV2_FOR_CATALOG_REFRESH( - 24104, - logLevel, - "Refreshed cached collection {namespace} to version {newVersion} from version " - "{oldVersion}. Took {duration}", - "Refreshed cached collection", - "namespace"_attr = nss, - "newVersion"_attr = routingInfoAfterRefresh->getVersion(), - "oldVersion"_attr = - (existingRoutingInfo - ? (" from version " + existingRoutingInfo->getVersion().toString()) - : ""), - "duration"_attr = Milliseconds(t.millis())); - } else { - LOGV2_OPTIONS(24105, - {logv2::LogComponent::kShardingCatalogRefresh}, - "Collection {namespace} was found to be unsharded after refresh that " - "took {duration}", - "Collection has found to be unsharded after refresh", - "namespace"_attr = nss, - "duration"_attr = Milliseconds(t.millis())); - } - }; - - // Invoked if getChunksSince resulted in error or threw an exception - const auto onRefreshFailed = - [ this, service, collEntry, nss, refreshAttempt, - onRefreshCompleted ](WithLock lk, const Status& status) noexcept { - onRefreshCompleted(status, nullptr); - - // It is possible that the metadata is being changed concurrently, so retry the - // refresh again - if (status == ErrorCodes::ConflictingOperationInProgress && - refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) { - _scheduleCollectionRefresh(lk, service, collEntry, nss, refreshAttempt + 1); - } else { - // Leave needsRefresh to true so that any subsequent get attempts will kick off - // another round of refresh - collEntry->refreshCompletionNotification->set(status); - collEntry->refreshCompletionNotification = nullptr; - } - }; - - const auto refreshCallback = - [ this, service, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ]( - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - - ThreadClient tc("CatalogCache::collectionRefresh", service); - auto opCtx = tc->makeOperationContext(); - - std::shared_ptr<RoutingTableHistory> newRoutingInfo; - try { - newRoutingInfo = refreshCollectionRoutingInfo( - opCtx.get(), nss, std::move(existingRoutingInfo), std::move(swCollAndChunks)); - - onRefreshCompleted(Status::OK(), newRoutingInfo.get()); - } catch (const DBException& ex) { - stdx::lock_guard<Latch> lg(_mutex); - onRefreshFailed(lg, ex.toStatus()); - return; - } - - stdx::lock_guard<Latch> lg(_mutex); - - collEntry->epochHasChanged = false; - collEntry->needsRefresh = false; - collEntry->refreshCompletionNotification->set(Status::OK()); - collEntry->refreshCompletionNotification = nullptr; - - setOperationShouldBlockBehindCatalogCacheRefresh(opCtx.get(), false); - - // TODO(SERVER-49876): remove clang-tidy NOLINT comments. - if (existingRoutingInfo && newRoutingInfo && // NOLINT(bugprone-use-after-move) - existingRoutingInfo->getVersion() == // NOLINT(bugprone-use-after-move) - newRoutingInfo->getVersion()) { // NOLINT(bugprone-use-after-move) - // If the routingInfo hasn't changed, we need to manually reset stale shards. - newRoutingInfo->setAllShardsRefreshed(); - } - - collEntry->routingInfo = std::move(newRoutingInfo); - }; - - const ChunkVersion startingCollectionVersion = - (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED()); - - LOGV2_FOR_CATALOG_REFRESH( - 24106, - 1, - "Refreshing cached collection {namespace} with version {currentCollectionVersion}", - "namespace"_attr = nss, - "currentCollectionVersion"_attr = startingCollectionVersion); - - _cacheLoader.getChunksSince(nss, startingCollectionVersion) - .thenRunOn(_executor) - .getAsync(refreshCallback); - - // The routing info for this collection shouldn't change, as other threads may try to use the - // CatalogCache while we are waiting for the refresh to complete. - invariant(collEntry->routingInfo.get() == existingRoutingInfo.get()); -} - -void CatalogCache::_createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss) { - stdx::lock_guard<Latch> lg(_mutex); - auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss); - collRoutingInfoEntry->needsRefresh = true; - collRoutingInfoEntry->epochHasChanged = true; -} - -void CatalogCache::_createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss, - const ShardId& staleShardId) { - stdx::lock_guard<Latch> lg(_mutex); - auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss); - collRoutingInfoEntry->needsRefresh = true; - if (collRoutingInfoEntry->routingInfo) { - collRoutingInfoEntry->routingInfo->setShardStale(staleShardId); - } -} - -void CatalogCache::_createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss) { - stdx::lock_guard<Latch> lg(_mutex); - auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss); - collRoutingInfoEntry->needsRefresh = true; -} - -std::shared_ptr<CatalogCache::CollectionRoutingInfoEntry> CatalogCache::_createOrGetCollectionEntry( - WithLock wl, const NamespaceString& nss) { - auto& collectionsForDb = _collectionsByDb[nss.db()]; - if (!collectionsForDb.contains(nss.ns())) { - // TODO SERVER-46199: ensure collections cache size is capped - // currently no routine except for dropDatabase is removing cached collection entries and - // the cache for a specific DB can grow indefinitely. - collectionsForDb[nss.ns()] = std::make_shared<CollectionRoutingInfoEntry>(); - } - - return collectionsForDb[nss.ns()]; +void CatalogCache::invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss) { + _collectionCache.invalidate(nss); } void CatalogCache::Stats::report(BSONObjBuilder* builder) const { @@ -708,14 +393,6 @@ void CatalogCache::Stats::report(BSONObjBuilder* builder) const { builder->append("totalRefreshWaitTimeMicros", totalRefreshWaitTimeMicros.load()); - builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load()); - builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load()); - - builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load()); - builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load()); - - builder->append("countFailedRefreshes", countFailedRefreshes.load()); - if (isMongos()) { BSONObjBuilder operationsBlockedByRefreshBuilder( builder->subobjStart("operationsBlockedByRefresh")); @@ -756,7 +433,6 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa OperationContext* opCtx, const std::string& dbName, const ComparableDatabaseVersion& previousDbVersion) { - // TODO (SERVER-34164): Track and increment stats for database refreshes LOGV2_FOR_CATALOG_REFRESH(24102, 2, "Refreshing cached database entry", "db"_attr = dbName); @@ -788,73 +464,199 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa } } -AtomicWord<uint64_t> ComparableDatabaseVersion::_localSequenceNumSource{1ULL}; +CatalogCache::CollectionCache::CollectionCache(ServiceContext* service, + ThreadPoolInterface& threadPool, + CatalogCacheLoader& catalogCacheLoader) + : ReadThroughCache(_mutex, + service, + threadPool, + [this](OperationContext* opCtx, + const NamespaceString& nss, + const ValueHandle& collectionHistory, + const ComparableChunkVersion& previousChunkVersion) { + return _lookupCollection( + opCtx, nss, collectionHistory, previousChunkVersion); + }, + kCollectionCacheSize), + _catalogCacheLoader(catalogCacheLoader) {} -ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion( - const DatabaseVersion& version) { - return ComparableDatabaseVersion(version, _localSequenceNumSource.fetchAndAdd(1)); +void CatalogCache::CollectionCache::reportStats(BSONObjBuilder* builder) const { + _stats.report(builder); } -const DatabaseVersion& ComparableDatabaseVersion::getVersion() const { - return _dbVersion; +void CatalogCache::CollectionCache::_updateRefreshesStats(const bool isIncremental, + const bool add) { + if (add) { + if (isIncremental) { + _stats.numActiveIncrementalRefreshes.addAndFetch(1); + _stats.countIncrementalRefreshesStarted.addAndFetch(1); + } else { + _stats.numActiveFullRefreshes.addAndFetch(1); + _stats.countFullRefreshesStarted.addAndFetch(1); + } + } else { + if (isIncremental) { + _stats.numActiveIncrementalRefreshes.subtractAndFetch(1); + } else { + _stats.numActiveFullRefreshes.subtractAndFetch(1); + } + } } -uint64_t ComparableDatabaseVersion::getLocalSequenceNum() const { - return _localSequenceNum; -} +void CatalogCache::CollectionCache::Stats::report(BSONObjBuilder* builder) const { + builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load()); + builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load()); -BSONObj ComparableDatabaseVersion::toBSON() const { - BSONObjBuilder builder; - _dbVersion.getUuid().appendToBuilder(&builder, "uuid"); - builder.append("lastMod", _dbVersion.getLastMod()); - builder.append("localSequenceNum", std::to_string(_localSequenceNum)); - return builder.obj(); -} + builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load()); + builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load()); -std::string ComparableDatabaseVersion::toString() const { - return toBSON().toString(); + builder->append("countFailedRefreshes", countFailedRefreshes.load()); } +CatalogCache::CollectionCache::LookupResult CatalogCache::CollectionCache::_lookupCollection( + OperationContext* opCtx, + const NamespaceString& nss, + const RoutingTableHistoryValueHandle& existingHistory, + const ComparableChunkVersion& previousVersion) { + const bool isIncremental(existingHistory && existingHistory->optRt); + _updateRefreshesStats(isIncremental, true); -CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard) - : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {} + Timer t{}; + try { + auto lookupVersion = + isIncremental ? existingHistory->optRt->getVersion() : ChunkVersion::UNSHARDED(); -const ShardId& CachedDatabaseInfo::primaryId() const { - return _dbt.getPrimary(); + LOGV2_FOR_CATALOG_REFRESH(4619900, + 1, + "Refreshing cached collection", + "namespace"_attr = nss, + "currentVersion"_attr = previousVersion); + + auto collectionAndChunks = _catalogCacheLoader.getChunksSince(nss, lookupVersion).get(); + + auto newRoutingHistory = [&] { + // If we have routing info already and it's for the same collection epoch, we're + // updating. Otherwise, we're making a whole new routing table. + if (isIncremental && + existingHistory->optRt->getVersion().epoch() == collectionAndChunks.epoch) { + return existingHistory->optRt->makeUpdated(collectionAndChunks.reshardingFields, + collectionAndChunks.changedChunks); + } + + auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> { + if (!collectionAndChunks.defaultCollation.isEmpty()) { + // The collation should have been validated upon collection creation + return uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext()) + ->makeFromBSON(collectionAndChunks.defaultCollation)); + } + return nullptr; + }(); + + return RoutingTableHistory::makeNew(nss, + collectionAndChunks.uuid, + KeyPattern(collectionAndChunks.shardKeyPattern), + std::move(defaultCollator), + collectionAndChunks.shardKeyIsUnique, + collectionAndChunks.epoch, + std::move(collectionAndChunks.reshardingFields), + collectionAndChunks.changedChunks); + }(); + + newRoutingHistory.setAllShardsRefreshed(); + + // Check that the shards all match with what is on the config server + std::set<ShardId> shardIds; + newRoutingHistory.getAllShardIds(&shardIds); + for (const auto& shardId : shardIds) { + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); + } + + const auto newVersion = + ComparableChunkVersion::makeComparableChunkVersion(newRoutingHistory.getVersion()); + + LOGV2_FOR_CATALOG_REFRESH(4619901, + isIncremental || newVersion != previousVersion ? 0 : 1, + "Refreshed cached collection", + "namespace"_attr = nss, + "newVersion"_attr = newVersion, + "oldVersion"_attr = previousVersion, + "duration"_attr = Milliseconds(t.millis())); + _updateRefreshesStats(isIncremental, false); + + return LookupResult(OptionalRoutingTableHistory(std::move(newRoutingHistory)), newVersion); + } catch (const DBException& ex) { + _stats.countFailedRefreshes.addAndFetch(1); + _updateRefreshesStats(isIncremental, false); + + if (ex.code() == ErrorCodes::NamespaceNotFound) { + LOGV2_FOR_CATALOG_REFRESH(4619902, + 0, + "Collection has found to be unsharded after refresh", + "namespace"_attr = nss, + "duration"_attr = Milliseconds(t.millis())); + + return LookupResult( + OptionalRoutingTableHistory(), + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED())); + } + + LOGV2_FOR_CATALOG_REFRESH(4619903, + 0, + "Error refreshing cached collection", + "namespace"_attr = nss, + "duration"_attr = Milliseconds(t.millis()), + "error"_attr = redact(ex)); + + throw; + } } -bool CachedDatabaseInfo::shardingEnabled() const { - return _dbt.getSharded(); +AtomicWord<uint64_t> ComparableDatabaseVersion::_uuidDisambiguatingSequenceNumSource{1ULL}; + +ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion( + const DatabaseVersion& version) { + return ComparableDatabaseVersion(version, _uuidDisambiguatingSequenceNumSource.fetchAndAdd(1)); } -DatabaseVersion CachedDatabaseInfo::databaseVersion() const { - return _dbt.getVersion(); +std::string ComparableDatabaseVersion::toString() const { + return str::stream() << (_dbVersion ? _dbVersion->toBSON().toString() : "NONE") << "|" + << _uuidDisambiguatingSequenceNum; } -AtomicWord<uint64_t> ComparableChunkVersion::_localSequenceNumSource{1ULL}; +bool ComparableDatabaseVersion::operator==(const ComparableDatabaseVersion& other) const { + if (!_dbVersion && !other._dbVersion) + return true; // Default constructed value + if (_dbVersion.is_initialized() != other._dbVersion.is_initialized()) + return false; // One side is default constructed value -ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion( - const ChunkVersion& version) { - return ComparableChunkVersion(version, _localSequenceNumSource.fetchAndAdd(1)); + return sameUuid(other) && (_dbVersion->getLastMod() == other._dbVersion->getLastMod()); } -const ChunkVersion& ComparableChunkVersion::getVersion() const { - return _chunkVersion; +bool ComparableDatabaseVersion::operator<(const ComparableDatabaseVersion& other) const { + if (!_dbVersion && !other._dbVersion) + return false; // Default constructed value + + if (_dbVersion && other._dbVersion && sameUuid(other)) { + return _dbVersion->getLastMod() < other._dbVersion->getLastMod(); + } else { + return _uuidDisambiguatingSequenceNum < other._uuidDisambiguatingSequenceNum; + } } -uint64_t ComparableChunkVersion::getLocalSequenceNum() const { - return _localSequenceNum; +CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard) + : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {} + +const ShardId& CachedDatabaseInfo::primaryId() const { + return _dbt.getPrimary(); } -BSONObj ComparableChunkVersion::toBSON() const { - BSONObjBuilder builder; - _chunkVersion.appendToCommand(&builder); - builder.append("localSequenceNum", std::to_string(_localSequenceNum)); - return builder.obj(); +bool CachedDatabaseInfo::shardingEnabled() const { + return _dbt.getSharded(); } -std::string ComparableChunkVersion::toString() const { - return toBSON().toString(); +DatabaseVersion CachedDatabaseInfo::databaseVersion() const { + return _dbt.getVersion(); } } // namespace mongo diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index a957189183a..796b9e10136 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -45,8 +45,6 @@ namespace mongo { class BSONObjBuilder; -class CachedDatabaseInfo; -class OperationContext; static constexpr int kMaxNumStaleVersionRetries = 10; @@ -64,21 +62,21 @@ extern const OperationContext::Decoration<bool> operationShouldBlockBehindCatalo * in fact is impossible to compare two different DatabaseVersion that have different UUIDs. * * This class wrap a DatabaseVersion object to make it always comparable by timestamping it with a - * node-local sequence number (_dbVersionLocalSequence). + * node-local sequence number (_uuidDisambiguatingSequenceNum). * * This class class should go away once a cluster-wide comparable DatabaseVersion will be * implemented. */ class ComparableDatabaseVersion { public: - /* - * Create a ComparableDatabaseVersion that wraps the given DatabaseVersion. - * Each object created through this method will have a local sequence number grater then the + /** + * Creates a ComparableDatabaseVersion that wraps the given DatabaseVersion. + * Each object created through this method will have a local sequence number greater than the * previously created ones. */ static ComparableDatabaseVersion makeComparableDatabaseVersion(const DatabaseVersion& version); - /* + /** * Empty constructor needed by the ReadThroughCache. * * Instances created through this constructor will be always less then the ones created through @@ -86,39 +84,28 @@ public: */ ComparableDatabaseVersion() = default; - const DatabaseVersion& getVersion() const; - - uint64_t getLocalSequenceNum() const; - - BSONObj toBSON() const; + const DatabaseVersion& getVersion() const { + return *_dbVersion; + } std::string toString() const; - // Rerturns true if the two versions have the same UUID bool sameUuid(const ComparableDatabaseVersion& other) const { - return _dbVersion.getUuid() == other._dbVersion.getUuid(); + return _dbVersion->getUuid() == other._dbVersion->getUuid(); } - bool operator==(const ComparableDatabaseVersion& other) const { - return sameUuid(other) && (_dbVersion.getLastMod() == other._dbVersion.getLastMod()); - } + bool operator==(const ComparableDatabaseVersion& other) const; bool operator!=(const ComparableDatabaseVersion& other) const { return !(*this == other); } - /* - * In the case the two compared instances have different UUIDs the most recently created one - * will be grater, otherwise the comparision will be driven by the lastMod field of the - * underlying DatabaseVersion. + /** + * In case the two compared instances have different UUIDs, the most recently created one will + * be greater, otherwise the comparison will be driven by the lastMod field of the underlying + * DatabaseVersion. */ - bool operator<(const ComparableDatabaseVersion& other) const { - if (sameUuid(other)) { - return _dbVersion.getLastMod() < other._dbVersion.getLastMod(); - } else { - return _localSequenceNum < other._localSequenceNum; - } - } + bool operator<(const ComparableDatabaseVersion& other) const; bool operator>(const ComparableDatabaseVersion& other) const { return other < *this; @@ -133,92 +120,18 @@ public: } private: - static AtomicWord<uint64_t> _localSequenceNumSource; + static AtomicWord<uint64_t> _uuidDisambiguatingSequenceNumSource; + + ComparableDatabaseVersion(const DatabaseVersion& version, + uint64_t uuidDisambiguatingSequenceNum) + : _dbVersion(version), _uuidDisambiguatingSequenceNum(uuidDisambiguatingSequenceNum) {} - ComparableDatabaseVersion(const DatabaseVersion& version, uint64_t localSequenceNum) - : _dbVersion(version), _localSequenceNum(localSequenceNum) {} + boost::optional<DatabaseVersion> _dbVersion; - DatabaseVersion _dbVersion; // Locally incremented sequence number that allows to compare two database versions with // different UUIDs. Each new comparableDatabaseVersion will have a greater sequence number then // the ones created before. - uint64_t _localSequenceNum{0}; -}; - -/** - * Constructed to be used exclusively by the CatalogCache as a vector clock (Time) to drive - * CollectionCache's lookups. - * - * The ChunkVersion class contains an non comparable epoch, which makes impossible to compare two - * ChunkVersions when their epochs's differ. - * - * This class wraps a ChunkVersion object with a node-local sequence number (_localSequenceNum) that - * allows the comparision. - * - * This class should go away once a cluster-wide comparable ChunkVersion is implemented. - */ -class ComparableChunkVersion { -public: - static ComparableChunkVersion makeComparableChunkVersion(const ChunkVersion& version); - - ComparableChunkVersion() = default; - - const ChunkVersion& getVersion() const; - - uint64_t getLocalSequenceNum() const; - - BSONObj toBSON() const; - - std::string toString() const; - - bool sameEpoch(const ComparableChunkVersion& other) const { - return _chunkVersion.epoch() == other._chunkVersion.epoch(); - } - - bool operator==(const ComparableChunkVersion& other) const { - return sameEpoch(other) && - (_chunkVersion.majorVersion() == other._chunkVersion.majorVersion() && - _chunkVersion.minorVersion() == other._chunkVersion.minorVersion()); - } - - bool operator!=(const ComparableChunkVersion& other) const { - return !(*this == other); - } - - bool operator<(const ComparableChunkVersion& other) const { - if (sameEpoch(other)) { - return _chunkVersion.majorVersion() < other._chunkVersion.majorVersion() || - (_chunkVersion.majorVersion() == other._chunkVersion.majorVersion() && - _chunkVersion.minorVersion() < other._chunkVersion.minorVersion()); - } else { - return _localSequenceNum < other._localSequenceNum; - } - } - - bool operator>(const ComparableChunkVersion& other) const { - return other < *this; - } - - bool operator<=(const ComparableChunkVersion& other) const { - return !(*this > other); - } - - bool operator>=(const ComparableChunkVersion& other) const { - return !(*this < other); - } - -private: - static AtomicWord<uint64_t> _localSequenceNumSource; - - ComparableChunkVersion(const ChunkVersion& version, uint64_t localSequenceNum) - : _chunkVersion(version), _localSequenceNum(localSequenceNum) {} - - ChunkVersion _chunkVersion; - - // Locally incremented sequence number that allows to compare two colection versions with - // different epochs. Each new comparableChunkVersion will have a greater sequence number than - // the ones created before. - uint64_t _localSequenceNum{0}; + uint64_t _uuidDisambiguatingSequenceNum{0}; }; /** @@ -298,21 +211,9 @@ public: /** * Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed. - * - * When forceRefreshFromThisThread is false, it's possible for this call to - * join an ongoing refresh from another thread forceRefreshFromThisThread. - * forceRefreshFromThisThread checks whether it joined another thread and - * then forces it to try again, which is necessary in cases where calls to - * getCollectionRoutingInfoWithRefresh must be causally consistent - * - * TODO: Remove this parameter in favor of using collection creation time + - * collection version to decide when a refresh is necessary and provide - * proper causal consistency */ - StatusWith<ChunkManager> getCollectionRoutingInfoWithRefresh( - OperationContext* opCtx, - const NamespaceString& nss, - bool forceRefreshFromThisThread = false); + StatusWith<ChunkManager> getCollectionRoutingInfoWithRefresh(OperationContext* opCtx, + const NamespaceString& nss); /** * Same as getCollectionRoutingInfoWithRefresh above, but in addition returns a @@ -333,11 +234,6 @@ public: const boost::optional<DatabaseVersion>& wantedVersion); /** - * Gets whether this operation should block behind a catalog cache refresh. - */ - static bool getOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx); - - /** * Sets whether this operation should block behind a catalog cache refresh. */ static void setOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx, @@ -349,18 +245,9 @@ public: * requests to block on an upcoming catalog cache refresh. */ void invalidateShardOrEntireCollectionEntryForShardedCollection( - OperationContext* opCtx, const NamespaceString& nss, - boost::optional<ChunkVersion> wantedVersion, - const ChunkVersion& receivedVersion, - ShardId shardId); - - /** - * Non-blocking method that marks the current collection entry for the namespace as needing - * refresh due to an epoch change. Will cause all further targetting attempts for this - * namespace to block on a catalog cache refresh. - */ - void onEpochChange(const NamespaceString& nss); + const boost::optional<ChunkVersion>& wantedVersion, + const ShardId& shardId); /** * Throws a StaleConfigException if this catalog cache does not have an entry for the given @@ -370,16 +257,8 @@ public: * version to throw a StaleConfigException. */ void checkEpochOrThrow(const NamespaceString& nss, - ChunkVersion targetCollectionVersion, - const ShardId& shardId) const; - - /** - * Non-blocking method, which invalidates the shard for the routing table for the specified - * namespace. If that shard is targetted in the future, getCollectionRoutingInfo will wait on a - * refresh. - */ - void invalidateShardForShardedCollection(const NamespaceString& nss, - const ShardId& staleShardId); + const ChunkVersion& targetCollectionVersion, + const ShardId& shardId); /** * Non-blocking method, which invalidates all namespaces which contain data on the specified @@ -388,12 +267,6 @@ public: void invalidateEntriesThatReferenceShard(const ShardId& shardId); /** - * Non-blocking method, which removes the entire specified collection from the cache (resulting - * in full refresh on subsequent access) - */ - void purgeCollection(const NamespaceString& nss); - - /** * Non-blocking method, which removes the entire specified database (including its collections) * from the cache. */ @@ -416,35 +289,17 @@ public: */ void checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx, mongo::LogicalOp opType); + /** + * Non-blocking method that marks the current collection entry for the namespace as needing + * refresh. Will cause all further targetting attempts to block on a catalog cache refresh, + * even if they do not require causal consistency. + */ + void invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss); + private: // Make the cache entries friends so they can access the private classes below friend class CachedDatabaseInfo; - /** - * Cache entry describing a collection. - */ - struct CollectionRoutingInfoEntry { - CollectionRoutingInfoEntry() = default; - // Disable copy (and move) semantics - CollectionRoutingInfoEntry(const CollectionRoutingInfoEntry&) = delete; - CollectionRoutingInfoEntry& operator=(const CollectionRoutingInfoEntry&) = delete; - - // Specifies whether this cache entry needs a refresh (in which case routingInfo should not - // be relied on) or it doesn't, in which case there should be a non-null routingInfo. - bool needsRefresh{true}; - - // Specifies whether the namespace has had an epoch change, which indicates that every - // shard should block on an upcoming refresh. - bool epochHasChanged{true}; - - // Contains a notification to be waited on for the refresh to complete (only available if - // needsRefresh is true) - std::shared_ptr<Notification<Status>> refreshCompletionNotification; - - // Contains the cached routing information (only available if needsRefresh is false) - std::shared_ptr<RoutingTableHistory> routingInfo; - }; - class DatabaseCache : public ReadThroughCache<std::string, DatabaseType, ComparableDatabaseVersion> { public: @@ -461,88 +316,54 @@ private: Mutex _mutex = MONGO_MAKE_LATCH("DatabaseCache::_mutex"); }; - /** - * Non-blocking call which schedules an asynchronous refresh for the specified namespace. The - * namespace must be in the 'needRefresh' state. - */ - void _scheduleCollectionRefresh(WithLock, - ServiceContext* service, - std::shared_ptr<CollectionRoutingInfoEntry> collEntry, - NamespaceString const& nss, - int refreshAttempt); + class CollectionCache : public RoutingTableHistoryCache { + public: + CollectionCache(ServiceContext* service, + ThreadPoolInterface& threadPool, + CatalogCacheLoader& catalogCacheLoader); - /** - * Marks a collection entry as needing refresh. Will create the collection entry if one does - * not exist. Also marks the epoch as changed, which will cause all further targetting requests - * against this namespace to block upon a catalog cache refresh. - */ - void _createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss); + void reportStats(BSONObjBuilder* builder) const; - /** - * Marks a collection entry as needing refresh. Will create the collection entry if one does - * not exist. Will mark the given shard ID as stale, which will cause all further targetting - * requests for the given shard for this namespace to block upon a catalog cache refresh. - */ - void _createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss, - const ShardId& shardId); + private: + LookupResult _lookupCollection(OperationContext* opCtx, + const NamespaceString& nss, + const ValueHandle& collectionHistory, + const ComparableChunkVersion& previousChunkVersion); - /** - * Marks a collection entry as needing refresh. Will create the collection entry if one does - * not exist. - */ - void _createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss); + CatalogCacheLoader& _catalogCacheLoader; + Mutex _mutex = MONGO_MAKE_LATCH("CollectionCache::_mutex"); - /** - * Retrieves the collection entry for the given namespace, creating the entry if one does not - * already exist. - */ - std::shared_ptr<CollectionRoutingInfoEntry> _createOrGetCollectionEntry( - WithLock wl, const NamespaceString& nss); + struct Stats { + // Tracks how many incremental refreshes are waiting to complete currently + AtomicWord<long long> numActiveIncrementalRefreshes{0}; - /** - * Used as a flag to indicate whether or not this thread performed its own - * refresh for certain helper functions - * - * kPerformedRefresh is used only when the calling thread performed the - * refresh *itself* - * - * kDidNotPerformRefresh is used either when there was an error or when - * this thread joined an ongoing refresh - */ - enum class RefreshAction { - kPerformedRefresh, - kDidNotPerformRefresh, - }; + // Cumulative, always-increasing counter of how many incremental refreshes have been + // kicked off + AtomicWord<long long> countIncrementalRefreshesStarted{0}; - /** - * Return type for helper functions performing refreshes so that they can - * indicate both status and whether or not this thread performed its own - * refresh - */ - struct RefreshResult { - // Status containing result of refresh - StatusWith<ChunkManager> statusWithInfo; - RefreshAction actionTaken; - }; + // Tracks how many full refreshes are waiting to complete currently + AtomicWord<long long> numActiveFullRefreshes{0}; - /** - * Retrieves the collection routing info for this namespace after blocking on a catalog cache - * refresh. - */ - CatalogCache::RefreshResult _getCollectionRoutingInfoWithForcedRefresh( - OperationContext* opctx, const NamespaceString& nss); + // Cumulative, always-increasing counter of how many full refreshes have been kicked off + AtomicWord<long long> countFullRefreshesStarted{0}; - /** - * Helper function used when we need the refresh action taken (e.g. when we - * want to force refresh) - */ - CatalogCache::RefreshResult _getCollectionRoutingInfo(OperationContext* opCtx, - const NamespaceString& nss); + // Cumulative, always-increasing counter of how many full or incremental refreshes + // failed for whatever reason + AtomicWord<long long> countFailedRefreshes{0}; - CatalogCache::RefreshResult _getCollectionRoutingInfoAt( - OperationContext* opCtx, - const NamespaceString& nss, - boost::optional<Timestamp> atClusterTime); + /** + * Reports the accumulated statistics for serverStatus. + */ + void report(BSONObjBuilder* builder) const; + + } _stats; + + void _updateRefreshesStats(const bool isIncremental, const bool add); + }; + + StatusWith<ChunkManager> _getCollectionRoutingInfoAt(OperationContext* opCtx, + const NamespaceString& nss, + boost::optional<Timestamp> atClusterTime); // Interface from which chunks will be retrieved CatalogCacheLoader& _cacheLoader; @@ -557,23 +378,6 @@ private: // combined AtomicWord<long long> totalRefreshWaitTimeMicros{0}; - // Tracks how many incremental refreshes are waiting to complete currently - AtomicWord<long long> numActiveIncrementalRefreshes{0}; - - // Cumulative, always-increasing counter of how many incremental refreshes have been kicked - // off - AtomicWord<long long> countIncrementalRefreshesStarted{0}; - - // Tracks how many full refreshes are waiting to complete currently - AtomicWord<long long> numActiveFullRefreshes{0}; - - // Cumulative, always-increasing counter of how many full refreshes have been kicked off - AtomicWord<long long> countFullRefreshesStarted{0}; - - // Cumulative, always-increasing counter of how many full or incremental refreshes failed - // for whatever reason - AtomicWord<long long> countFailedRefreshes{0}; - // Cumulative, always-increasing counter of how many operations have been blocked by a // catalog cache refresh. Broken down by operation type to match the operations tracked // by the OpCounters class. @@ -595,15 +399,9 @@ private: std::shared_ptr<ThreadPool> _executor; - DatabaseCache _databaseCache; - // Mutex to serialize access to the collection cache - mutable Mutex _mutex = MONGO_MAKE_LATCH("CatalogCache::_mutex"); - // Map from full collection name to the routing info for that collection, grouped by database - using CollectionInfoMap = StringMap<std::shared_ptr<CollectionRoutingInfoEntry>>; - using CollectionsByDbMap = StringMap<CollectionInfoMap>; - CollectionsByDbMap _collectionsByDb; + CollectionCache _collectionCache; }; } // namespace mongo diff --git a/src/mongo/s/catalog_cache_refresh_test.cpp b/src/mongo/s/catalog_cache_refresh_test.cpp index 70b56845eb1..1e21135a15b 100644 --- a/src/mongo/s/catalog_cache_refresh_test.cpp +++ b/src/mongo/s/catalog_cache_refresh_test.cpp @@ -440,7 +440,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithLowestVersion) { ASSERT_EQ(1, initialRoutingInfo.numChunks()); - auto future = scheduleRoutingInfoForcedRefresh(kNss); + auto future = scheduleRoutingInfoIncrementalRefresh(kNss); const auto incompleteChunks = [&]() { ChunkVersion version(1, 0, epoch); @@ -497,7 +497,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithHighestVersion) { ASSERT_EQ(1, initialRoutingInfo.numChunks()); - auto future = scheduleRoutingInfoForcedRefresh(kNss); + auto future = scheduleRoutingInfoIncrementalRefresh(kNss); const auto incompleteChunks = [&]() { ChunkVersion version(1, 0, epoch); @@ -551,7 +551,7 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoad) { auto initialRoutingInfo(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {})); ASSERT_EQ(1, initialRoutingInfo.numChunks()); - auto future = scheduleRoutingInfoForcedRefresh(kNss); + auto future = scheduleRoutingInfoIncrementalRefresh(kNss); ChunkVersion version = initialRoutingInfo.getVersion(); @@ -598,7 +598,7 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft setupNShards(2); - auto future = scheduleRoutingInfoForcedRefresh(kNss); + auto future = scheduleRoutingInfoIncrementalRefresh(kNss); ChunkVersion oldVersion = initialRoutingInfo.getVersion(); const OID newEpoch = OID::gen(); @@ -683,7 +683,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) { setupNShards(2); - auto future = scheduleRoutingInfoForcedRefresh(kNss); + auto future = scheduleRoutingInfoIncrementalRefresh(kNss); ChunkVersion newVersion(1, 0, OID::gen()); @@ -730,7 +730,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) { ChunkVersion version = initialRoutingInfo.getVersion(); - auto future = scheduleRoutingInfoForcedRefresh(kNss); + auto future = scheduleRoutingInfoIncrementalRefresh(kNss); expectGetCollection(version.epoch(), shardKeyPattern); @@ -776,7 +776,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveWithReshardingFieldsAdde ChunkVersion version = initialRoutingInfo.getVersion(); - auto future = scheduleRoutingInfoForcedRefresh(kNss); + auto future = scheduleRoutingInfoIncrementalRefresh(kNss); ChunkVersion expectedDestShardVersion; @@ -824,7 +824,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunkWithReshardingF ChunkVersion version = initialRoutingInfo.getVersion(); - auto future = scheduleRoutingInfoForcedRefresh(kNss); + auto future = scheduleRoutingInfoIncrementalRefresh(kNss); // The collection type won't have resharding fields this time. expectGetCollection(version.epoch(), shardKeyPattern); diff --git a/src/mongo/s/catalog_cache_test.cpp b/src/mongo/s/catalog_cache_test.cpp index fce177bdd4f..8fdb461aca3 100644 --- a/src/mongo/s/catalog_cache_test.cpp +++ b/src/mongo/s/catalog_cache_test.cpp @@ -35,6 +35,7 @@ #include "mongo/s/catalog_cache.h" #include "mongo/s/catalog_cache_loader_mock.h" #include "mongo/s/sharding_router_test_fixture.h" +#include "mongo/s/stale_exception.h" namespace mongo { namespace { @@ -72,7 +73,54 @@ protected: _catalogCacheLoader->setDatabaseRefreshReturnValue(kErrorStatus); } + void loadCollection(const ChunkVersion& version) { + const auto coll = makeCollectionType(version); + _catalogCacheLoader->setCollectionRefreshReturnValue(coll); + _catalogCacheLoader->setChunkRefreshReturnValue(makeChunks(version)); + + const auto swChunkManager = + _catalogCache->getCollectionRoutingInfo(operationContext(), coll.getNs()); + ASSERT_OK(swChunkManager.getStatus()); + + // Reset the loader return values to avoid false positive results + _catalogCacheLoader->setCollectionRefreshReturnValue(kErrorStatus); + _catalogCacheLoader->setChunkRefreshReturnValue(kErrorStatus); + } + + void loadUnshardedCollection(const NamespaceString& nss) { + _catalogCacheLoader->setCollectionRefreshReturnValue( + Status(ErrorCodes::NamespaceNotFound, "collection not found")); + + const auto swChunkManager = + _catalogCache->getCollectionRoutingInfo(operationContext(), nss); + ASSERT_OK(swChunkManager.getStatus()); + + // Reset the loader return value to avoid false positive results + _catalogCacheLoader->setCollectionRefreshReturnValue(kErrorStatus); + } + + std::vector<ChunkType> makeChunks(ChunkVersion version) { + ChunkType chunk(kNss, + {kShardKeyPattern.getKeyPattern().globalMin(), + kShardKeyPattern.getKeyPattern().globalMax()}, + version, + {"0"}); + chunk.setName(OID::gen()); + return {chunk}; + } + + CollectionType makeCollectionType(const ChunkVersion& collVersion) { + CollectionType coll; + coll.setNs(kNss); + coll.setEpoch(collVersion.epoch()); + coll.setKeyPattern(kShardKeyPattern.getKeyPattern()); + coll.setUnique(false); + return coll; + } + const NamespaceString kNss{"catalgoCacheTestDB.foo"}; + const std::string kPattern{"_id"}; + const ShardKeyPattern kShardKeyPattern{BSON(kPattern << 1)}; const int kDummyPort{12345}; const HostAndPort kConfigHostAndPort{"DummyConfig", kDummyPort}; const std::vector<ShardId> kShards{{"0"}, {"1"}}; @@ -129,5 +177,86 @@ TEST_F(CatalogCacheTest, InvalidateSingleDbOnShardRemoval) { ASSERT_EQ(cachedDb.primaryId(), kShards[1]); } +TEST_F(CatalogCacheTest, CheckEpochNoDatabase) { + const auto collVersion = ChunkVersion(1, 0, OID::gen()); + ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]), + StaleConfigException, + [&](const StaleConfigException& ex) { + const auto staleInfo = ex.extraInfo<StaleConfigInfo>(); + ASSERT(staleInfo); + ASSERT_EQ(staleInfo->getNss(), kNss); + ASSERT_EQ(staleInfo->getVersionReceived(), collVersion); + ASSERT_EQ(staleInfo->getShardId(), kShards[0]); + ASSERT(staleInfo->getVersionWanted() == boost::none); + }); +} + +TEST_F(CatalogCacheTest, CheckEpochNoCollection) { + const auto dbVersion = DatabaseVersion(); + const auto collVersion = ChunkVersion(1, 0, OID::gen()); + + loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)}); + ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]), + StaleConfigException, + [&](const StaleConfigException& ex) { + const auto staleInfo = ex.extraInfo<StaleConfigInfo>(); + ASSERT(staleInfo); + ASSERT_EQ(staleInfo->getNss(), kNss); + ASSERT_EQ(staleInfo->getVersionReceived(), collVersion); + ASSERT_EQ(staleInfo->getShardId(), kShards[0]); + ASSERT(staleInfo->getVersionWanted() == boost::none); + }); +} + +TEST_F(CatalogCacheTest, CheckEpochUnshardedCollection) { + const auto dbVersion = DatabaseVersion(); + const auto collVersion = ChunkVersion(1, 0, OID::gen()); + + loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)}); + loadUnshardedCollection(kNss); + ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]), + StaleConfigException, + [&](const StaleConfigException& ex) { + const auto staleInfo = ex.extraInfo<StaleConfigInfo>(); + ASSERT(staleInfo); + ASSERT_EQ(staleInfo->getNss(), kNss); + ASSERT_EQ(staleInfo->getVersionReceived(), collVersion); + ASSERT_EQ(staleInfo->getShardId(), kShards[0]); + ASSERT(staleInfo->getVersionWanted() == boost::none); + }); +} + +TEST_F(CatalogCacheTest, CheckEpochWithMismatch) { + const auto dbVersion = DatabaseVersion(); + const auto wantedCollVersion = ChunkVersion(1, 0, OID::gen()); + const auto receivedCollVersion = ChunkVersion(1, 0, OID::gen()); + + loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)}); + loadCollection(wantedCollVersion); + + ASSERT_THROWS_WITH_CHECK( + _catalogCache->checkEpochOrThrow(kNss, receivedCollVersion, kShards[0]), + StaleConfigException, + [&](const StaleConfigException& ex) { + const auto staleInfo = ex.extraInfo<StaleConfigInfo>(); + ASSERT(staleInfo); + ASSERT_EQ(staleInfo->getNss(), kNss); + ASSERT_EQ(staleInfo->getVersionReceived(), receivedCollVersion); + ASSERT(staleInfo->getVersionWanted() != boost::none); + ASSERT_EQ(*(staleInfo->getVersionWanted()), wantedCollVersion); + ASSERT_EQ(staleInfo->getShardId(), kShards[0]); + }); +} + +TEST_F(CatalogCacheTest, CheckEpochWithMatch) { + const auto dbVersion = DatabaseVersion(); + const auto collVersion = ChunkVersion(1, 0, OID::gen()); + + loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)}); + loadCollection(collVersion); + + _catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp index 71e02e67fac..4f59eeaef8a 100644 --- a/src/mongo/s/catalog_cache_test_fixture.cpp +++ b/src/mongo/s/catalog_cache_test_fixture.cpp @@ -81,6 +81,26 @@ CatalogCacheTestFixture::scheduleRoutingInfoUnforcedRefresh(const NamespaceStrin }); } +executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> +CatalogCacheTestFixture::scheduleRoutingInfoIncrementalRefresh(const NamespaceString& nss) { + auto catalogCache = Grid::get(getServiceContext())->catalogCache(); + const auto cm = + uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss)); + ASSERT(cm.isSharded()); + + // Simulates the shard wanting a higher version than the one sent by the router. + catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( + nss, boost::none, cm.dbPrimary()); + + return launchAsync([this, nss] { + auto client = getServiceContext()->makeClient("Test"); + auto const catalogCache = Grid::get(getServiceContext())->catalogCache(); + + return boost::make_optional( + uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss))); + }); +} + std::vector<ShardType> CatalogCacheTestFixture::setupNShards(int numShards) { std::vector<ShardType> shards; for (int i = 0; i < numShards; i++) { diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h index fb5238a2ba9..3d58f6a8557 100644 --- a/src/mongo/s/catalog_cache_test_fixture.h +++ b/src/mongo/s/catalog_cache_test_fixture.h @@ -84,6 +84,17 @@ protected: scheduleRoutingInfoUnforcedRefresh(const NamespaceString& nss); /** + * Advance the time in the cache for 'kNss' and schedules a thread to make an incremental + * refresh. + * + * NOTE: The returned value is always set. The reason to use optional is a deficiency of + * std::future with the MSVC STL library, which requires the templated type to be default + * constructible. + */ + executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>> + scheduleRoutingInfoIncrementalRefresh(const NamespaceString& nss); + + /** * Ensures that there are 'numShards' available in the shard registry. The shard ids are * generated as "0", "1", etc. * diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 5713855e01f..9ded562066c 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -336,22 +336,23 @@ void RoutingTableHistory::setAllShardsRefreshed() { } Chunk ChunkManager::findIntersectingChunk(const BSONObj& shardKey, const BSONObj& collation) const { - const bool hasSimpleCollation = (collation.isEmpty() && !_rt->getDefaultCollator()) || + const bool hasSimpleCollation = (collation.isEmpty() && !_rt->optRt->getDefaultCollator()) || SimpleBSONObjComparator::kInstance.evaluate(collation == CollationSpec::kSimpleSpec); if (!hasSimpleCollation) { for (BSONElement elt : shardKey) { uassert(ErrorCodes::ShardKeyNotFound, str::stream() << "Cannot target single shard due to collation of key " - << elt.fieldNameStringData() << " for namespace " << _rt->nss(), + << elt.fieldNameStringData() << " for namespace " + << _rt->optRt->nss(), !CollationIndexKey::isCollatableType(elt.type())); } } - auto chunkInfo = _rt->findIntersectingChunk(shardKey); + auto chunkInfo = _rt->optRt->findIntersectingChunk(shardKey); uassert(ErrorCodes::ShardKeyNotFound, str::stream() << "Cannot target single shard using key " << shardKey - << " for namespace " << _rt->nss(), + << " for namespace " << _rt->optRt->nss(), chunkInfo && chunkInfo->containsKey(shardKey)); return Chunk(*chunkInfo, _clusterTime); @@ -361,7 +362,7 @@ bool ChunkManager::keyBelongsToShard(const BSONObj& shardKey, const ShardId& sha if (shardKey.isEmpty()) return false; - auto chunkInfo = _rt->findIntersectingChunk(shardKey); + auto chunkInfo = _rt->optRt->findIntersectingChunk(shardKey); if (!chunkInfo) return false; @@ -374,7 +375,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e const BSONObj& query, const BSONObj& collation, std::set<ShardId>* shardIds) const { - auto qr = std::make_unique<QueryRequest>(_rt->nss()); + auto qr = std::make_unique<QueryRequest>(_rt->optRt->nss()); qr->setFilter(query); if (auto uuid = getUUID()) @@ -382,8 +383,8 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e if (!collation.isEmpty()) { qr->setCollation(collation); - } else if (_rt->getDefaultCollator()) { - auto defaultCollator = _rt->getDefaultCollator(); + } else if (_rt->optRt->getDefaultCollator()) { + auto defaultCollator = _rt->optRt->getDefaultCollator(); qr->setCollation(defaultCollator->getSpec().toBSON()); expCtx->setCollator(defaultCollator->clone()); } @@ -396,7 +397,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e MatchExpressionParser::kAllowAllSpecialFeatures)); // Fast path for targeting equalities on the shard key. - auto shardKeyToFind = _rt->getShardKeyPattern().extractShardKeyFromQuery(*cq); + auto shardKeyToFind = _rt->optRt->getShardKeyPattern().extractShardKeyFromQuery(*cq); if (!shardKeyToFind.isEmpty()) { try { auto chunk = findIntersectingChunk(shardKeyToFind, collation); @@ -413,14 +414,14 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e // Query { a : { $gte : 1, $lt : 2 }, // b : { $gte : 3, $lt : 4 } } // => Bounds { a : [1, 2), b : [3, 4) } - IndexBounds bounds = getIndexBoundsForQuery(_rt->getShardKeyPattern().toBSON(), *cq); + IndexBounds bounds = getIndexBoundsForQuery(_rt->optRt->getShardKeyPattern().toBSON(), *cq); // Transforms bounds for each shard key field into full shard key ranges // for example : // Key { a : 1, b : 1 } // Bounds { a : [1, 2), b : [3, 4) } // => Ranges { a : 1, b : 3 } => { a : 2, b : 4 } - BoundList ranges = _rt->getShardKeyPattern().flattenBounds(bounds); + BoundList ranges = _rt->optRt->getShardKeyPattern().flattenBounds(bounds); for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); ++it) { getShardIdsForRange(it->first /*min*/, it->second /*max*/, shardIds); @@ -430,7 +431,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e // because _shardVersions contains shards with chunks and is built based on the last // refresh. Therefore, it is possible for _shardVersions to have fewer entries if a shard // no longer owns chunks when it used to at _clusterTime. - if (!_clusterTime && shardIds->size() == _rt->_shardVersions.size()) { + if (!_clusterTime && shardIds->size() == _rt->optRt->_shardVersions.size()) { break; } } @@ -439,7 +440,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e // For now, we satisfy that assumption by adding a shard with no matches rather than returning // an empty set of shards. if (shardIds->empty()) { - _rt->forEachChunk([&](const std::shared_ptr<ChunkInfo>& chunkInfo) { + _rt->optRt->forEachChunk([&](const std::shared_ptr<ChunkInfo>& chunkInfo) { shardIds->insert(chunkInfo->getShardIdAt(_clusterTime)); return false; }); @@ -459,7 +460,7 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min, return; } - _rt->forEachOverlappingChunk(min, max, true, [&](auto& chunkInfo) { + _rt->optRt->forEachOverlappingChunk(min, max, true, [&](auto& chunkInfo) { shardIds->insert(chunkInfo->getShardIdAt(_clusterTime)); // No need to iterate through the rest of the ranges, because we already know we need to use @@ -467,7 +468,7 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min, // because _shardVersions contains shards with chunks and is built based on the last // refresh. Therefore, it is possible for _shardVersions to have fewer entries if a shard // no longer owns chunks when it used to at _clusterTime. - if (!_clusterTime && shardIds->size() == _rt->_shardVersions.size()) { + if (!_clusterTime && shardIds->size() == _rt->optRt->_shardVersions.size()) { return false; } @@ -478,14 +479,15 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min, bool ChunkManager::rangeOverlapsShard(const ChunkRange& range, const ShardId& shardId) const { bool overlapFound = false; - _rt->forEachOverlappingChunk(range.getMin(), range.getMax(), false, [&](auto& chunkInfo) { - if (chunkInfo->getShardIdAt(_clusterTime) == shardId) { - overlapFound = true; - return false; - } + _rt->optRt->forEachOverlappingChunk( + range.getMin(), range.getMax(), false, [&](auto& chunkInfo) { + if (chunkInfo->getShardIdAt(_clusterTime) == shardId) { + overlapFound = true; + return false; + } - return true; - }); + return true; + }); return overlapFound; } @@ -494,7 +496,7 @@ boost::optional<Chunk> ChunkManager::getNextChunkOnShard(const BSONObj& shardKey const ShardId& shardId) const { boost::optional<Chunk> chunk; - _rt->forEachChunk( + _rt->optRt->forEachChunk( [&](auto& chunkInfo) { if (chunkInfo->getShardIdAt(_clusterTime) == shardId) { chunk.emplace(*chunkInfo, _clusterTime); @@ -654,7 +656,7 @@ ChunkManager ChunkManager::makeAtTime(const ChunkManager& cm, Timestamp clusterT } std::string ChunkManager::toString() const { - return _rt ? _rt->toString() : "UNSHARDED"; + return _rt->optRt ? _rt->optRt->toString() : "UNSHARDED"; } bool RoutingTableHistory::compatibleWith(const RoutingTableHistory& other, @@ -733,7 +735,7 @@ RoutingTableHistory RoutingTableHistory::makeUpdated( auto changedChunkInfos = flatten(changedChunks); auto chunkMap = _chunkMap.createMerged(changedChunkInfos); - // If at least one diff was applied, the collection's version must have advanced + // Only update the same collection. invariant(getVersion().epoch() == chunkMap.getVersion().epoch()); return RoutingTableHistory(_nss, @@ -745,4 +747,60 @@ RoutingTableHistory RoutingTableHistory::makeUpdated( std::move(chunkMap)); } +AtomicWord<uint64_t> ComparableChunkVersion::_epochDisambiguatingSequenceNumSource{1ULL}; +AtomicWord<uint64_t> ComparableChunkVersion::_forcedRefreshSequenceNumSource{1ULL}; + +ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion( + const ChunkVersion& version) { + return ComparableChunkVersion(_forcedRefreshSequenceNumSource.load(), + version, + _epochDisambiguatingSequenceNumSource.fetchAndAdd(1)); +} + +ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh() { + return ComparableChunkVersion(_forcedRefreshSequenceNumSource.addAndFetch(2) - 1, + boost::none, + _epochDisambiguatingSequenceNumSource.fetchAndAdd(1)); +} + +std::string ComparableChunkVersion::toString() const { + return str::stream() << _forcedRefreshSequenceNum << "|" + << (_chunkVersion ? _chunkVersion->toString() : "NONE") << "|" + << _epochDisambiguatingSequenceNum; +} + +bool ComparableChunkVersion::operator==(const ComparableChunkVersion& other) const { + if (_forcedRefreshSequenceNum == other._forcedRefreshSequenceNum) { + if (_forcedRefreshSequenceNum == 0) + return true; // Default constructed value + + if (sameEpoch(other)) { + if (_chunkVersion->majorVersion() == 0 && other._chunkVersion->majorVersion() == 0) { + return _chunkVersion->epoch() == OID(); + } + return _chunkVersion->majorVersion() == other._chunkVersion->majorVersion() && + _chunkVersion->minorVersion() == other._chunkVersion->minorVersion(); + } + } + return false; +} + +bool ComparableChunkVersion::operator<(const ComparableChunkVersion& other) const { + if (_forcedRefreshSequenceNum < other._forcedRefreshSequenceNum) + return true; + if (_forcedRefreshSequenceNum > other._forcedRefreshSequenceNum) + return false; + if (_forcedRefreshSequenceNum == 0) + return false; // Default constructed value + + if (sameEpoch(other) && other._chunkVersion->epoch() != OID() && + _chunkVersion->majorVersion() != 0 && other._chunkVersion->majorVersion() != 0) { + return _chunkVersion->majorVersion() < other._chunkVersion->majorVersion() || + (_chunkVersion->majorVersion() == other._chunkVersion->majorVersion() && + _chunkVersion->minorVersion() < other._chunkVersion->minorVersion()); + } else { + return _epochDisambiguatingSequenceNum < other._epochDisambiguatingSequenceNum; + } +} + } // namespace mongo diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index 7f25a810a4a..e694a94c201 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -43,6 +43,7 @@ #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/concurrency/ticketholder.h" +#include "mongo/util/read_through_cache.h" namespace mongo { @@ -324,13 +325,128 @@ private: }; /** + * Constructed to be used exclusively by the CatalogCache as a vector clock (Time) to drive + * CollectionCache's lookups. + * + * The ChunkVersion class contains a non comparable epoch, which makes impossible to compare two + * ChunkVersions when their epochs's differ. + * + * This class wraps a ChunkVersion object with a node-local sequence number + * (_epochDisambiguatingSequenceNum) that allows the comparision. + * + * This class should go away once a cluster-wide comparable ChunkVersion is implemented. + */ +class ComparableChunkVersion { +public: + /** + * Creates a ComparableChunkVersion that wraps the given ChunkVersion. + * Each object created through this method will have a local sequence number greater than the + * previously created ones. + */ + static ComparableChunkVersion makeComparableChunkVersion(const ChunkVersion& version); + + /** + * Creates a ComparableChunkVersion object, which will artificially be greater than any that + * were previously created by `makeComparableChunkVersion`. Used as means to cause the + * collections cache to attempt a refresh in situations where causal consistency cannot be + * inferred. + */ + static ComparableChunkVersion makeComparableChunkVersionForForcedRefresh(); + + /** + * Empty constructor needed by the ReadThroughCache. + * + * Instances created through this constructor will be always less then the ones created through + * the two static constructors, but they do not carry any meaningful value and can only be used + * for comparison purposes. + */ + ComparableChunkVersion() = default; + + const ChunkVersion& getVersion() const { + return *_chunkVersion; + } + + std::string toString() const; + + bool sameEpoch(const ComparableChunkVersion& other) const { + return _chunkVersion->epoch() == other._chunkVersion->epoch(); + } + + bool operator==(const ComparableChunkVersion& other) const; + + bool operator!=(const ComparableChunkVersion& other) const { + return !(*this == other); + } + + /** + * In case the two compared instances have different epochs, the most recently created one will + * be greater, otherwise the comparision will be driven by the major/minor versions of the + * underlying ChunkVersion. + */ + bool operator<(const ComparableChunkVersion& other) const; + + bool operator>(const ComparableChunkVersion& other) const { + return other < *this; + } + + bool operator<=(const ComparableChunkVersion& other) const { + return !(*this > other); + } + + bool operator>=(const ComparableChunkVersion& other) const { + return !(*this < other); + } + +private: + static AtomicWord<uint64_t> _epochDisambiguatingSequenceNumSource; + static AtomicWord<uint64_t> _forcedRefreshSequenceNumSource; + + ComparableChunkVersion(uint64_t forcedRefreshSequenceNum, + boost::optional<ChunkVersion> version, + uint64_t epochDisambiguatingSequenceNum) + : _forcedRefreshSequenceNum(forcedRefreshSequenceNum), + _chunkVersion(std::move(version)), + _epochDisambiguatingSequenceNum(epochDisambiguatingSequenceNum) {} + + uint64_t _forcedRefreshSequenceNum{0}; + + boost::optional<ChunkVersion> _chunkVersion; + + // Locally incremented sequence number that allows to compare two colection versions with + // different epochs. Each new comparableChunkVersion will have a greater sequence number than + // the ones created before. + uint64_t _epochDisambiguatingSequenceNum{0}; +}; + +/** + * This intermediate structure is necessary to be able to store UNSHARDED collections in the routing + * table history cache below. The reason is that currently the RoutingTableHistory class only + * supports sharded collections (i.e., collections which have entries in config.collections and + * config.chunks). + */ +struct OptionalRoutingTableHistory { + // UNSHARDED collection constructor + OptionalRoutingTableHistory() = default; + + // SHARDED collection constructor + OptionalRoutingTableHistory(RoutingTableHistory&& rt) : optRt(std::move(rt)) {} + + // If boost::none, the collection is UNSHARDED, otherwise it is SHARDED + boost::optional<RoutingTableHistory> optRt; +}; + +using RoutingTableHistoryCache = + ReadThroughCache<NamespaceString, OptionalRoutingTableHistory, ComparableChunkVersion>; +using RoutingTableHistoryValueHandle = RoutingTableHistoryCache::ValueHandle; + +/** * Wrapper around a RoutingTableHistory, which pins it to a particular point in time. */ class ChunkManager { public: ChunkManager(ShardId dbPrimary, DatabaseVersion dbVersion, - std::shared_ptr<RoutingTableHistory> rt, + RoutingTableHistoryValueHandle rt, boost::optional<Timestamp> clusterTime) : _dbPrimary(std::move(dbPrimary)), _dbVersion(std::move(dbVersion)), @@ -340,7 +456,7 @@ public: // Methods supported on both sharded and unsharded collections bool isSharded() const { - return bool(_rt); + return bool(_rt->optRt); } const ShardId& dbPrimary() const { @@ -352,7 +468,7 @@ public: } int numChunks() const { - return _rt ? _rt->numChunks() : 1; + return _rt->optRt ? _rt->optRt->numChunks() : 1; } std::string toString() const; @@ -360,32 +476,32 @@ public: // Methods only supported on sharded collections (caller must check isSharded()) const ShardKeyPattern& getShardKeyPattern() const { - return _rt->getShardKeyPattern(); + return _rt->optRt->getShardKeyPattern(); } const CollatorInterface* getDefaultCollator() const { - return _rt->getDefaultCollator(); + return _rt->optRt->getDefaultCollator(); } bool isUnique() const { - return _rt->isUnique(); + return _rt->optRt->isUnique(); } ChunkVersion getVersion() const { - return _rt->getVersion(); + return _rt->optRt->getVersion(); } ChunkVersion getVersion(const ShardId& shardId) const { - return _rt->getVersion(shardId); + return _rt->optRt->getVersion(shardId); } ChunkVersion getVersionForLogging(const ShardId& shardId) const { - return _rt->getVersionForLogging(shardId); + return _rt->optRt->getVersionForLogging(shardId); } template <typename Callable> void forEachChunk(Callable&& handler) const { - _rt->forEachChunk( + _rt->optRt->forEachChunk( [this, handler = std::forward<Callable>(handler)](const auto& chunkInfo) mutable { if (!handler(Chunk{*chunkInfo, _clusterTime})) return false; @@ -461,14 +577,14 @@ public: * Returns the ids of all shards on which the collection has any chunks. */ void getAllShardIds(std::set<ShardId>* all) const { - _rt->getAllShardIds(all); + _rt->optRt->getAllShardIds(all); } /** * Returns the number of shards on which the collection has any chunks */ int getNShardsOwningChunks() const { - return _rt->getNShardsOwningChunks(); + return _rt->optRt->getNShardsOwningChunks(); } // Transforms query into bounds for each field in the shard key @@ -500,30 +616,30 @@ public: * Returns true if, for this shard, the chunks are identical in both chunk managers */ bool compatibleWith(const ChunkManager& other, const ShardId& shard) const { - return _rt->compatibleWith(*other._rt, shard); + return _rt->optRt->compatibleWith(*other._rt->optRt, shard); } bool uuidMatches(UUID uuid) const { - return _rt->uuidMatches(uuid); + return _rt->optRt->uuidMatches(uuid); } boost::optional<UUID> getUUID() const { - return _rt->getUUID(); + return _rt->optRt->getUUID(); } const boost::optional<TypeCollectionReshardingFields>& getReshardingFields() const { - return _rt->getReshardingFields(); + return _rt->optRt->getReshardingFields(); } const RoutingTableHistory& getRoutingTableHistory_ForTest() const { - return *_rt; + return *_rt->optRt; } private: ShardId _dbPrimary; DatabaseVersion _dbVersion; - std::shared_ptr<RoutingTableHistory> _rt; + RoutingTableHistoryValueHandle _rt; boost::optional<Timestamp> _clusterTime; }; diff --git a/src/mongo/s/chunk_manager_refresh_bm.cpp b/src/mongo/s/chunk_manager_refresh_bm.cpp index a3feba2de1e..bd9b133301c 100644 --- a/src/mongo/s/chunk_manager_refresh_bm.cpp +++ b/src/mongo/s/chunk_manager_refresh_bm.cpp @@ -43,8 +43,10 @@ namespace { const NamespaceString kNss("test", "foo"); -std::shared_ptr<RoutingTableHistory> makeStandaloneRoutingTableHistory(RoutingTableHistory rt) { - return std::make_shared<RoutingTableHistory>(std::move(rt)); +RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) { + const auto version = rt.getVersion(); + return RoutingTableHistoryValueHandle( + std::move(rt), ComparableChunkVersion::makeComparableChunkVersion(version)); } ChunkRange getRangeForChunk(int i, int nChunks) { @@ -69,6 +71,7 @@ CollectionMetadata makeChunkManagerWithShardSelector(int nShards, std::vector<ChunkType> chunks; chunks.reserve(nChunks); + for (uint32_t i = 0; i < nChunks; ++i) { chunks.emplace_back(kNss, getRangeForChunk(i, nChunks), @@ -144,13 +147,13 @@ auto BM_FullBuildOfChunkManager(benchmark::State& state, ShardSelectorFn selectS const uint32_t nChunks = state.range(1); const auto collEpoch = OID::gen(); - const auto collName = NamespaceString("test.foo"); const auto shardKeyPattern = KeyPattern(BSON("_id" << 1)); std::vector<ChunkType> chunks; chunks.reserve(nChunks); + for (uint32_t i = 0; i < nChunks; ++i) { - chunks.emplace_back(collName, + chunks.emplace_back(kNss, getRangeForChunk(i, nChunks), ChunkVersion{i + 1, 0, collEpoch}, selectShard(i, nShards, nChunks)); @@ -158,7 +161,7 @@ auto BM_FullBuildOfChunkManager(benchmark::State& state, ShardSelectorFn selectS for (auto keepRunning : state) { auto rt = RoutingTableHistory::makeNew( - collName, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, boost::none, chunks); + kNss, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, boost::none, chunks); benchmark::DoNotOptimize( CollectionMetadata(ChunkManager(ShardId("shard0"), DatabaseVersion(UUID::gen(), 1), diff --git a/src/mongo/s/commands/cluster_drop_cmd.cpp b/src/mongo/s/commands/cluster_drop_cmd.cpp index a69e3292597..f727489ccc0 100644 --- a/src/mongo/s/commands/cluster_drop_cmd.cpp +++ b/src/mongo/s/commands/cluster_drop_cmd.cpp @@ -88,7 +88,9 @@ public: // Invalidate the routing table cache entry for this collection so that we reload it the // next time it is accessed, even if sending the command to the config server fails due // to e.g. a NetworkError. - ON_BLOCK_EXIT([opCtx, nss] { Grid::get(opCtx)->catalogCache()->onEpochChange(nss); }); + ON_BLOCK_EXIT([opCtx, nss] { + Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(nss); + }); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp index b4157bee9d9..531aa1ab41e 100644 --- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp +++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp @@ -174,8 +174,10 @@ public: Shard::RetryPolicy::kNotIdempotent)); uassertStatusOK(response.commandStatus); - Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection( - nss, firstChunk.getShardId()); + Grid::get(opCtx) + ->catalogCache() + ->invalidateShardOrEntireCollectionEntryForShardedCollection( + nss, boost::none, firstChunk.getShardId()); CommandHelpers::filterCommandReplyForPassthrough(response.response, &result); return true; diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index 01cdb91234e..f6e2d27c80f 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -198,9 +198,14 @@ public: cmdObj["waitForDelete"].trueValue(), forceJumbo)); - Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss, - chunk->getShardId()); - Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss, to->getId()); + Grid::get(opCtx) + ->catalogCache() + ->invalidateShardOrEntireCollectionEntryForShardedCollection( + nss, boost::none, chunk->getShardId()); + Grid::get(opCtx) + ->catalogCache() + ->invalidateShardOrEntireCollectionEntryForShardedCollection( + nss, boost::none, to->getId()); result.append("millis", t.millis()); return true; diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index d27fd037d30..d4c4d7901ad 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -105,7 +105,9 @@ public: // Invalidate the routing table cache entry for this collection so that we reload the // collection the next time it's accessed, even if we receive a failure, e.g. NetworkError. - ON_BLOCK_EXIT([opCtx, nss] { Grid::get(opCtx)->catalogCache()->onEpochChange(nss); }); + ON_BLOCK_EXIT([opCtx, nss] { + Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(nss); + }); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( diff --git a/src/mongo/s/commands/cluster_split_cmd.cpp b/src/mongo/s/commands/cluster_split_cmd.cpp index 19d33b3f10b..5532fac1daf 100644 --- a/src/mongo/s/commands/cluster_split_cmd.cpp +++ b/src/mongo/s/commands/cluster_split_cmd.cpp @@ -270,8 +270,10 @@ public: ChunkRange(chunk->getMin(), chunk->getMax()), {splitPoint})); - Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss, - chunk->getShardId()); + Grid::get(opCtx) + ->catalogCache() + ->invalidateShardOrEntireCollectionEntryForShardedCollection( + nss, boost::none, chunk->getShardId()); return true; } diff --git a/src/mongo/s/commands/flush_router_config_cmd.cpp b/src/mongo/s/commands/flush_router_config_cmd.cpp index bcc61a82a0a..d27b65a2c4d 100644 --- a/src/mongo/s/commands/flush_router_config_cmd.cpp +++ b/src/mongo/s/commands/flush_router_config_cmd.cpp @@ -102,7 +102,7 @@ public: "Routing metadata flushed for collection {namespace}", "Routing metadata flushed for collection", "namespace"_attr = nss); - catalogCache->purgeCollection(nss); + catalogCache->invalidateCollectionEntry_LINEARIZABLE(nss); } } diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 644c10e6bcb..f83b490d0ef 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -722,16 +722,12 @@ void runCommand(OperationContext* opCtx, auto catalogCache = Grid::get(opCtx)->catalogCache(); if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( - opCtx, - staleNs, - staleInfo->getVersionWanted(), - staleInfo->getVersionReceived(), - staleInfo->getShardId()); + staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId()); } else { // If we don't have the stale config info and therefore don't know the shard's // id, we have to force all further targetting requests for the namespace to // block on a refresh. - catalogCache->onEpochChange(staleNs); + catalogCache->invalidateCollectionEntry_LINEARIZABLE(staleNs); } @@ -1301,16 +1297,12 @@ void Strategy::explainFind(OperationContext* opCtx, Grid::get(opCtx) ->catalogCache() ->invalidateShardOrEntireCollectionEntryForShardedCollection( - opCtx, - staleNs, - staleInfo->getVersionWanted(), - staleInfo->getVersionReceived(), - staleInfo->getShardId()); + staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId()); } else { // If we don't have the stale config info and therefore don't know the shard's id, // we have to force all further targetting requests for the namespace to block on // a refresh. - Grid::get(opCtx)->catalogCache()->onEpochChange(staleNs); + Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(staleNs); } if (canRetry) { diff --git a/src/mongo/s/comparable_chunk_version_test.cpp b/src/mongo/s/comparable_chunk_version_test.cpp index 941d9bad080..8c1fa71fce2 100644 --- a/src/mongo/s/comparable_chunk_version_test.cpp +++ b/src/mongo/s/comparable_chunk_version_test.cpp @@ -29,8 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_version.h" +#include "mongo/s/chunk_manager.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -95,9 +94,15 @@ TEST(ComparableChunkVersionTest, VersionLessSameEpoch) { ASSERT_FALSE(version2 > version3); } +TEST(ComparableChunkVersionTest, DefaultConstructedVersionsAreEqual) { + const ComparableChunkVersion defaultVersion1{}, defaultVersion2{}; + ASSERT(defaultVersion1 == defaultVersion2); + ASSERT_FALSE(defaultVersion1 < defaultVersion2); + ASSERT_FALSE(defaultVersion1 > defaultVersion2); +} + TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLess) { const ComparableChunkVersion defaultVersion{}; - ASSERT_EQ(defaultVersion.getLocalSequenceNum(), 0); const auto version1 = ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen())); ASSERT(defaultVersion != version1); @@ -105,5 +110,127 @@ TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLess) { ASSERT_FALSE(defaultVersion > version1); } +TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLessThanUnsharded) { + const ComparableChunkVersion defaultVersion{}; + const auto version1 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()); + ASSERT(defaultVersion != version1); + ASSERT(defaultVersion < version1); + ASSERT_FALSE(defaultVersion > version1); +} + +TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLessThanDropped) { + const ComparableChunkVersion defaultVersion{}; + const auto version1 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED()); + ASSERT(defaultVersion != version1); + ASSERT(defaultVersion < version1); + ASSERT_FALSE(defaultVersion > version1); +} + +TEST(ComparableChunkVersionTest, UnshardedAndDroppedAreEqual) { + const auto version1 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()); + const auto version2 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED()); + const auto version3 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()); + const auto version4 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED()); + ASSERT(version1 == version2); + ASSERT(version1 == version3); + ASSERT(version2 == version4); +} + +TEST(ComparableChunkVersionTest, NoChunksAreDifferent) { + const auto oid = OID::gen(); + const auto version1 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid)); + const auto version2 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid)); + ASSERT(version1 != version2); + ASSERT(version1 < version2); + ASSERT_FALSE(version1 > version2); +} + +TEST(ComparableChunkVersionTest, NoChunksCompareBySequenceNum) { + const auto oid = OID::gen(); + const auto version1 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(1, 0, oid)); + const auto noChunkSV1 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid)); + + ASSERT(version1 != noChunkSV1); + ASSERT(noChunkSV1 > version1); + + const auto noChunkSV2 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid)); + + ASSERT(noChunkSV1 != noChunkSV2); + ASSERT_FALSE(noChunkSV1 > noChunkSV2); + ASSERT(noChunkSV2 > noChunkSV1); + + const auto version2 = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(2, 0, oid)); + + ASSERT(version2 != noChunkSV2); + ASSERT(version2 > noChunkSV2); +} + +TEST(ComparableChunkVersionTest, NoChunksGreaterThanUnshardedBySequenceNum) { + const auto unsharded = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()); + const auto noChunkSV = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen())); + + ASSERT(noChunkSV != unsharded); + ASSERT(noChunkSV > unsharded); +} + +TEST(ComparableChunkVersionTest, UnshardedGreaterThanNoChunksBySequenceNum) { + const auto noChunkSV = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen())); + const auto unsharded = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()); + + ASSERT(noChunkSV != unsharded); + ASSERT(unsharded > noChunkSV); +} + +TEST(ComparableChunkVersionTest, NoChunksGreaterThanDefault) { + const auto noChunkSV = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen())); + const ComparableChunkVersion defaultVersion{}; + + ASSERT(noChunkSV != defaultVersion); + ASSERT(noChunkSV > defaultVersion); +} + +TEST(ComparableChunkVersionTest, ForcedRefreshSequenceNumber) { + auto oid = OID::gen(); + const ComparableChunkVersion defaultVersionBeforeForce; + const auto versionBeforeForce = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(100, 0, oid)); + + const auto forcedRefreshVersion = + ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh(); + + const auto versionAfterForce = + ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(100, 0, oid)); + const ComparableChunkVersion defaultVersionAfterForce; + + ASSERT(defaultVersionBeforeForce != forcedRefreshVersion); + ASSERT(defaultVersionBeforeForce < forcedRefreshVersion); + + ASSERT(versionBeforeForce != forcedRefreshVersion); + ASSERT(versionBeforeForce < forcedRefreshVersion); + + ASSERT(versionAfterForce != forcedRefreshVersion); + ASSERT(versionAfterForce > forcedRefreshVersion); + + ASSERT(defaultVersionAfterForce != forcedRefreshVersion); + ASSERT(defaultVersionAfterForce < forcedRefreshVersion); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/comparable_database_version_test.cpp b/src/mongo/s/comparable_database_version_test.cpp index 3b2486a5ebd..d4201d56564 100644 --- a/src/mongo/s/comparable_database_version_test.cpp +++ b/src/mongo/s/comparable_database_version_test.cpp @@ -82,9 +82,15 @@ TEST(ComparableDatabaseVersionTest, VersionLessSameUuid) { ASSERT_FALSE(version1 > version2); } +TEST(ComparableDatabaseVersionTest, DefaultConstructedVersionsAreEqual) { + const ComparableDatabaseVersion defaultVersion1{}, defaultVersion2{}; + ASSERT(defaultVersion1 == defaultVersion2); + ASSERT_FALSE(defaultVersion1 < defaultVersion2); + ASSERT_FALSE(defaultVersion1 > defaultVersion2); +} + TEST(ComparableDatabaseVersionTest, DefaultConstructedVersionIsAlwaysLess) { const ComparableDatabaseVersion defaultVersion{}; - ASSERT_EQ(defaultVersion.getLocalSequenceNum(), 0); const auto version1 = ComparableDatabaseVersion::makeComparableDatabaseVersion(DatabaseVersion(UUID::gen(), 0)); ASSERT(defaultVersion != version1); diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 3996e01c326..12073508642 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -504,18 +504,18 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx, // Re-target and re-send the initial find command to the shards until we have established the // shard version. for (size_t retries = 1; retries <= kMaxRetries; ++retries) { - auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, query.nss()); - if (routingInfoStatus == ErrorCodes::NamespaceNotFound) { + auto swCM = getCollectionRoutingInfoForTxnCmd(opCtx, query.nss()); + if (swCM == ErrorCodes::NamespaceNotFound) { // If the database doesn't exist, we successfully return an empty result set without // creating a cursor. return CursorId(0); } - auto routingInfo = uassertStatusOK(routingInfoStatus); + const auto cm = uassertStatusOK(std::move(swCM)); try { return runQueryWithoutRetrying( - opCtx, query, readPref, routingInfo, results, partialResultsReturned); + opCtx, query, readPref, cm, results, partialResultsReturned); } catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) { if (retries >= kMaxRetries) { // Check if there are no retries remaining, so the last received error can be @@ -577,13 +577,9 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx, if (ex.code() != ErrorCodes::ShardInvalidatedForTargeting) { if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) { catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( - opCtx, - query.nss(), - staleInfo->getVersionWanted(), - staleInfo->getVersionReceived(), - staleInfo->getShardId()); + query.nss(), staleInfo->getVersionWanted(), staleInfo->getShardId()); } else { - catalogCache->onEpochChange(query.nss()); + catalogCache->invalidateCollectionEntry_LINEARIZABLE(query.nss()); } } diff --git a/src/mongo/s/request_types/set_shard_version_request.h b/src/mongo/s/request_types/set_shard_version_request.h index bfd7385ffae..44cacff0415 100644 --- a/src/mongo/s/request_types/set_shard_version_request.h +++ b/src/mongo/s/request_types/set_shard_version_request.h @@ -98,6 +98,7 @@ private: SetShardVersionRequest(); bool _isAuthoritative{false}; + // TODO (SERVER-50812) remove this flag that isn't used anymore bool _forceRefresh{false}; boost::optional<NamespaceString> _nss; diff --git a/src/mongo/s/sessions_collection_sharded.cpp b/src/mongo/s/sessions_collection_sharded.cpp index 060c1158dbd..22915bd2c0a 100644 --- a/src/mongo/s/sessions_collection_sharded.cpp +++ b/src/mongo/s/sessions_collection_sharded.cpp @@ -123,8 +123,6 @@ void SessionsCollectionSharded::checkSessionsCollectionExists(OperationContext* const auto cm = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh( opCtx, NamespaceString::kLogicalSessionsNamespace)); - - uassert(ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist", cm.isSharded()); } void SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, diff --git a/src/mongo/s/sharding_test_fixture_common.cpp b/src/mongo/s/sharding_test_fixture_common.cpp index 95dd505687b..2ac936d3977 100644 --- a/src/mongo/s/sharding_test_fixture_common.cpp +++ b/src/mongo/s/sharding_test_fixture_common.cpp @@ -47,9 +47,11 @@ ShardingTestFixtureCommon::ShardingTestFixtureCommon() { ShardingTestFixtureCommon::~ShardingTestFixtureCommon() = default; -std::shared_ptr<RoutingTableHistory> ShardingTestFixtureCommon::makeStandaloneRoutingTableHistory( +RoutingTableHistoryValueHandle ShardingTestFixtureCommon::makeStandaloneRoutingTableHistory( RoutingTableHistory rt) { - return std::make_shared<RoutingTableHistory>(std::move(rt)); + const auto version = rt.getVersion(); + return RoutingTableHistoryValueHandle( + std::move(rt), ComparableChunkVersion::makeComparableChunkVersion(version)); } void ShardingTestFixtureCommon::onCommand(NetworkTestEnv::OnCommandFunction func) { diff --git a/src/mongo/s/sharding_test_fixture_common.h b/src/mongo/s/sharding_test_fixture_common.h index 0ecbbb30695..52377d7fbc5 100644 --- a/src/mongo/s/sharding_test_fixture_common.h +++ b/src/mongo/s/sharding_test_fixture_common.h @@ -55,8 +55,7 @@ public: * which can be used to pass to ChunkManager for tests, which specifically target the behaviour * of the ChunkManager. */ - static std::shared_ptr<RoutingTableHistory> makeStandaloneRoutingTableHistory( - RoutingTableHistory rt); + static RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt); protected: ShardingTestFixtureCommon(); diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index f7189efdfe9..6794dabc3ca 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -791,7 +791,7 @@ int ChunkManagerTargeter::getNShardsOwningChunks() const { void ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) { uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss, true)); + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss)); _init(opCtx); } |