summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/catalog_cache.cpp762
-rw-r--r--src/mongo/s/catalog_cache.h352
-rw-r--r--src/mongo/s/catalog_cache_refresh_test.cpp16
-rw-r--r--src/mongo/s/catalog_cache_test.cpp129
-rw-r--r--src/mongo/s/catalog_cache_test_fixture.cpp20
-rw-r--r--src/mongo/s/catalog_cache_test_fixture.h11
-rw-r--r--src/mongo/s/chunk_manager.cpp108
-rw-r--r--src/mongo/s/chunk_manager.h152
-rw-r--r--src/mongo/s/chunk_manager_refresh_bm.cpp13
-rw-r--r--src/mongo/s/commands/cluster_drop_cmd.cpp4
-rw-r--r--src/mongo/s/commands/cluster_merge_chunks_cmd.cpp6
-rw-r--r--src/mongo/s/commands/cluster_move_chunk_cmd.cpp11
-rw-r--r--src/mongo/s/commands/cluster_shard_collection_cmd.cpp4
-rw-r--r--src/mongo/s/commands/cluster_split_cmd.cpp6
-rw-r--r--src/mongo/s/commands/flush_router_config_cmd.cpp2
-rw-r--r--src/mongo/s/commands/strategy.cpp16
-rw-r--r--src/mongo/s/comparable_chunk_version_test.cpp133
-rw-r--r--src/mongo/s/comparable_database_version_test.cpp8
-rw-r--r--src/mongo/s/query/cluster_find.cpp16
-rw-r--r--src/mongo/s/request_types/set_shard_version_request.h1
-rw-r--r--src/mongo/s/sessions_collection_sharded.cpp2
-rw-r--r--src/mongo/s/sharding_test_fixture_common.cpp6
-rw-r--r--src/mongo/s/sharding_test_fixture_common.h3
-rw-r--r--src/mongo/s/write_ops/chunk_manager_targeter.cpp2
24 files changed, 927 insertions, 856 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 19846e62b48..d9c2500f2d3 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -55,6 +55,7 @@
#include "mongo/util/timer.h"
namespace mongo {
+
const OperationContext::Decoration<bool> operationShouldBlockBehindCatalogCacheRefresh =
OperationContext::declareDecoration<bool>();
@@ -68,81 +69,8 @@ namespace {
const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
const int kDatabaseCacheSize = 10000;
-/**
- * Returns whether two shard versions have a matching epoch.
- */
-bool shardVersionsHaveMatchingEpoch(boost::optional<ChunkVersion> wanted,
- const ChunkVersion& received) {
- return wanted && wanted->epoch() == received.epoch();
-};
-
-/**
- * Given an (optional) initial routing table and a set of changed chunks returned by the catalog
- * cache loader, produces a new routing table with the changes applied.
- *
- * If the collection is no longer sharded returns nullptr. If the epoch has changed, expects that
- * the 'collectionChunksList' contains the full contents of the chunks collection for that namespace
- * so that the routing table can be built from scratch.
- *
- * Throws ConflictingOperationInProgress if the chunk metadata was found to be inconsistent (not
- * containing all the necessary chunks, contains overlaps or chunks' epoch values are not the same
- * as that of the collection). Since this situation may be transient, due to the collection being
- * dropped or having its shard key refined concurrently, the caller must retry the reload up to some
- * configurable number of attempts.
- */
-std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo(
- OperationContext* opCtx,
- const NamespaceString& nss,
- std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) {
- if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
- return nullptr;
- }
- const auto collectionAndChunks = uassertStatusOK(std::move(swCollectionAndChangedChunks));
-
- auto chunkManager = [&] {
- // If we have routing info already and it's for the same collection epoch, we're updating.
- // Otherwise, we're making a whole new routing table.
- if (existingRoutingInfo &&
- existingRoutingInfo->getVersion().epoch() == collectionAndChunks.epoch) {
- if (collectionAndChunks.changedChunks.size() == 1 &&
- collectionAndChunks.changedChunks[0].getVersion() ==
- existingRoutingInfo->getVersion())
- return existingRoutingInfo;
-
- return std::make_shared<RoutingTableHistory>(
- existingRoutingInfo->makeUpdated(std::move(collectionAndChunks.reshardingFields),
- collectionAndChunks.changedChunks));
- }
-
- auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
- if (!collectionAndChunks.defaultCollation.isEmpty()) {
- // The collation should have been validated upon collection creation
- return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(collectionAndChunks.defaultCollation));
- }
- return nullptr;
- }();
-
- return std::make_shared<RoutingTableHistory>(
- RoutingTableHistory::makeNew(nss,
- collectionAndChunks.uuid,
- KeyPattern(collectionAndChunks.shardKeyPattern),
- std::move(defaultCollator),
- collectionAndChunks.shardKeyIsUnique,
- collectionAndChunks.epoch,
- std::move(collectionAndChunks.reshardingFields),
- collectionAndChunks.changedChunks));
- }();
-
- std::set<ShardId> shardIds;
- chunkManager->getAllShardIds(&shardIds);
- for (const auto& shardId : shardIds) {
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- }
- return chunkManager;
-}
+const int kCollectionCacheSize = 10000;
} // namespace
@@ -155,7 +83,8 @@ CatalogCache::CatalogCache(ServiceContext* const service, CatalogCacheLoader& ca
options.maxThreads = 6;
return options;
}())),
- _databaseCache(service, *_executor, _cacheLoader) {
+ _databaseCache(service, *_executor, _cacheLoader),
+ _collectionCache(service, *_executor, _cacheLoader) {
_executor->startup();
}
@@ -190,111 +119,89 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
}
}
-StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss) {
- return _getCollectionRoutingInfo(opCtx, nss).statusWithInfo;
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoWithForcedRefresh(
- OperationContext* opCtx, const NamespaceString& nss) {
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
- _createOrGetCollectionEntryAndMarkAsNeedsRefresh(nss);
- return _getCollectionRoutingInfo(opCtx, nss);
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss) {
- return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
-}
-
-
-StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx,
- const NamespaceString& nss,
- Timestamp atClusterTime) {
- return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).statusWithInfo;
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt(
+StatusWith<ChunkManager> CatalogCache::_getCollectionRoutingInfoAt(
OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) {
- invariant(!opCtx->lockState() || !opCtx->lockState()->isLocked(),
- "Do not hold a lock while refreshing the catalog cache. Doing so would potentially "
- "hold the lock during a network call, and can lead to a deadlock as described in "
- "SERVER-37398.");
- // This default value can cause a single unnecessary extra refresh if this thread did do the
- // refresh but the refresh failed, or if the database or collection was not found, but only if
- // the caller is getCollectionRoutingInfoWithRefresh with the parameter
- // forceRefreshFromThisThread set to true
- RefreshAction refreshActionTaken(RefreshAction::kDidNotPerformRefresh);
- while (true) {
+ invariant(
+ !opCtx->lockState() || !opCtx->lockState()->isLocked(),
+ "Do not hold a lock while refreshing the catalog cache. Doing so would potentially hold "
+ "the lock during a network call, and can lead to a deadlock as described in SERVER-37398.");
+
+ try {
const auto swDbInfo = getDatabase(opCtx, nss.db());
+
if (!swDbInfo.isOK()) {
if (swDbInfo == ErrorCodes::NamespaceNotFound) {
LOGV2_FOR_CATALOG_REFRESH(
- 4947102,
+ 4947103,
2,
"Invalidating cached collection entry because its database has been dropped",
"namespace"_attr = nss);
- purgeCollection(nss);
+ invalidateCollectionEntry_LINEARIZABLE(nss);
}
- return {swDbInfo.getStatus(), refreshActionTaken};
+ return swDbInfo.getStatus();
}
const auto dbInfo = std::move(swDbInfo.getValue());
- stdx::unique_lock<Latch> ul(_mutex);
-
- auto collEntry = _createOrGetCollectionEntry(ul, nss);
+ const auto cacheConsistency = gEnableFinerGrainedCatalogCacheRefresh &&
+ !operationShouldBlockBehindCatalogCacheRefresh(opCtx)
+ ? CacheCausalConsistency::kLatestCached
+ : CacheCausalConsistency::kLatestKnown;
- if (collEntry->needsRefresh &&
- (!gEnableFinerGrainedCatalogCacheRefresh || collEntry->epochHasChanged ||
- operationShouldBlockBehindCatalogCacheRefresh(opCtx))) {
+ auto collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
- operationBlockedBehindCatalogCacheRefresh(opCtx) = true;
+ // If the entry is in the cache return inmediately.
+ if (collEntryFuture.isReady()) {
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
+ return ChunkManager(dbInfo.primaryId(),
+ dbInfo.databaseVersion(),
+ collEntryFuture.get(opCtx),
+ atClusterTime);
+ }
- auto refreshNotification = collEntry->refreshCompletionNotification;
- if (!refreshNotification) {
- refreshNotification = (collEntry->refreshCompletionNotification =
- std::make_shared<Notification<Status>>());
- _scheduleCollectionRefresh(ul, opCtx->getServiceContext(), collEntry, nss, 1);
- refreshActionTaken = RefreshAction::kPerformedRefresh;
- }
+ operationBlockedBehindCatalogCacheRefresh(opCtx) = true;
- // Wait on the notification outside of the mutex
- ul.unlock();
+ size_t acquireTries = 0;
+ Timer t;
- auto refreshStatus = [&]() {
- Timer t;
- ON_BLOCK_EXIT([&] { _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); });
+ while (true) {
+ try {
+ auto collEntry = collEntryFuture.get(opCtx);
+ _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- try {
- const Milliseconds kReportingInterval{250};
- while (!refreshNotification->waitFor(opCtx, kReportingInterval)) {
- _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- t.reset();
- }
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
- return refreshNotification->get(opCtx);
- } catch (const DBException& ex) {
+ return ChunkManager(dbInfo.primaryId(),
+ dbInfo.databaseVersion(),
+ std::move(collEntry),
+ atClusterTime);
+ } catch (ExceptionFor<ErrorCodes::ConflictingOperationInProgress>& ex) {
+ _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
+ acquireTries++;
+ if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) {
return ex.toStatus();
}
- }();
-
- if (!refreshStatus.isOK()) {
- return {refreshStatus, refreshActionTaken};
}
- // Once the refresh is complete, loop around to get the latest value
- continue;
+ collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
+ t.reset();
}
-
- return {ChunkManager(dbInfo.primaryId(),
- dbInfo.databaseVersion(),
- collEntry->routingInfo,
- atClusterTime),
- refreshActionTaken};
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
}
+StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
+}
+
+StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Timestamp atClusterTime) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime);
+}
+
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationContext* opCtx,
StringData dbName) {
// TODO SERVER-49724: Make ReadThroughCache support StringData keys
@@ -303,32 +210,20 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon
}
StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoWithRefresh(
- OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread) {
- auto refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss);
- // We want to ensure that we don't join an in-progress refresh because that
- // could violate causal consistency for this client. We don't need to actually perform the
- // refresh ourselves but we do need the refresh to begin *after* this function is
- // called, so calling it twice is enough regardless of what happens the
- // second time. See SERVER-33954 for reasoning.
- if (forceRefreshFromThisThread &&
- refreshResult.actionTaken == RefreshAction::kDidNotPerformRefresh) {
- refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss);
- }
- return refreshResult.statusWithInfo;
+ OperationContext* opCtx, const NamespaceString& nss) {
+ _collectionCache.invalidate(nss);
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
+ return getCollectionRoutingInfo(opCtx, nss);
}
StatusWith<ChunkManager> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
OperationContext* opCtx, const NamespaceString& nss) {
- auto swRoutingInfo = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo;
- if (!swRoutingInfo.isOK())
- return swRoutingInfo;
-
- auto cri(std::move(swRoutingInfo.getValue()));
- if (!cri.isSharded())
+ auto routingInfoStatus = getCollectionRoutingInfoWithRefresh(opCtx, nss);
+ if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().isSharded()) {
return {ErrorCodes::NamespaceNotSharded,
str::stream() << "Collection " << nss.ns() << " is not sharded."};
-
- return cri;
+ }
+ return routingInfoStatus;
}
void CatalogCache::onStaleDatabaseVersion(const StringData dbName,
@@ -350,48 +245,49 @@ void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationCon
if (gEnableFinerGrainedCatalogCacheRefresh) {
operationShouldBlockBehindCatalogCacheRefresh(opCtx) = shouldBlock;
}
-};
+}
void CatalogCache::invalidateShardOrEntireCollectionEntryForShardedCollection(
- OperationContext* opCtx,
const NamespaceString& nss,
- boost::optional<ChunkVersion> wantedVersion,
- const ChunkVersion& receivedVersion,
- ShardId shardId) {
- if (shardVersionsHaveMatchingEpoch(wantedVersion, receivedVersion)) {
- _createOrGetCollectionEntryAndMarkShardStale(nss, shardId);
- } else {
- _createOrGetCollectionEntryAndMarkEpochStale(nss);
+ const boost::optional<ChunkVersion>& wantedVersion,
+ const ShardId& shardId) {
+ _stats.countStaleConfigErrors.addAndFetch(1);
+
+ auto collectionEntry = _collectionCache.peekLatestCached(nss);
+ if (collectionEntry && collectionEntry->optRt) {
+ collectionEntry->optRt->setShardStale(shardId);
}
-};
-void CatalogCache::onEpochChange(const NamespaceString& nss) {
- _createOrGetCollectionEntryAndMarkEpochStale(nss);
-};
+ if (wantedVersion) {
+ _collectionCache.advanceTimeInStore(
+ nss, ComparableChunkVersion::makeComparableChunkVersion(*wantedVersion));
+ } else {
+ _collectionCache.advanceTimeInStore(
+ nss, ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh());
+ }
+}
void CatalogCache::checkEpochOrThrow(const NamespaceString& nss,
- ChunkVersion targetCollectionVersion,
- const ShardId& shardId) const {
- stdx::lock_guard<Latch> lg(_mutex);
- const auto itDb = _collectionsByDb.find(nss.db());
+ const ChunkVersion& targetCollectionVersion,
+ const ShardId& shardId) {
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns()
<< ", no entry for database " << nss.db(),
- itDb != _collectionsByDb.end());
+ _databaseCache.peekLatestCached(nss.db().toString()));
- auto itColl = itDb->second.find(nss.ns());
+ auto collectionValueHandle = _collectionCache.peekLatestCached(nss);
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns()
<< ", no entry for collection.",
- itColl != itDb->second.end());
+ collectionValueHandle);
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns() << ", wanted "
<< targetCollectionVersion.toString()
<< ", but found the collection was unsharded",
- itColl->second->routingInfo);
+ collectionValueHandle->optRt);
- auto foundVersion = itColl->second->routingInfo->getVersion();
+ auto foundVersion = collectionValueHandle->optRt->getVersion();
uassert(StaleConfigInfo(nss, targetCollectionVersion, foundVersion, shardId),
str::stream() << "could not act as router for " << nss.ns() << ", wanted "
<< targetCollectionVersion.toString() << ", but found "
@@ -399,11 +295,6 @@ void CatalogCache::checkEpochOrThrow(const NamespaceString& nss,
foundVersion.epoch() == targetCollectionVersion.epoch());
}
-void CatalogCache::invalidateShardForShardedCollection(const NamespaceString& nss,
- const ShardId& staleShardId) {
- _createOrGetCollectionEntryAndMarkShardStale(nss, staleShardId);
-}
-
void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
LOGV2_DEBUG(4997600,
1,
@@ -413,32 +304,24 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
_databaseCache.invalidateCachedValueIf(
[&](const DatabaseType& dbt) { return dbt.getPrimary() == shardId; });
- stdx::lock_guard<Latch> lg(_mutex);
-
// Invalidate collections which contain data on this shard.
- for (const auto& [db, collInfoMap] : _collectionsByDb) {
- for (const auto& [collNs, collRoutingInfoEntry] : collInfoMap) {
- if (!collRoutingInfoEntry->needsRefresh && collRoutingInfoEntry->routingInfo) {
- // The set of shards on which this collection contains chunks.
- std::set<ShardId> shardsOwningDataForCollection;
- collRoutingInfoEntry->routingInfo->getAllShardIds(&shardsOwningDataForCollection);
-
- if (shardsOwningDataForCollection.find(shardId) !=
- shardsOwningDataForCollection.end()) {
- LOGV2_DEBUG(22647,
- 3,
- "Invalidating cached collection {namespace} that has data "
- "on shard {shardId}",
- "Invalidating cached collection",
- "namespace"_attr = collNs,
- "shardId"_attr = shardId);
-
- collRoutingInfoEntry->needsRefresh = true;
- collRoutingInfoEntry->routingInfo->setShardStale(shardId);
- }
- }
- }
- }
+ _collectionCache.invalidateCachedValueIf([&](const OptionalRoutingTableHistory& ort) {
+ if (!ort.optRt)
+ return false;
+ const auto& rt = *ort.optRt;
+
+ std::set<ShardId> shardIds;
+ rt.getAllShardIds(&shardIds);
+
+ LOGV2_DEBUG(22647,
+ 3,
+ "Invalidating cached collection {namespace} that has data "
+ "on shard {shardId}",
+ "Invalidating cached collection",
+ "namespace"_attr = rt.nss(),
+ "shardId"_attr = shardId);
+ return shardIds.find(shardId) != shardIds.end();
+ });
LOGV2(22648,
"Finished invalidating databases and collections with data on shard: {shardId}",
@@ -446,46 +329,28 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
"shardId"_attr = shardId);
}
-void CatalogCache::purgeCollection(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
-
- auto itDb = _collectionsByDb.find(nss.db());
- if (itDb == _collectionsByDb.end()) {
- return;
- }
-
- itDb->second.erase(nss.ns());
-}
-
void CatalogCache::purgeDatabase(StringData dbName) {
_databaseCache.invalidate(dbName.toString());
- stdx::lock_guard<Latch> lg(_mutex);
- _collectionsByDb.erase(dbName);
+ _collectionCache.invalidateKeyIf(
+ [&](const NamespaceString& nss) { return nss.db() == dbName; });
}
void CatalogCache::purgeAllDatabases() {
_databaseCache.invalidateAll();
- stdx::lock_guard<Latch> lg(_mutex);
- _collectionsByDb.clear();
+ _collectionCache.invalidateAll();
}
void CatalogCache::report(BSONObjBuilder* builder) const {
BSONObjBuilder cacheStatsBuilder(builder->subobjStart("catalogCache"));
- size_t numDatabaseEntries;
- size_t numCollectionEntries{0};
- {
- numDatabaseEntries = _databaseCache.getCacheInfo().size();
- stdx::lock_guard<Latch> ul(_mutex);
- for (const auto& entry : _collectionsByDb) {
- numCollectionEntries += entry.second.size();
- }
- }
+ const size_t numDatabaseEntries = _databaseCache.getCacheInfo().size();
+ const size_t numCollectionEntries = _collectionCache.getCacheInfo().size();
cacheStatsBuilder.append("numDatabaseEntries", static_cast<long long>(numDatabaseEntries));
cacheStatsBuilder.append("numCollectionEntries", static_cast<long long>(numCollectionEntries));
_stats.report(&cacheStatsBuilder);
+ _collectionCache.reportStats(&cacheStatsBuilder);
}
void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx,
@@ -519,188 +384,8 @@ void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opC
}
}
-void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
- ServiceContext* service,
- std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
- NamespaceString const& nss,
- int refreshAttempt) {
- const auto existingRoutingInfo = collEntry->routingInfo;
-
- // If we have an existing chunk manager, the refresh is considered "incremental", regardless of
- // how many chunks are in the differential
- const bool isIncremental(existingRoutingInfo);
-
- if (isIncremental) {
- _stats.numActiveIncrementalRefreshes.addAndFetch(1);
- _stats.countIncrementalRefreshesStarted.addAndFetch(1);
- } else {
- _stats.numActiveFullRefreshes.addAndFetch(1);
- _stats.countFullRefreshesStarted.addAndFetch(1);
- }
-
- // Invoked when one iteration of getChunksSince has completed, whether with success or error
- const auto onRefreshCompleted = [this, t = Timer(), nss, isIncremental, existingRoutingInfo](
- const Status& status,
- RoutingTableHistory* routingInfoAfterRefresh) {
- if (isIncremental) {
- _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
- } else {
- _stats.numActiveFullRefreshes.subtractAndFetch(1);
- }
-
- if (!status.isOK()) {
- _stats.countFailedRefreshes.addAndFetch(1);
-
- LOGV2_OPTIONS(24103,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Error refreshing cached collection {namespace}; Took {duration} and "
- "failed due to {error}",
- "Error refreshing cached collection",
- "namespace"_attr = nss,
- "duration"_attr = Milliseconds(t.millis()),
- "error"_attr = redact(status));
- } else if (routingInfoAfterRefresh) {
- const int logLevel =
- (!existingRoutingInfo ||
- (existingRoutingInfo &&
- routingInfoAfterRefresh->getVersion() != existingRoutingInfo->getVersion()))
- ? 0
- : 1;
- LOGV2_FOR_CATALOG_REFRESH(
- 24104,
- logLevel,
- "Refreshed cached collection {namespace} to version {newVersion} from version "
- "{oldVersion}. Took {duration}",
- "Refreshed cached collection",
- "namespace"_attr = nss,
- "newVersion"_attr = routingInfoAfterRefresh->getVersion(),
- "oldVersion"_attr =
- (existingRoutingInfo
- ? (" from version " + existingRoutingInfo->getVersion().toString())
- : ""),
- "duration"_attr = Milliseconds(t.millis()));
- } else {
- LOGV2_OPTIONS(24105,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Collection {namespace} was found to be unsharded after refresh that "
- "took {duration}",
- "Collection has found to be unsharded after refresh",
- "namespace"_attr = nss,
- "duration"_attr = Milliseconds(t.millis()));
- }
- };
-
- // Invoked if getChunksSince resulted in error or threw an exception
- const auto onRefreshFailed =
- [ this, service, collEntry, nss, refreshAttempt,
- onRefreshCompleted ](WithLock lk, const Status& status) noexcept {
- onRefreshCompleted(status, nullptr);
-
- // It is possible that the metadata is being changed concurrently, so retry the
- // refresh again
- if (status == ErrorCodes::ConflictingOperationInProgress &&
- refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) {
- _scheduleCollectionRefresh(lk, service, collEntry, nss, refreshAttempt + 1);
- } else {
- // Leave needsRefresh to true so that any subsequent get attempts will kick off
- // another round of refresh
- collEntry->refreshCompletionNotification->set(status);
- collEntry->refreshCompletionNotification = nullptr;
- }
- };
-
- const auto refreshCallback =
- [ this, service, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ](
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
-
- ThreadClient tc("CatalogCache::collectionRefresh", service);
- auto opCtx = tc->makeOperationContext();
-
- std::shared_ptr<RoutingTableHistory> newRoutingInfo;
- try {
- newRoutingInfo = refreshCollectionRoutingInfo(
- opCtx.get(), nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
-
- onRefreshCompleted(Status::OK(), newRoutingInfo.get());
- } catch (const DBException& ex) {
- stdx::lock_guard<Latch> lg(_mutex);
- onRefreshFailed(lg, ex.toStatus());
- return;
- }
-
- stdx::lock_guard<Latch> lg(_mutex);
-
- collEntry->epochHasChanged = false;
- collEntry->needsRefresh = false;
- collEntry->refreshCompletionNotification->set(Status::OK());
- collEntry->refreshCompletionNotification = nullptr;
-
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx.get(), false);
-
- // TODO(SERVER-49876): remove clang-tidy NOLINT comments.
- if (existingRoutingInfo && newRoutingInfo && // NOLINT(bugprone-use-after-move)
- existingRoutingInfo->getVersion() == // NOLINT(bugprone-use-after-move)
- newRoutingInfo->getVersion()) { // NOLINT(bugprone-use-after-move)
- // If the routingInfo hasn't changed, we need to manually reset stale shards.
- newRoutingInfo->setAllShardsRefreshed();
- }
-
- collEntry->routingInfo = std::move(newRoutingInfo);
- };
-
- const ChunkVersion startingCollectionVersion =
- (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED());
-
- LOGV2_FOR_CATALOG_REFRESH(
- 24106,
- 1,
- "Refreshing cached collection {namespace} with version {currentCollectionVersion}",
- "namespace"_attr = nss,
- "currentCollectionVersion"_attr = startingCollectionVersion);
-
- _cacheLoader.getChunksSince(nss, startingCollectionVersion)
- .thenRunOn(_executor)
- .getAsync(refreshCallback);
-
- // The routing info for this collection shouldn't change, as other threads may try to use the
- // CatalogCache while we are waiting for the refresh to complete.
- invariant(collEntry->routingInfo.get() == existingRoutingInfo.get());
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
- collRoutingInfoEntry->epochHasChanged = true;
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss,
- const ShardId& staleShardId) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
- if (collRoutingInfoEntry->routingInfo) {
- collRoutingInfoEntry->routingInfo->setShardStale(staleShardId);
- }
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
-}
-
-std::shared_ptr<CatalogCache::CollectionRoutingInfoEntry> CatalogCache::_createOrGetCollectionEntry(
- WithLock wl, const NamespaceString& nss) {
- auto& collectionsForDb = _collectionsByDb[nss.db()];
- if (!collectionsForDb.contains(nss.ns())) {
- // TODO SERVER-46199: ensure collections cache size is capped
- // currently no routine except for dropDatabase is removing cached collection entries and
- // the cache for a specific DB can grow indefinitely.
- collectionsForDb[nss.ns()] = std::make_shared<CollectionRoutingInfoEntry>();
- }
-
- return collectionsForDb[nss.ns()];
+void CatalogCache::invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss) {
+ _collectionCache.invalidate(nss);
}
void CatalogCache::Stats::report(BSONObjBuilder* builder) const {
@@ -708,14 +393,6 @@ void CatalogCache::Stats::report(BSONObjBuilder* builder) const {
builder->append("totalRefreshWaitTimeMicros", totalRefreshWaitTimeMicros.load());
- builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load());
- builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load());
-
- builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load());
- builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load());
-
- builder->append("countFailedRefreshes", countFailedRefreshes.load());
-
if (isMongos()) {
BSONObjBuilder operationsBlockedByRefreshBuilder(
builder->subobjStart("operationsBlockedByRefresh"));
@@ -756,7 +433,6 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa
OperationContext* opCtx,
const std::string& dbName,
const ComparableDatabaseVersion& previousDbVersion) {
-
// TODO (SERVER-34164): Track and increment stats for database refreshes
LOGV2_FOR_CATALOG_REFRESH(24102, 2, "Refreshing cached database entry", "db"_attr = dbName);
@@ -788,73 +464,199 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa
}
}
-AtomicWord<uint64_t> ComparableDatabaseVersion::_localSequenceNumSource{1ULL};
+CatalogCache::CollectionCache::CollectionCache(ServiceContext* service,
+ ThreadPoolInterface& threadPool,
+ CatalogCacheLoader& catalogCacheLoader)
+ : ReadThroughCache(_mutex,
+ service,
+ threadPool,
+ [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ValueHandle& collectionHistory,
+ const ComparableChunkVersion& previousChunkVersion) {
+ return _lookupCollection(
+ opCtx, nss, collectionHistory, previousChunkVersion);
+ },
+ kCollectionCacheSize),
+ _catalogCacheLoader(catalogCacheLoader) {}
-ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion(
- const DatabaseVersion& version) {
- return ComparableDatabaseVersion(version, _localSequenceNumSource.fetchAndAdd(1));
+void CatalogCache::CollectionCache::reportStats(BSONObjBuilder* builder) const {
+ _stats.report(builder);
}
-const DatabaseVersion& ComparableDatabaseVersion::getVersion() const {
- return _dbVersion;
+void CatalogCache::CollectionCache::_updateRefreshesStats(const bool isIncremental,
+ const bool add) {
+ if (add) {
+ if (isIncremental) {
+ _stats.numActiveIncrementalRefreshes.addAndFetch(1);
+ _stats.countIncrementalRefreshesStarted.addAndFetch(1);
+ } else {
+ _stats.numActiveFullRefreshes.addAndFetch(1);
+ _stats.countFullRefreshesStarted.addAndFetch(1);
+ }
+ } else {
+ if (isIncremental) {
+ _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
+ } else {
+ _stats.numActiveFullRefreshes.subtractAndFetch(1);
+ }
+ }
}
-uint64_t ComparableDatabaseVersion::getLocalSequenceNum() const {
- return _localSequenceNum;
-}
+void CatalogCache::CollectionCache::Stats::report(BSONObjBuilder* builder) const {
+ builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load());
+ builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load());
-BSONObj ComparableDatabaseVersion::toBSON() const {
- BSONObjBuilder builder;
- _dbVersion.getUuid().appendToBuilder(&builder, "uuid");
- builder.append("lastMod", _dbVersion.getLastMod());
- builder.append("localSequenceNum", std::to_string(_localSequenceNum));
- return builder.obj();
-}
+ builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load());
+ builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load());
-std::string ComparableDatabaseVersion::toString() const {
- return toBSON().toString();
+ builder->append("countFailedRefreshes", countFailedRefreshes.load());
}
+CatalogCache::CollectionCache::LookupResult CatalogCache::CollectionCache::_lookupCollection(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const RoutingTableHistoryValueHandle& existingHistory,
+ const ComparableChunkVersion& previousVersion) {
+ const bool isIncremental(existingHistory && existingHistory->optRt);
+ _updateRefreshesStats(isIncremental, true);
-CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard)
- : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {}
+ Timer t{};
+ try {
+ auto lookupVersion =
+ isIncremental ? existingHistory->optRt->getVersion() : ChunkVersion::UNSHARDED();
-const ShardId& CachedDatabaseInfo::primaryId() const {
- return _dbt.getPrimary();
+ LOGV2_FOR_CATALOG_REFRESH(4619900,
+ 1,
+ "Refreshing cached collection",
+ "namespace"_attr = nss,
+ "currentVersion"_attr = previousVersion);
+
+ auto collectionAndChunks = _catalogCacheLoader.getChunksSince(nss, lookupVersion).get();
+
+ auto newRoutingHistory = [&] {
+ // If we have routing info already and it's for the same collection epoch, we're
+ // updating. Otherwise, we're making a whole new routing table.
+ if (isIncremental &&
+ existingHistory->optRt->getVersion().epoch() == collectionAndChunks.epoch) {
+ return existingHistory->optRt->makeUpdated(collectionAndChunks.reshardingFields,
+ collectionAndChunks.changedChunks);
+ }
+
+ auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
+ if (!collectionAndChunks.defaultCollation.isEmpty()) {
+ // The collation should have been validated upon collection creation
+ return uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(collectionAndChunks.defaultCollation));
+ }
+ return nullptr;
+ }();
+
+ return RoutingTableHistory::makeNew(nss,
+ collectionAndChunks.uuid,
+ KeyPattern(collectionAndChunks.shardKeyPattern),
+ std::move(defaultCollator),
+ collectionAndChunks.shardKeyIsUnique,
+ collectionAndChunks.epoch,
+ std::move(collectionAndChunks.reshardingFields),
+ collectionAndChunks.changedChunks);
+ }();
+
+ newRoutingHistory.setAllShardsRefreshed();
+
+ // Check that the shards all match with what is on the config server
+ std::set<ShardId> shardIds;
+ newRoutingHistory.getAllShardIds(&shardIds);
+ for (const auto& shardId : shardIds) {
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
+ }
+
+ const auto newVersion =
+ ComparableChunkVersion::makeComparableChunkVersion(newRoutingHistory.getVersion());
+
+ LOGV2_FOR_CATALOG_REFRESH(4619901,
+ isIncremental || newVersion != previousVersion ? 0 : 1,
+ "Refreshed cached collection",
+ "namespace"_attr = nss,
+ "newVersion"_attr = newVersion,
+ "oldVersion"_attr = previousVersion,
+ "duration"_attr = Milliseconds(t.millis()));
+ _updateRefreshesStats(isIncremental, false);
+
+ return LookupResult(OptionalRoutingTableHistory(std::move(newRoutingHistory)), newVersion);
+ } catch (const DBException& ex) {
+ _stats.countFailedRefreshes.addAndFetch(1);
+ _updateRefreshesStats(isIncremental, false);
+
+ if (ex.code() == ErrorCodes::NamespaceNotFound) {
+ LOGV2_FOR_CATALOG_REFRESH(4619902,
+ 0,
+ "Collection has found to be unsharded after refresh",
+ "namespace"_attr = nss,
+ "duration"_attr = Milliseconds(t.millis()));
+
+ return LookupResult(
+ OptionalRoutingTableHistory(),
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()));
+ }
+
+ LOGV2_FOR_CATALOG_REFRESH(4619903,
+ 0,
+ "Error refreshing cached collection",
+ "namespace"_attr = nss,
+ "duration"_attr = Milliseconds(t.millis()),
+ "error"_attr = redact(ex));
+
+ throw;
+ }
}
-bool CachedDatabaseInfo::shardingEnabled() const {
- return _dbt.getSharded();
+AtomicWord<uint64_t> ComparableDatabaseVersion::_uuidDisambiguatingSequenceNumSource{1ULL};
+
+ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion(
+ const DatabaseVersion& version) {
+ return ComparableDatabaseVersion(version, _uuidDisambiguatingSequenceNumSource.fetchAndAdd(1));
}
-DatabaseVersion CachedDatabaseInfo::databaseVersion() const {
- return _dbt.getVersion();
+std::string ComparableDatabaseVersion::toString() const {
+ return str::stream() << (_dbVersion ? _dbVersion->toBSON().toString() : "NONE") << "|"
+ << _uuidDisambiguatingSequenceNum;
}
-AtomicWord<uint64_t> ComparableChunkVersion::_localSequenceNumSource{1ULL};
+bool ComparableDatabaseVersion::operator==(const ComparableDatabaseVersion& other) const {
+ if (!_dbVersion && !other._dbVersion)
+ return true; // Default constructed value
+ if (_dbVersion.is_initialized() != other._dbVersion.is_initialized())
+ return false; // One side is default constructed value
-ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion(
- const ChunkVersion& version) {
- return ComparableChunkVersion(version, _localSequenceNumSource.fetchAndAdd(1));
+ return sameUuid(other) && (_dbVersion->getLastMod() == other._dbVersion->getLastMod());
}
-const ChunkVersion& ComparableChunkVersion::getVersion() const {
- return _chunkVersion;
+bool ComparableDatabaseVersion::operator<(const ComparableDatabaseVersion& other) const {
+ if (!_dbVersion && !other._dbVersion)
+ return false; // Default constructed value
+
+ if (_dbVersion && other._dbVersion && sameUuid(other)) {
+ return _dbVersion->getLastMod() < other._dbVersion->getLastMod();
+ } else {
+ return _uuidDisambiguatingSequenceNum < other._uuidDisambiguatingSequenceNum;
+ }
}
-uint64_t ComparableChunkVersion::getLocalSequenceNum() const {
- return _localSequenceNum;
+CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard)
+ : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {}
+
+const ShardId& CachedDatabaseInfo::primaryId() const {
+ return _dbt.getPrimary();
}
-BSONObj ComparableChunkVersion::toBSON() const {
- BSONObjBuilder builder;
- _chunkVersion.appendToCommand(&builder);
- builder.append("localSequenceNum", std::to_string(_localSequenceNum));
- return builder.obj();
+bool CachedDatabaseInfo::shardingEnabled() const {
+ return _dbt.getSharded();
}
-std::string ComparableChunkVersion::toString() const {
- return toBSON().toString();
+DatabaseVersion CachedDatabaseInfo::databaseVersion() const {
+ return _dbt.getVersion();
}
} // namespace mongo
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index a957189183a..796b9e10136 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -45,8 +45,6 @@
namespace mongo {
class BSONObjBuilder;
-class CachedDatabaseInfo;
-class OperationContext;
static constexpr int kMaxNumStaleVersionRetries = 10;
@@ -64,21 +62,21 @@ extern const OperationContext::Decoration<bool> operationShouldBlockBehindCatalo
* in fact is impossible to compare two different DatabaseVersion that have different UUIDs.
*
* This class wrap a DatabaseVersion object to make it always comparable by timestamping it with a
- * node-local sequence number (_dbVersionLocalSequence).
+ * node-local sequence number (_uuidDisambiguatingSequenceNum).
*
* This class class should go away once a cluster-wide comparable DatabaseVersion will be
* implemented.
*/
class ComparableDatabaseVersion {
public:
- /*
- * Create a ComparableDatabaseVersion that wraps the given DatabaseVersion.
- * Each object created through this method will have a local sequence number grater then the
+ /**
+ * Creates a ComparableDatabaseVersion that wraps the given DatabaseVersion.
+ * Each object created through this method will have a local sequence number greater than the
* previously created ones.
*/
static ComparableDatabaseVersion makeComparableDatabaseVersion(const DatabaseVersion& version);
- /*
+ /**
* Empty constructor needed by the ReadThroughCache.
*
* Instances created through this constructor will be always less then the ones created through
@@ -86,39 +84,28 @@ public:
*/
ComparableDatabaseVersion() = default;
- const DatabaseVersion& getVersion() const;
-
- uint64_t getLocalSequenceNum() const;
-
- BSONObj toBSON() const;
+ const DatabaseVersion& getVersion() const {
+ return *_dbVersion;
+ }
std::string toString() const;
- // Rerturns true if the two versions have the same UUID
bool sameUuid(const ComparableDatabaseVersion& other) const {
- return _dbVersion.getUuid() == other._dbVersion.getUuid();
+ return _dbVersion->getUuid() == other._dbVersion->getUuid();
}
- bool operator==(const ComparableDatabaseVersion& other) const {
- return sameUuid(other) && (_dbVersion.getLastMod() == other._dbVersion.getLastMod());
- }
+ bool operator==(const ComparableDatabaseVersion& other) const;
bool operator!=(const ComparableDatabaseVersion& other) const {
return !(*this == other);
}
- /*
- * In the case the two compared instances have different UUIDs the most recently created one
- * will be grater, otherwise the comparision will be driven by the lastMod field of the
- * underlying DatabaseVersion.
+ /**
+ * In case the two compared instances have different UUIDs, the most recently created one will
+ * be greater, otherwise the comparison will be driven by the lastMod field of the underlying
+ * DatabaseVersion.
*/
- bool operator<(const ComparableDatabaseVersion& other) const {
- if (sameUuid(other)) {
- return _dbVersion.getLastMod() < other._dbVersion.getLastMod();
- } else {
- return _localSequenceNum < other._localSequenceNum;
- }
- }
+ bool operator<(const ComparableDatabaseVersion& other) const;
bool operator>(const ComparableDatabaseVersion& other) const {
return other < *this;
@@ -133,92 +120,18 @@ public:
}
private:
- static AtomicWord<uint64_t> _localSequenceNumSource;
+ static AtomicWord<uint64_t> _uuidDisambiguatingSequenceNumSource;
+
+ ComparableDatabaseVersion(const DatabaseVersion& version,
+ uint64_t uuidDisambiguatingSequenceNum)
+ : _dbVersion(version), _uuidDisambiguatingSequenceNum(uuidDisambiguatingSequenceNum) {}
- ComparableDatabaseVersion(const DatabaseVersion& version, uint64_t localSequenceNum)
- : _dbVersion(version), _localSequenceNum(localSequenceNum) {}
+ boost::optional<DatabaseVersion> _dbVersion;
- DatabaseVersion _dbVersion;
// Locally incremented sequence number that allows to compare two database versions with
// different UUIDs. Each new comparableDatabaseVersion will have a greater sequence number then
// the ones created before.
- uint64_t _localSequenceNum{0};
-};
-
-/**
- * Constructed to be used exclusively by the CatalogCache as a vector clock (Time) to drive
- * CollectionCache's lookups.
- *
- * The ChunkVersion class contains an non comparable epoch, which makes impossible to compare two
- * ChunkVersions when their epochs's differ.
- *
- * This class wraps a ChunkVersion object with a node-local sequence number (_localSequenceNum) that
- * allows the comparision.
- *
- * This class should go away once a cluster-wide comparable ChunkVersion is implemented.
- */
-class ComparableChunkVersion {
-public:
- static ComparableChunkVersion makeComparableChunkVersion(const ChunkVersion& version);
-
- ComparableChunkVersion() = default;
-
- const ChunkVersion& getVersion() const;
-
- uint64_t getLocalSequenceNum() const;
-
- BSONObj toBSON() const;
-
- std::string toString() const;
-
- bool sameEpoch(const ComparableChunkVersion& other) const {
- return _chunkVersion.epoch() == other._chunkVersion.epoch();
- }
-
- bool operator==(const ComparableChunkVersion& other) const {
- return sameEpoch(other) &&
- (_chunkVersion.majorVersion() == other._chunkVersion.majorVersion() &&
- _chunkVersion.minorVersion() == other._chunkVersion.minorVersion());
- }
-
- bool operator!=(const ComparableChunkVersion& other) const {
- return !(*this == other);
- }
-
- bool operator<(const ComparableChunkVersion& other) const {
- if (sameEpoch(other)) {
- return _chunkVersion.majorVersion() < other._chunkVersion.majorVersion() ||
- (_chunkVersion.majorVersion() == other._chunkVersion.majorVersion() &&
- _chunkVersion.minorVersion() < other._chunkVersion.minorVersion());
- } else {
- return _localSequenceNum < other._localSequenceNum;
- }
- }
-
- bool operator>(const ComparableChunkVersion& other) const {
- return other < *this;
- }
-
- bool operator<=(const ComparableChunkVersion& other) const {
- return !(*this > other);
- }
-
- bool operator>=(const ComparableChunkVersion& other) const {
- return !(*this < other);
- }
-
-private:
- static AtomicWord<uint64_t> _localSequenceNumSource;
-
- ComparableChunkVersion(const ChunkVersion& version, uint64_t localSequenceNum)
- : _chunkVersion(version), _localSequenceNum(localSequenceNum) {}
-
- ChunkVersion _chunkVersion;
-
- // Locally incremented sequence number that allows to compare two colection versions with
- // different epochs. Each new comparableChunkVersion will have a greater sequence number than
- // the ones created before.
- uint64_t _localSequenceNum{0};
+ uint64_t _uuidDisambiguatingSequenceNum{0};
};
/**
@@ -298,21 +211,9 @@ public:
/**
* Same as getCollectionRoutingInfo above, but in addition causes the namespace to be refreshed.
- *
- * When forceRefreshFromThisThread is false, it's possible for this call to
- * join an ongoing refresh from another thread forceRefreshFromThisThread.
- * forceRefreshFromThisThread checks whether it joined another thread and
- * then forces it to try again, which is necessary in cases where calls to
- * getCollectionRoutingInfoWithRefresh must be causally consistent
- *
- * TODO: Remove this parameter in favor of using collection creation time +
- * collection version to decide when a refresh is necessary and provide
- * proper causal consistency
*/
- StatusWith<ChunkManager> getCollectionRoutingInfoWithRefresh(
- OperationContext* opCtx,
- const NamespaceString& nss,
- bool forceRefreshFromThisThread = false);
+ StatusWith<ChunkManager> getCollectionRoutingInfoWithRefresh(OperationContext* opCtx,
+ const NamespaceString& nss);
/**
* Same as getCollectionRoutingInfoWithRefresh above, but in addition returns a
@@ -333,11 +234,6 @@ public:
const boost::optional<DatabaseVersion>& wantedVersion);
/**
- * Gets whether this operation should block behind a catalog cache refresh.
- */
- static bool getOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx);
-
- /**
* Sets whether this operation should block behind a catalog cache refresh.
*/
static void setOperationShouldBlockBehindCatalogCacheRefresh(OperationContext* opCtx,
@@ -349,18 +245,9 @@ public:
* requests to block on an upcoming catalog cache refresh.
*/
void invalidateShardOrEntireCollectionEntryForShardedCollection(
- OperationContext* opCtx,
const NamespaceString& nss,
- boost::optional<ChunkVersion> wantedVersion,
- const ChunkVersion& receivedVersion,
- ShardId shardId);
-
- /**
- * Non-blocking method that marks the current collection entry for the namespace as needing
- * refresh due to an epoch change. Will cause all further targetting attempts for this
- * namespace to block on a catalog cache refresh.
- */
- void onEpochChange(const NamespaceString& nss);
+ const boost::optional<ChunkVersion>& wantedVersion,
+ const ShardId& shardId);
/**
* Throws a StaleConfigException if this catalog cache does not have an entry for the given
@@ -370,16 +257,8 @@ public:
* version to throw a StaleConfigException.
*/
void checkEpochOrThrow(const NamespaceString& nss,
- ChunkVersion targetCollectionVersion,
- const ShardId& shardId) const;
-
- /**
- * Non-blocking method, which invalidates the shard for the routing table for the specified
- * namespace. If that shard is targetted in the future, getCollectionRoutingInfo will wait on a
- * refresh.
- */
- void invalidateShardForShardedCollection(const NamespaceString& nss,
- const ShardId& staleShardId);
+ const ChunkVersion& targetCollectionVersion,
+ const ShardId& shardId);
/**
* Non-blocking method, which invalidates all namespaces which contain data on the specified
@@ -388,12 +267,6 @@ public:
void invalidateEntriesThatReferenceShard(const ShardId& shardId);
/**
- * Non-blocking method, which removes the entire specified collection from the cache (resulting
- * in full refresh on subsequent access)
- */
- void purgeCollection(const NamespaceString& nss);
-
- /**
* Non-blocking method, which removes the entire specified database (including its collections)
* from the cache.
*/
@@ -416,35 +289,17 @@ public:
*/
void checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx, mongo::LogicalOp opType);
+ /**
+ * Non-blocking method that marks the current collection entry for the namespace as needing
+ * refresh. Will cause all further targetting attempts to block on a catalog cache refresh,
+ * even if they do not require causal consistency.
+ */
+ void invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss);
+
private:
// Make the cache entries friends so they can access the private classes below
friend class CachedDatabaseInfo;
- /**
- * Cache entry describing a collection.
- */
- struct CollectionRoutingInfoEntry {
- CollectionRoutingInfoEntry() = default;
- // Disable copy (and move) semantics
- CollectionRoutingInfoEntry(const CollectionRoutingInfoEntry&) = delete;
- CollectionRoutingInfoEntry& operator=(const CollectionRoutingInfoEntry&) = delete;
-
- // Specifies whether this cache entry needs a refresh (in which case routingInfo should not
- // be relied on) or it doesn't, in which case there should be a non-null routingInfo.
- bool needsRefresh{true};
-
- // Specifies whether the namespace has had an epoch change, which indicates that every
- // shard should block on an upcoming refresh.
- bool epochHasChanged{true};
-
- // Contains a notification to be waited on for the refresh to complete (only available if
- // needsRefresh is true)
- std::shared_ptr<Notification<Status>> refreshCompletionNotification;
-
- // Contains the cached routing information (only available if needsRefresh is false)
- std::shared_ptr<RoutingTableHistory> routingInfo;
- };
-
class DatabaseCache
: public ReadThroughCache<std::string, DatabaseType, ComparableDatabaseVersion> {
public:
@@ -461,88 +316,54 @@ private:
Mutex _mutex = MONGO_MAKE_LATCH("DatabaseCache::_mutex");
};
- /**
- * Non-blocking call which schedules an asynchronous refresh for the specified namespace. The
- * namespace must be in the 'needRefresh' state.
- */
- void _scheduleCollectionRefresh(WithLock,
- ServiceContext* service,
- std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
- NamespaceString const& nss,
- int refreshAttempt);
+ class CollectionCache : public RoutingTableHistoryCache {
+ public:
+ CollectionCache(ServiceContext* service,
+ ThreadPoolInterface& threadPool,
+ CatalogCacheLoader& catalogCacheLoader);
- /**
- * Marks a collection entry as needing refresh. Will create the collection entry if one does
- * not exist. Also marks the epoch as changed, which will cause all further targetting requests
- * against this namespace to block upon a catalog cache refresh.
- */
- void _createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss);
+ void reportStats(BSONObjBuilder* builder) const;
- /**
- * Marks a collection entry as needing refresh. Will create the collection entry if one does
- * not exist. Will mark the given shard ID as stale, which will cause all further targetting
- * requests for the given shard for this namespace to block upon a catalog cache refresh.
- */
- void _createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss,
- const ShardId& shardId);
+ private:
+ LookupResult _lookupCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ValueHandle& collectionHistory,
+ const ComparableChunkVersion& previousChunkVersion);
- /**
- * Marks a collection entry as needing refresh. Will create the collection entry if one does
- * not exist.
- */
- void _createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss);
+ CatalogCacheLoader& _catalogCacheLoader;
+ Mutex _mutex = MONGO_MAKE_LATCH("CollectionCache::_mutex");
- /**
- * Retrieves the collection entry for the given namespace, creating the entry if one does not
- * already exist.
- */
- std::shared_ptr<CollectionRoutingInfoEntry> _createOrGetCollectionEntry(
- WithLock wl, const NamespaceString& nss);
+ struct Stats {
+ // Tracks how many incremental refreshes are waiting to complete currently
+ AtomicWord<long long> numActiveIncrementalRefreshes{0};
- /**
- * Used as a flag to indicate whether or not this thread performed its own
- * refresh for certain helper functions
- *
- * kPerformedRefresh is used only when the calling thread performed the
- * refresh *itself*
- *
- * kDidNotPerformRefresh is used either when there was an error or when
- * this thread joined an ongoing refresh
- */
- enum class RefreshAction {
- kPerformedRefresh,
- kDidNotPerformRefresh,
- };
+ // Cumulative, always-increasing counter of how many incremental refreshes have been
+ // kicked off
+ AtomicWord<long long> countIncrementalRefreshesStarted{0};
- /**
- * Return type for helper functions performing refreshes so that they can
- * indicate both status and whether or not this thread performed its own
- * refresh
- */
- struct RefreshResult {
- // Status containing result of refresh
- StatusWith<ChunkManager> statusWithInfo;
- RefreshAction actionTaken;
- };
+ // Tracks how many full refreshes are waiting to complete currently
+ AtomicWord<long long> numActiveFullRefreshes{0};
- /**
- * Retrieves the collection routing info for this namespace after blocking on a catalog cache
- * refresh.
- */
- CatalogCache::RefreshResult _getCollectionRoutingInfoWithForcedRefresh(
- OperationContext* opctx, const NamespaceString& nss);
+ // Cumulative, always-increasing counter of how many full refreshes have been kicked off
+ AtomicWord<long long> countFullRefreshesStarted{0};
- /**
- * Helper function used when we need the refresh action taken (e.g. when we
- * want to force refresh)
- */
- CatalogCache::RefreshResult _getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss);
+ // Cumulative, always-increasing counter of how many full or incremental refreshes
+ // failed for whatever reason
+ AtomicWord<long long> countFailedRefreshes{0};
- CatalogCache::RefreshResult _getCollectionRoutingInfoAt(
- OperationContext* opCtx,
- const NamespaceString& nss,
- boost::optional<Timestamp> atClusterTime);
+ /**
+ * Reports the accumulated statistics for serverStatus.
+ */
+ void report(BSONObjBuilder* builder) const;
+
+ } _stats;
+
+ void _updateRefreshesStats(const bool isIncremental, const bool add);
+ };
+
+ StatusWith<ChunkManager> _getCollectionRoutingInfoAt(OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<Timestamp> atClusterTime);
// Interface from which chunks will be retrieved
CatalogCacheLoader& _cacheLoader;
@@ -557,23 +378,6 @@ private:
// combined
AtomicWord<long long> totalRefreshWaitTimeMicros{0};
- // Tracks how many incremental refreshes are waiting to complete currently
- AtomicWord<long long> numActiveIncrementalRefreshes{0};
-
- // Cumulative, always-increasing counter of how many incremental refreshes have been kicked
- // off
- AtomicWord<long long> countIncrementalRefreshesStarted{0};
-
- // Tracks how many full refreshes are waiting to complete currently
- AtomicWord<long long> numActiveFullRefreshes{0};
-
- // Cumulative, always-increasing counter of how many full refreshes have been kicked off
- AtomicWord<long long> countFullRefreshesStarted{0};
-
- // Cumulative, always-increasing counter of how many full or incremental refreshes failed
- // for whatever reason
- AtomicWord<long long> countFailedRefreshes{0};
-
// Cumulative, always-increasing counter of how many operations have been blocked by a
// catalog cache refresh. Broken down by operation type to match the operations tracked
// by the OpCounters class.
@@ -595,15 +399,9 @@ private:
std::shared_ptr<ThreadPool> _executor;
-
DatabaseCache _databaseCache;
- // Mutex to serialize access to the collection cache
- mutable Mutex _mutex = MONGO_MAKE_LATCH("CatalogCache::_mutex");
- // Map from full collection name to the routing info for that collection, grouped by database
- using CollectionInfoMap = StringMap<std::shared_ptr<CollectionRoutingInfoEntry>>;
- using CollectionsByDbMap = StringMap<CollectionInfoMap>;
- CollectionsByDbMap _collectionsByDb;
+ CollectionCache _collectionCache;
};
} // namespace mongo
diff --git a/src/mongo/s/catalog_cache_refresh_test.cpp b/src/mongo/s/catalog_cache_refresh_test.cpp
index 70b56845eb1..1e21135a15b 100644
--- a/src/mongo/s/catalog_cache_refresh_test.cpp
+++ b/src/mongo/s/catalog_cache_refresh_test.cpp
@@ -440,7 +440,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithLowestVersion) {
ASSERT_EQ(1, initialRoutingInfo.numChunks());
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
const auto incompleteChunks = [&]() {
ChunkVersion version(1, 0, epoch);
@@ -497,7 +497,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadMissingChunkWithHighestVersion) {
ASSERT_EQ(1, initialRoutingInfo.numChunks());
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
const auto incompleteChunks = [&]() {
ChunkVersion version(1, 0, epoch);
@@ -551,7 +551,7 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoad) {
auto initialRoutingInfo(makeChunkManager(kNss, shardKeyPattern, nullptr, true, {}));
ASSERT_EQ(1, initialRoutingInfo.numChunks());
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
ChunkVersion version = initialRoutingInfo.getVersion();
@@ -598,7 +598,7 @@ TEST_F(CatalogCacheRefreshTest, ChunkEpochChangeDuringIncrementalLoadRecoveryAft
setupNShards(2);
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
ChunkVersion oldVersion = initialRoutingInfo.getVersion();
const OID newEpoch = OID::gen();
@@ -683,7 +683,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterCollectionEpochChange) {
setupNShards(2);
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
ChunkVersion newVersion(1, 0, OID::gen());
@@ -730,7 +730,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterSplit) {
ChunkVersion version = initialRoutingInfo.getVersion();
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
expectGetCollection(version.epoch(), shardKeyPattern);
@@ -776,7 +776,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveWithReshardingFieldsAdde
ChunkVersion version = initialRoutingInfo.getVersion();
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
ChunkVersion expectedDestShardVersion;
@@ -824,7 +824,7 @@ TEST_F(CatalogCacheRefreshTest, IncrementalLoadAfterMoveLastChunkWithReshardingF
ChunkVersion version = initialRoutingInfo.getVersion();
- auto future = scheduleRoutingInfoForcedRefresh(kNss);
+ auto future = scheduleRoutingInfoIncrementalRefresh(kNss);
// The collection type won't have resharding fields this time.
expectGetCollection(version.epoch(), shardKeyPattern);
diff --git a/src/mongo/s/catalog_cache_test.cpp b/src/mongo/s/catalog_cache_test.cpp
index fce177bdd4f..8fdb461aca3 100644
--- a/src/mongo/s/catalog_cache_test.cpp
+++ b/src/mongo/s/catalog_cache_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/s/catalog_cache.h"
#include "mongo/s/catalog_cache_loader_mock.h"
#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/s/stale_exception.h"
namespace mongo {
namespace {
@@ -72,7 +73,54 @@ protected:
_catalogCacheLoader->setDatabaseRefreshReturnValue(kErrorStatus);
}
+ void loadCollection(const ChunkVersion& version) {
+ const auto coll = makeCollectionType(version);
+ _catalogCacheLoader->setCollectionRefreshReturnValue(coll);
+ _catalogCacheLoader->setChunkRefreshReturnValue(makeChunks(version));
+
+ const auto swChunkManager =
+ _catalogCache->getCollectionRoutingInfo(operationContext(), coll.getNs());
+ ASSERT_OK(swChunkManager.getStatus());
+
+ // Reset the loader return values to avoid false positive results
+ _catalogCacheLoader->setCollectionRefreshReturnValue(kErrorStatus);
+ _catalogCacheLoader->setChunkRefreshReturnValue(kErrorStatus);
+ }
+
+ void loadUnshardedCollection(const NamespaceString& nss) {
+ _catalogCacheLoader->setCollectionRefreshReturnValue(
+ Status(ErrorCodes::NamespaceNotFound, "collection not found"));
+
+ const auto swChunkManager =
+ _catalogCache->getCollectionRoutingInfo(operationContext(), nss);
+ ASSERT_OK(swChunkManager.getStatus());
+
+ // Reset the loader return value to avoid false positive results
+ _catalogCacheLoader->setCollectionRefreshReturnValue(kErrorStatus);
+ }
+
+ std::vector<ChunkType> makeChunks(ChunkVersion version) {
+ ChunkType chunk(kNss,
+ {kShardKeyPattern.getKeyPattern().globalMin(),
+ kShardKeyPattern.getKeyPattern().globalMax()},
+ version,
+ {"0"});
+ chunk.setName(OID::gen());
+ return {chunk};
+ }
+
+ CollectionType makeCollectionType(const ChunkVersion& collVersion) {
+ CollectionType coll;
+ coll.setNs(kNss);
+ coll.setEpoch(collVersion.epoch());
+ coll.setKeyPattern(kShardKeyPattern.getKeyPattern());
+ coll.setUnique(false);
+ return coll;
+ }
+
const NamespaceString kNss{"catalgoCacheTestDB.foo"};
+ const std::string kPattern{"_id"};
+ const ShardKeyPattern kShardKeyPattern{BSON(kPattern << 1)};
const int kDummyPort{12345};
const HostAndPort kConfigHostAndPort{"DummyConfig", kDummyPort};
const std::vector<ShardId> kShards{{"0"}, {"1"}};
@@ -129,5 +177,86 @@ TEST_F(CatalogCacheTest, InvalidateSingleDbOnShardRemoval) {
ASSERT_EQ(cachedDb.primaryId(), kShards[1]);
}
+TEST_F(CatalogCacheTest, CheckEpochNoDatabase) {
+ const auto collVersion = ChunkVersion(1, 0, OID::gen());
+ ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]),
+ StaleConfigException,
+ [&](const StaleConfigException& ex) {
+ const auto staleInfo = ex.extraInfo<StaleConfigInfo>();
+ ASSERT(staleInfo);
+ ASSERT_EQ(staleInfo->getNss(), kNss);
+ ASSERT_EQ(staleInfo->getVersionReceived(), collVersion);
+ ASSERT_EQ(staleInfo->getShardId(), kShards[0]);
+ ASSERT(staleInfo->getVersionWanted() == boost::none);
+ });
+}
+
+TEST_F(CatalogCacheTest, CheckEpochNoCollection) {
+ const auto dbVersion = DatabaseVersion();
+ const auto collVersion = ChunkVersion(1, 0, OID::gen());
+
+ loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)});
+ ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]),
+ StaleConfigException,
+ [&](const StaleConfigException& ex) {
+ const auto staleInfo = ex.extraInfo<StaleConfigInfo>();
+ ASSERT(staleInfo);
+ ASSERT_EQ(staleInfo->getNss(), kNss);
+ ASSERT_EQ(staleInfo->getVersionReceived(), collVersion);
+ ASSERT_EQ(staleInfo->getShardId(), kShards[0]);
+ ASSERT(staleInfo->getVersionWanted() == boost::none);
+ });
+}
+
+TEST_F(CatalogCacheTest, CheckEpochUnshardedCollection) {
+ const auto dbVersion = DatabaseVersion();
+ const auto collVersion = ChunkVersion(1, 0, OID::gen());
+
+ loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)});
+ loadUnshardedCollection(kNss);
+ ASSERT_THROWS_WITH_CHECK(_catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]),
+ StaleConfigException,
+ [&](const StaleConfigException& ex) {
+ const auto staleInfo = ex.extraInfo<StaleConfigInfo>();
+ ASSERT(staleInfo);
+ ASSERT_EQ(staleInfo->getNss(), kNss);
+ ASSERT_EQ(staleInfo->getVersionReceived(), collVersion);
+ ASSERT_EQ(staleInfo->getShardId(), kShards[0]);
+ ASSERT(staleInfo->getVersionWanted() == boost::none);
+ });
+}
+
+TEST_F(CatalogCacheTest, CheckEpochWithMismatch) {
+ const auto dbVersion = DatabaseVersion();
+ const auto wantedCollVersion = ChunkVersion(1, 0, OID::gen());
+ const auto receivedCollVersion = ChunkVersion(1, 0, OID::gen());
+
+ loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)});
+ loadCollection(wantedCollVersion);
+
+ ASSERT_THROWS_WITH_CHECK(
+ _catalogCache->checkEpochOrThrow(kNss, receivedCollVersion, kShards[0]),
+ StaleConfigException,
+ [&](const StaleConfigException& ex) {
+ const auto staleInfo = ex.extraInfo<StaleConfigInfo>();
+ ASSERT(staleInfo);
+ ASSERT_EQ(staleInfo->getNss(), kNss);
+ ASSERT_EQ(staleInfo->getVersionReceived(), receivedCollVersion);
+ ASSERT(staleInfo->getVersionWanted() != boost::none);
+ ASSERT_EQ(*(staleInfo->getVersionWanted()), wantedCollVersion);
+ ASSERT_EQ(staleInfo->getShardId(), kShards[0]);
+ });
+}
+
+TEST_F(CatalogCacheTest, CheckEpochWithMatch) {
+ const auto dbVersion = DatabaseVersion();
+ const auto collVersion = ChunkVersion(1, 0, OID::gen());
+
+ loadDatabases({DatabaseType(kNss.db().toString(), kShards[0], true, dbVersion)});
+ loadCollection(collVersion);
+
+ _catalogCache->checkEpochOrThrow(kNss, collVersion, kShards[0]);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp
index 71e02e67fac..4f59eeaef8a 100644
--- a/src/mongo/s/catalog_cache_test_fixture.cpp
+++ b/src/mongo/s/catalog_cache_test_fixture.cpp
@@ -81,6 +81,26 @@ CatalogCacheTestFixture::scheduleRoutingInfoUnforcedRefresh(const NamespaceStrin
});
}
+executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>>
+CatalogCacheTestFixture::scheduleRoutingInfoIncrementalRefresh(const NamespaceString& nss) {
+ auto catalogCache = Grid::get(getServiceContext())->catalogCache();
+ const auto cm =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss));
+ ASSERT(cm.isSharded());
+
+ // Simulates the shard wanting a higher version than the one sent by the router.
+ catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, cm.dbPrimary());
+
+ return launchAsync([this, nss] {
+ auto client = getServiceContext()->makeClient("Test");
+ auto const catalogCache = Grid::get(getServiceContext())->catalogCache();
+
+ return boost::make_optional(
+ uassertStatusOK(catalogCache->getCollectionRoutingInfo(operationContext(), nss)));
+ });
+}
+
std::vector<ShardType> CatalogCacheTestFixture::setupNShards(int numShards) {
std::vector<ShardType> shards;
for (int i = 0; i < numShards; i++) {
diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h
index fb5238a2ba9..3d58f6a8557 100644
--- a/src/mongo/s/catalog_cache_test_fixture.h
+++ b/src/mongo/s/catalog_cache_test_fixture.h
@@ -84,6 +84,17 @@ protected:
scheduleRoutingInfoUnforcedRefresh(const NamespaceString& nss);
/**
+ * Advance the time in the cache for 'kNss' and schedules a thread to make an incremental
+ * refresh.
+ *
+ * NOTE: The returned value is always set. The reason to use optional is a deficiency of
+ * std::future with the MSVC STL library, which requires the templated type to be default
+ * constructible.
+ */
+ executor::NetworkTestEnv::FutureHandle<boost::optional<ChunkManager>>
+ scheduleRoutingInfoIncrementalRefresh(const NamespaceString& nss);
+
+ /**
* Ensures that there are 'numShards' available in the shard registry. The shard ids are
* generated as "0", "1", etc.
*
diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp
index 5713855e01f..9ded562066c 100644
--- a/src/mongo/s/chunk_manager.cpp
+++ b/src/mongo/s/chunk_manager.cpp
@@ -336,22 +336,23 @@ void RoutingTableHistory::setAllShardsRefreshed() {
}
Chunk ChunkManager::findIntersectingChunk(const BSONObj& shardKey, const BSONObj& collation) const {
- const bool hasSimpleCollation = (collation.isEmpty() && !_rt->getDefaultCollator()) ||
+ const bool hasSimpleCollation = (collation.isEmpty() && !_rt->optRt->getDefaultCollator()) ||
SimpleBSONObjComparator::kInstance.evaluate(collation == CollationSpec::kSimpleSpec);
if (!hasSimpleCollation) {
for (BSONElement elt : shardKey) {
uassert(ErrorCodes::ShardKeyNotFound,
str::stream() << "Cannot target single shard due to collation of key "
- << elt.fieldNameStringData() << " for namespace " << _rt->nss(),
+ << elt.fieldNameStringData() << " for namespace "
+ << _rt->optRt->nss(),
!CollationIndexKey::isCollatableType(elt.type()));
}
}
- auto chunkInfo = _rt->findIntersectingChunk(shardKey);
+ auto chunkInfo = _rt->optRt->findIntersectingChunk(shardKey);
uassert(ErrorCodes::ShardKeyNotFound,
str::stream() << "Cannot target single shard using key " << shardKey
- << " for namespace " << _rt->nss(),
+ << " for namespace " << _rt->optRt->nss(),
chunkInfo && chunkInfo->containsKey(shardKey));
return Chunk(*chunkInfo, _clusterTime);
@@ -361,7 +362,7 @@ bool ChunkManager::keyBelongsToShard(const BSONObj& shardKey, const ShardId& sha
if (shardKey.isEmpty())
return false;
- auto chunkInfo = _rt->findIntersectingChunk(shardKey);
+ auto chunkInfo = _rt->optRt->findIntersectingChunk(shardKey);
if (!chunkInfo)
return false;
@@ -374,7 +375,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
const BSONObj& query,
const BSONObj& collation,
std::set<ShardId>* shardIds) const {
- auto qr = std::make_unique<QueryRequest>(_rt->nss());
+ auto qr = std::make_unique<QueryRequest>(_rt->optRt->nss());
qr->setFilter(query);
if (auto uuid = getUUID())
@@ -382,8 +383,8 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
if (!collation.isEmpty()) {
qr->setCollation(collation);
- } else if (_rt->getDefaultCollator()) {
- auto defaultCollator = _rt->getDefaultCollator();
+ } else if (_rt->optRt->getDefaultCollator()) {
+ auto defaultCollator = _rt->optRt->getDefaultCollator();
qr->setCollation(defaultCollator->getSpec().toBSON());
expCtx->setCollator(defaultCollator->clone());
}
@@ -396,7 +397,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
MatchExpressionParser::kAllowAllSpecialFeatures));
// Fast path for targeting equalities on the shard key.
- auto shardKeyToFind = _rt->getShardKeyPattern().extractShardKeyFromQuery(*cq);
+ auto shardKeyToFind = _rt->optRt->getShardKeyPattern().extractShardKeyFromQuery(*cq);
if (!shardKeyToFind.isEmpty()) {
try {
auto chunk = findIntersectingChunk(shardKeyToFind, collation);
@@ -413,14 +414,14 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
// Query { a : { $gte : 1, $lt : 2 },
// b : { $gte : 3, $lt : 4 } }
// => Bounds { a : [1, 2), b : [3, 4) }
- IndexBounds bounds = getIndexBoundsForQuery(_rt->getShardKeyPattern().toBSON(), *cq);
+ IndexBounds bounds = getIndexBoundsForQuery(_rt->optRt->getShardKeyPattern().toBSON(), *cq);
// Transforms bounds for each shard key field into full shard key ranges
// for example :
// Key { a : 1, b : 1 }
// Bounds { a : [1, 2), b : [3, 4) }
// => Ranges { a : 1, b : 3 } => { a : 2, b : 4 }
- BoundList ranges = _rt->getShardKeyPattern().flattenBounds(bounds);
+ BoundList ranges = _rt->optRt->getShardKeyPattern().flattenBounds(bounds);
for (BoundList::const_iterator it = ranges.begin(); it != ranges.end(); ++it) {
getShardIdsForRange(it->first /*min*/, it->second /*max*/, shardIds);
@@ -430,7 +431,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
// because _shardVersions contains shards with chunks and is built based on the last
// refresh. Therefore, it is possible for _shardVersions to have fewer entries if a shard
// no longer owns chunks when it used to at _clusterTime.
- if (!_clusterTime && shardIds->size() == _rt->_shardVersions.size()) {
+ if (!_clusterTime && shardIds->size() == _rt->optRt->_shardVersions.size()) {
break;
}
}
@@ -439,7 +440,7 @@ void ChunkManager::getShardIdsForQuery(boost::intrusive_ptr<ExpressionContext> e
// For now, we satisfy that assumption by adding a shard with no matches rather than returning
// an empty set of shards.
if (shardIds->empty()) {
- _rt->forEachChunk([&](const std::shared_ptr<ChunkInfo>& chunkInfo) {
+ _rt->optRt->forEachChunk([&](const std::shared_ptr<ChunkInfo>& chunkInfo) {
shardIds->insert(chunkInfo->getShardIdAt(_clusterTime));
return false;
});
@@ -459,7 +460,7 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min,
return;
}
- _rt->forEachOverlappingChunk(min, max, true, [&](auto& chunkInfo) {
+ _rt->optRt->forEachOverlappingChunk(min, max, true, [&](auto& chunkInfo) {
shardIds->insert(chunkInfo->getShardIdAt(_clusterTime));
// No need to iterate through the rest of the ranges, because we already know we need to use
@@ -467,7 +468,7 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min,
// because _shardVersions contains shards with chunks and is built based on the last
// refresh. Therefore, it is possible for _shardVersions to have fewer entries if a shard
// no longer owns chunks when it used to at _clusterTime.
- if (!_clusterTime && shardIds->size() == _rt->_shardVersions.size()) {
+ if (!_clusterTime && shardIds->size() == _rt->optRt->_shardVersions.size()) {
return false;
}
@@ -478,14 +479,15 @@ void ChunkManager::getShardIdsForRange(const BSONObj& min,
bool ChunkManager::rangeOverlapsShard(const ChunkRange& range, const ShardId& shardId) const {
bool overlapFound = false;
- _rt->forEachOverlappingChunk(range.getMin(), range.getMax(), false, [&](auto& chunkInfo) {
- if (chunkInfo->getShardIdAt(_clusterTime) == shardId) {
- overlapFound = true;
- return false;
- }
+ _rt->optRt->forEachOverlappingChunk(
+ range.getMin(), range.getMax(), false, [&](auto& chunkInfo) {
+ if (chunkInfo->getShardIdAt(_clusterTime) == shardId) {
+ overlapFound = true;
+ return false;
+ }
- return true;
- });
+ return true;
+ });
return overlapFound;
}
@@ -494,7 +496,7 @@ boost::optional<Chunk> ChunkManager::getNextChunkOnShard(const BSONObj& shardKey
const ShardId& shardId) const {
boost::optional<Chunk> chunk;
- _rt->forEachChunk(
+ _rt->optRt->forEachChunk(
[&](auto& chunkInfo) {
if (chunkInfo->getShardIdAt(_clusterTime) == shardId) {
chunk.emplace(*chunkInfo, _clusterTime);
@@ -654,7 +656,7 @@ ChunkManager ChunkManager::makeAtTime(const ChunkManager& cm, Timestamp clusterT
}
std::string ChunkManager::toString() const {
- return _rt ? _rt->toString() : "UNSHARDED";
+ return _rt->optRt ? _rt->optRt->toString() : "UNSHARDED";
}
bool RoutingTableHistory::compatibleWith(const RoutingTableHistory& other,
@@ -733,7 +735,7 @@ RoutingTableHistory RoutingTableHistory::makeUpdated(
auto changedChunkInfos = flatten(changedChunks);
auto chunkMap = _chunkMap.createMerged(changedChunkInfos);
- // If at least one diff was applied, the collection's version must have advanced
+ // Only update the same collection.
invariant(getVersion().epoch() == chunkMap.getVersion().epoch());
return RoutingTableHistory(_nss,
@@ -745,4 +747,60 @@ RoutingTableHistory RoutingTableHistory::makeUpdated(
std::move(chunkMap));
}
+AtomicWord<uint64_t> ComparableChunkVersion::_epochDisambiguatingSequenceNumSource{1ULL};
+AtomicWord<uint64_t> ComparableChunkVersion::_forcedRefreshSequenceNumSource{1ULL};
+
+ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion(
+ const ChunkVersion& version) {
+ return ComparableChunkVersion(_forcedRefreshSequenceNumSource.load(),
+ version,
+ _epochDisambiguatingSequenceNumSource.fetchAndAdd(1));
+}
+
+ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh() {
+ return ComparableChunkVersion(_forcedRefreshSequenceNumSource.addAndFetch(2) - 1,
+ boost::none,
+ _epochDisambiguatingSequenceNumSource.fetchAndAdd(1));
+}
+
+std::string ComparableChunkVersion::toString() const {
+ return str::stream() << _forcedRefreshSequenceNum << "|"
+ << (_chunkVersion ? _chunkVersion->toString() : "NONE") << "|"
+ << _epochDisambiguatingSequenceNum;
+}
+
+bool ComparableChunkVersion::operator==(const ComparableChunkVersion& other) const {
+ if (_forcedRefreshSequenceNum == other._forcedRefreshSequenceNum) {
+ if (_forcedRefreshSequenceNum == 0)
+ return true; // Default constructed value
+
+ if (sameEpoch(other)) {
+ if (_chunkVersion->majorVersion() == 0 && other._chunkVersion->majorVersion() == 0) {
+ return _chunkVersion->epoch() == OID();
+ }
+ return _chunkVersion->majorVersion() == other._chunkVersion->majorVersion() &&
+ _chunkVersion->minorVersion() == other._chunkVersion->minorVersion();
+ }
+ }
+ return false;
+}
+
+bool ComparableChunkVersion::operator<(const ComparableChunkVersion& other) const {
+ if (_forcedRefreshSequenceNum < other._forcedRefreshSequenceNum)
+ return true;
+ if (_forcedRefreshSequenceNum > other._forcedRefreshSequenceNum)
+ return false;
+ if (_forcedRefreshSequenceNum == 0)
+ return false; // Default constructed value
+
+ if (sameEpoch(other) && other._chunkVersion->epoch() != OID() &&
+ _chunkVersion->majorVersion() != 0 && other._chunkVersion->majorVersion() != 0) {
+ return _chunkVersion->majorVersion() < other._chunkVersion->majorVersion() ||
+ (_chunkVersion->majorVersion() == other._chunkVersion->majorVersion() &&
+ _chunkVersion->minorVersion() < other._chunkVersion->minorVersion());
+ } else {
+ return _epochDisambiguatingSequenceNum < other._epochDisambiguatingSequenceNum;
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h
index 7f25a810a4a..e694a94c201 100644
--- a/src/mongo/s/chunk_manager.h
+++ b/src/mongo/s/chunk_manager.h
@@ -43,6 +43,7 @@
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/util/concurrency/ticketholder.h"
+#include "mongo/util/read_through_cache.h"
namespace mongo {
@@ -324,13 +325,128 @@ private:
};
/**
+ * Constructed to be used exclusively by the CatalogCache as a vector clock (Time) to drive
+ * CollectionCache's lookups.
+ *
+ * The ChunkVersion class contains a non comparable epoch, which makes impossible to compare two
+ * ChunkVersions when their epochs's differ.
+ *
+ * This class wraps a ChunkVersion object with a node-local sequence number
+ * (_epochDisambiguatingSequenceNum) that allows the comparision.
+ *
+ * This class should go away once a cluster-wide comparable ChunkVersion is implemented.
+ */
+class ComparableChunkVersion {
+public:
+ /**
+ * Creates a ComparableChunkVersion that wraps the given ChunkVersion.
+ * Each object created through this method will have a local sequence number greater than the
+ * previously created ones.
+ */
+ static ComparableChunkVersion makeComparableChunkVersion(const ChunkVersion& version);
+
+ /**
+ * Creates a ComparableChunkVersion object, which will artificially be greater than any that
+ * were previously created by `makeComparableChunkVersion`. Used as means to cause the
+ * collections cache to attempt a refresh in situations where causal consistency cannot be
+ * inferred.
+ */
+ static ComparableChunkVersion makeComparableChunkVersionForForcedRefresh();
+
+ /**
+ * Empty constructor needed by the ReadThroughCache.
+ *
+ * Instances created through this constructor will be always less then the ones created through
+ * the two static constructors, but they do not carry any meaningful value and can only be used
+ * for comparison purposes.
+ */
+ ComparableChunkVersion() = default;
+
+ const ChunkVersion& getVersion() const {
+ return *_chunkVersion;
+ }
+
+ std::string toString() const;
+
+ bool sameEpoch(const ComparableChunkVersion& other) const {
+ return _chunkVersion->epoch() == other._chunkVersion->epoch();
+ }
+
+ bool operator==(const ComparableChunkVersion& other) const;
+
+ bool operator!=(const ComparableChunkVersion& other) const {
+ return !(*this == other);
+ }
+
+ /**
+ * In case the two compared instances have different epochs, the most recently created one will
+ * be greater, otherwise the comparision will be driven by the major/minor versions of the
+ * underlying ChunkVersion.
+ */
+ bool operator<(const ComparableChunkVersion& other) const;
+
+ bool operator>(const ComparableChunkVersion& other) const {
+ return other < *this;
+ }
+
+ bool operator<=(const ComparableChunkVersion& other) const {
+ return !(*this > other);
+ }
+
+ bool operator>=(const ComparableChunkVersion& other) const {
+ return !(*this < other);
+ }
+
+private:
+ static AtomicWord<uint64_t> _epochDisambiguatingSequenceNumSource;
+ static AtomicWord<uint64_t> _forcedRefreshSequenceNumSource;
+
+ ComparableChunkVersion(uint64_t forcedRefreshSequenceNum,
+ boost::optional<ChunkVersion> version,
+ uint64_t epochDisambiguatingSequenceNum)
+ : _forcedRefreshSequenceNum(forcedRefreshSequenceNum),
+ _chunkVersion(std::move(version)),
+ _epochDisambiguatingSequenceNum(epochDisambiguatingSequenceNum) {}
+
+ uint64_t _forcedRefreshSequenceNum{0};
+
+ boost::optional<ChunkVersion> _chunkVersion;
+
+ // Locally incremented sequence number that allows to compare two colection versions with
+ // different epochs. Each new comparableChunkVersion will have a greater sequence number than
+ // the ones created before.
+ uint64_t _epochDisambiguatingSequenceNum{0};
+};
+
+/**
+ * This intermediate structure is necessary to be able to store UNSHARDED collections in the routing
+ * table history cache below. The reason is that currently the RoutingTableHistory class only
+ * supports sharded collections (i.e., collections which have entries in config.collections and
+ * config.chunks).
+ */
+struct OptionalRoutingTableHistory {
+ // UNSHARDED collection constructor
+ OptionalRoutingTableHistory() = default;
+
+ // SHARDED collection constructor
+ OptionalRoutingTableHistory(RoutingTableHistory&& rt) : optRt(std::move(rt)) {}
+
+ // If boost::none, the collection is UNSHARDED, otherwise it is SHARDED
+ boost::optional<RoutingTableHistory> optRt;
+};
+
+using RoutingTableHistoryCache =
+ ReadThroughCache<NamespaceString, OptionalRoutingTableHistory, ComparableChunkVersion>;
+using RoutingTableHistoryValueHandle = RoutingTableHistoryCache::ValueHandle;
+
+/**
* Wrapper around a RoutingTableHistory, which pins it to a particular point in time.
*/
class ChunkManager {
public:
ChunkManager(ShardId dbPrimary,
DatabaseVersion dbVersion,
- std::shared_ptr<RoutingTableHistory> rt,
+ RoutingTableHistoryValueHandle rt,
boost::optional<Timestamp> clusterTime)
: _dbPrimary(std::move(dbPrimary)),
_dbVersion(std::move(dbVersion)),
@@ -340,7 +456,7 @@ public:
// Methods supported on both sharded and unsharded collections
bool isSharded() const {
- return bool(_rt);
+ return bool(_rt->optRt);
}
const ShardId& dbPrimary() const {
@@ -352,7 +468,7 @@ public:
}
int numChunks() const {
- return _rt ? _rt->numChunks() : 1;
+ return _rt->optRt ? _rt->optRt->numChunks() : 1;
}
std::string toString() const;
@@ -360,32 +476,32 @@ public:
// Methods only supported on sharded collections (caller must check isSharded())
const ShardKeyPattern& getShardKeyPattern() const {
- return _rt->getShardKeyPattern();
+ return _rt->optRt->getShardKeyPattern();
}
const CollatorInterface* getDefaultCollator() const {
- return _rt->getDefaultCollator();
+ return _rt->optRt->getDefaultCollator();
}
bool isUnique() const {
- return _rt->isUnique();
+ return _rt->optRt->isUnique();
}
ChunkVersion getVersion() const {
- return _rt->getVersion();
+ return _rt->optRt->getVersion();
}
ChunkVersion getVersion(const ShardId& shardId) const {
- return _rt->getVersion(shardId);
+ return _rt->optRt->getVersion(shardId);
}
ChunkVersion getVersionForLogging(const ShardId& shardId) const {
- return _rt->getVersionForLogging(shardId);
+ return _rt->optRt->getVersionForLogging(shardId);
}
template <typename Callable>
void forEachChunk(Callable&& handler) const {
- _rt->forEachChunk(
+ _rt->optRt->forEachChunk(
[this, handler = std::forward<Callable>(handler)](const auto& chunkInfo) mutable {
if (!handler(Chunk{*chunkInfo, _clusterTime}))
return false;
@@ -461,14 +577,14 @@ public:
* Returns the ids of all shards on which the collection has any chunks.
*/
void getAllShardIds(std::set<ShardId>* all) const {
- _rt->getAllShardIds(all);
+ _rt->optRt->getAllShardIds(all);
}
/**
* Returns the number of shards on which the collection has any chunks
*/
int getNShardsOwningChunks() const {
- return _rt->getNShardsOwningChunks();
+ return _rt->optRt->getNShardsOwningChunks();
}
// Transforms query into bounds for each field in the shard key
@@ -500,30 +616,30 @@ public:
* Returns true if, for this shard, the chunks are identical in both chunk managers
*/
bool compatibleWith(const ChunkManager& other, const ShardId& shard) const {
- return _rt->compatibleWith(*other._rt, shard);
+ return _rt->optRt->compatibleWith(*other._rt->optRt, shard);
}
bool uuidMatches(UUID uuid) const {
- return _rt->uuidMatches(uuid);
+ return _rt->optRt->uuidMatches(uuid);
}
boost::optional<UUID> getUUID() const {
- return _rt->getUUID();
+ return _rt->optRt->getUUID();
}
const boost::optional<TypeCollectionReshardingFields>& getReshardingFields() const {
- return _rt->getReshardingFields();
+ return _rt->optRt->getReshardingFields();
}
const RoutingTableHistory& getRoutingTableHistory_ForTest() const {
- return *_rt;
+ return *_rt->optRt;
}
private:
ShardId _dbPrimary;
DatabaseVersion _dbVersion;
- std::shared_ptr<RoutingTableHistory> _rt;
+ RoutingTableHistoryValueHandle _rt;
boost::optional<Timestamp> _clusterTime;
};
diff --git a/src/mongo/s/chunk_manager_refresh_bm.cpp b/src/mongo/s/chunk_manager_refresh_bm.cpp
index a3feba2de1e..bd9b133301c 100644
--- a/src/mongo/s/chunk_manager_refresh_bm.cpp
+++ b/src/mongo/s/chunk_manager_refresh_bm.cpp
@@ -43,8 +43,10 @@ namespace {
const NamespaceString kNss("test", "foo");
-std::shared_ptr<RoutingTableHistory> makeStandaloneRoutingTableHistory(RoutingTableHistory rt) {
- return std::make_shared<RoutingTableHistory>(std::move(rt));
+RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) {
+ const auto version = rt.getVersion();
+ return RoutingTableHistoryValueHandle(
+ std::move(rt), ComparableChunkVersion::makeComparableChunkVersion(version));
}
ChunkRange getRangeForChunk(int i, int nChunks) {
@@ -69,6 +71,7 @@ CollectionMetadata makeChunkManagerWithShardSelector(int nShards,
std::vector<ChunkType> chunks;
chunks.reserve(nChunks);
+
for (uint32_t i = 0; i < nChunks; ++i) {
chunks.emplace_back(kNss,
getRangeForChunk(i, nChunks),
@@ -144,13 +147,13 @@ auto BM_FullBuildOfChunkManager(benchmark::State& state, ShardSelectorFn selectS
const uint32_t nChunks = state.range(1);
const auto collEpoch = OID::gen();
- const auto collName = NamespaceString("test.foo");
const auto shardKeyPattern = KeyPattern(BSON("_id" << 1));
std::vector<ChunkType> chunks;
chunks.reserve(nChunks);
+
for (uint32_t i = 0; i < nChunks; ++i) {
- chunks.emplace_back(collName,
+ chunks.emplace_back(kNss,
getRangeForChunk(i, nChunks),
ChunkVersion{i + 1, 0, collEpoch},
selectShard(i, nShards, nChunks));
@@ -158,7 +161,7 @@ auto BM_FullBuildOfChunkManager(benchmark::State& state, ShardSelectorFn selectS
for (auto keepRunning : state) {
auto rt = RoutingTableHistory::makeNew(
- collName, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, boost::none, chunks);
+ kNss, UUID::gen(), shardKeyPattern, nullptr, true, collEpoch, boost::none, chunks);
benchmark::DoNotOptimize(
CollectionMetadata(ChunkManager(ShardId("shard0"),
DatabaseVersion(UUID::gen(), 1),
diff --git a/src/mongo/s/commands/cluster_drop_cmd.cpp b/src/mongo/s/commands/cluster_drop_cmd.cpp
index a69e3292597..f727489ccc0 100644
--- a/src/mongo/s/commands/cluster_drop_cmd.cpp
+++ b/src/mongo/s/commands/cluster_drop_cmd.cpp
@@ -88,7 +88,9 @@ public:
// Invalidate the routing table cache entry for this collection so that we reload it the
// next time it is accessed, even if sending the command to the config server fails due
// to e.g. a NetworkError.
- ON_BLOCK_EXIT([opCtx, nss] { Grid::get(opCtx)->catalogCache()->onEpochChange(nss); });
+ ON_BLOCK_EXIT([opCtx, nss] {
+ Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(nss);
+ });
auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
diff --git a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
index b4157bee9d9..531aa1ab41e 100644
--- a/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
+++ b/src/mongo/s/commands/cluster_merge_chunks_cmd.cpp
@@ -174,8 +174,10 @@ public:
Shard::RetryPolicy::kNotIdempotent));
uassertStatusOK(response.commandStatus);
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(
- nss, firstChunk.getShardId());
+ Grid::get(opCtx)
+ ->catalogCache()
+ ->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, firstChunk.getShardId());
CommandHelpers::filterCommandReplyForPassthrough(response.response, &result);
return true;
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index 01cdb91234e..f6e2d27c80f 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -198,9 +198,14 @@ public:
cmdObj["waitForDelete"].trueValue(),
forceJumbo));
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss,
- chunk->getShardId());
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss, to->getId());
+ Grid::get(opCtx)
+ ->catalogCache()
+ ->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, chunk->getShardId());
+ Grid::get(opCtx)
+ ->catalogCache()
+ ->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, to->getId());
result.append("millis", t.millis());
return true;
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index d27fd037d30..d4c4d7901ad 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -105,7 +105,9 @@ public:
// Invalidate the routing table cache entry for this collection so that we reload the
// collection the next time it's accessed, even if we receive a failure, e.g. NetworkError.
- ON_BLOCK_EXIT([opCtx, nss] { Grid::get(opCtx)->catalogCache()->onEpochChange(nss); });
+ ON_BLOCK_EXIT([opCtx, nss] {
+ Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(nss);
+ });
auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto cmdResponse = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
diff --git a/src/mongo/s/commands/cluster_split_cmd.cpp b/src/mongo/s/commands/cluster_split_cmd.cpp
index 19d33b3f10b..5532fac1daf 100644
--- a/src/mongo/s/commands/cluster_split_cmd.cpp
+++ b/src/mongo/s/commands/cluster_split_cmd.cpp
@@ -270,8 +270,10 @@ public:
ChunkRange(chunk->getMin(), chunk->getMax()),
{splitPoint}));
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(nss,
- chunk->getShardId());
+ Grid::get(opCtx)
+ ->catalogCache()
+ ->invalidateShardOrEntireCollectionEntryForShardedCollection(
+ nss, boost::none, chunk->getShardId());
return true;
}
diff --git a/src/mongo/s/commands/flush_router_config_cmd.cpp b/src/mongo/s/commands/flush_router_config_cmd.cpp
index bcc61a82a0a..d27b65a2c4d 100644
--- a/src/mongo/s/commands/flush_router_config_cmd.cpp
+++ b/src/mongo/s/commands/flush_router_config_cmd.cpp
@@ -102,7 +102,7 @@ public:
"Routing metadata flushed for collection {namespace}",
"Routing metadata flushed for collection",
"namespace"_attr = nss);
- catalogCache->purgeCollection(nss);
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(nss);
}
}
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 644c10e6bcb..f83b490d0ef 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -722,16 +722,12 @@ void runCommand(OperationContext* opCtx,
auto catalogCache = Grid::get(opCtx)->catalogCache();
if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
- opCtx,
- staleNs,
- staleInfo->getVersionWanted(),
- staleInfo->getVersionReceived(),
- staleInfo->getShardId());
+ staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId());
} else {
// If we don't have the stale config info and therefore don't know the shard's
// id, we have to force all further targetting requests for the namespace to
// block on a refresh.
- catalogCache->onEpochChange(staleNs);
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(staleNs);
}
@@ -1301,16 +1297,12 @@ void Strategy::explainFind(OperationContext* opCtx,
Grid::get(opCtx)
->catalogCache()
->invalidateShardOrEntireCollectionEntryForShardedCollection(
- opCtx,
- staleNs,
- staleInfo->getVersionWanted(),
- staleInfo->getVersionReceived(),
- staleInfo->getShardId());
+ staleNs, staleInfo->getVersionWanted(), staleInfo->getShardId());
} else {
// If we don't have the stale config info and therefore don't know the shard's id,
// we have to force all further targetting requests for the namespace to block on
// a refresh.
- Grid::get(opCtx)->catalogCache()->onEpochChange(staleNs);
+ Grid::get(opCtx)->catalogCache()->invalidateCollectionEntry_LINEARIZABLE(staleNs);
}
if (canRetry) {
diff --git a/src/mongo/s/comparable_chunk_version_test.cpp b/src/mongo/s/comparable_chunk_version_test.cpp
index 941d9bad080..8c1fa71fce2 100644
--- a/src/mongo/s/comparable_chunk_version_test.cpp
+++ b/src/mongo/s/comparable_chunk_version_test.cpp
@@ -29,8 +29,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_version.h"
+#include "mongo/s/chunk_manager.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -95,9 +94,15 @@ TEST(ComparableChunkVersionTest, VersionLessSameEpoch) {
ASSERT_FALSE(version2 > version3);
}
+TEST(ComparableChunkVersionTest, DefaultConstructedVersionsAreEqual) {
+ const ComparableChunkVersion defaultVersion1{}, defaultVersion2{};
+ ASSERT(defaultVersion1 == defaultVersion2);
+ ASSERT_FALSE(defaultVersion1 < defaultVersion2);
+ ASSERT_FALSE(defaultVersion1 > defaultVersion2);
+}
+
TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLess) {
const ComparableChunkVersion defaultVersion{};
- ASSERT_EQ(defaultVersion.getLocalSequenceNum(), 0);
const auto version1 =
ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen()));
ASSERT(defaultVersion != version1);
@@ -105,5 +110,127 @@ TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLess) {
ASSERT_FALSE(defaultVersion > version1);
}
+TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLessThanUnsharded) {
+ const ComparableChunkVersion defaultVersion{};
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+ ASSERT(defaultVersion != version1);
+ ASSERT(defaultVersion < version1);
+ ASSERT_FALSE(defaultVersion > version1);
+}
+
+TEST(ComparableChunkVersionTest, DefaultConstructedVersionIsAlwaysLessThanDropped) {
+ const ComparableChunkVersion defaultVersion{};
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED());
+ ASSERT(defaultVersion != version1);
+ ASSERT(defaultVersion < version1);
+ ASSERT_FALSE(defaultVersion > version1);
+}
+
+TEST(ComparableChunkVersionTest, UnshardedAndDroppedAreEqual) {
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+ const auto version2 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED());
+ const auto version3 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+ const auto version4 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::DROPPED());
+ ASSERT(version1 == version2);
+ ASSERT(version1 == version3);
+ ASSERT(version2 == version4);
+}
+
+TEST(ComparableChunkVersionTest, NoChunksAreDifferent) {
+ const auto oid = OID::gen();
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid));
+ const auto version2 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid));
+ ASSERT(version1 != version2);
+ ASSERT(version1 < version2);
+ ASSERT_FALSE(version1 > version2);
+}
+
+TEST(ComparableChunkVersionTest, NoChunksCompareBySequenceNum) {
+ const auto oid = OID::gen();
+ const auto version1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(1, 0, oid));
+ const auto noChunkSV1 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid));
+
+ ASSERT(version1 != noChunkSV1);
+ ASSERT(noChunkSV1 > version1);
+
+ const auto noChunkSV2 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, oid));
+
+ ASSERT(noChunkSV1 != noChunkSV2);
+ ASSERT_FALSE(noChunkSV1 > noChunkSV2);
+ ASSERT(noChunkSV2 > noChunkSV1);
+
+ const auto version2 =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(2, 0, oid));
+
+ ASSERT(version2 != noChunkSV2);
+ ASSERT(version2 > noChunkSV2);
+}
+
+TEST(ComparableChunkVersionTest, NoChunksGreaterThanUnshardedBySequenceNum) {
+ const auto unsharded =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+ const auto noChunkSV =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen()));
+
+ ASSERT(noChunkSV != unsharded);
+ ASSERT(noChunkSV > unsharded);
+}
+
+TEST(ComparableChunkVersionTest, UnshardedGreaterThanNoChunksBySequenceNum) {
+ const auto noChunkSV =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen()));
+ const auto unsharded =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED());
+
+ ASSERT(noChunkSV != unsharded);
+ ASSERT(unsharded > noChunkSV);
+}
+
+TEST(ComparableChunkVersionTest, NoChunksGreaterThanDefault) {
+ const auto noChunkSV =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(0, 0, OID::gen()));
+ const ComparableChunkVersion defaultVersion{};
+
+ ASSERT(noChunkSV != defaultVersion);
+ ASSERT(noChunkSV > defaultVersion);
+}
+
+TEST(ComparableChunkVersionTest, ForcedRefreshSequenceNumber) {
+ auto oid = OID::gen();
+ const ComparableChunkVersion defaultVersionBeforeForce;
+ const auto versionBeforeForce =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(100, 0, oid));
+
+ const auto forcedRefreshVersion =
+ ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh();
+
+ const auto versionAfterForce =
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion(100, 0, oid));
+ const ComparableChunkVersion defaultVersionAfterForce;
+
+ ASSERT(defaultVersionBeforeForce != forcedRefreshVersion);
+ ASSERT(defaultVersionBeforeForce < forcedRefreshVersion);
+
+ ASSERT(versionBeforeForce != forcedRefreshVersion);
+ ASSERT(versionBeforeForce < forcedRefreshVersion);
+
+ ASSERT(versionAfterForce != forcedRefreshVersion);
+ ASSERT(versionAfterForce > forcedRefreshVersion);
+
+ ASSERT(defaultVersionAfterForce != forcedRefreshVersion);
+ ASSERT(defaultVersionAfterForce < forcedRefreshVersion);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/comparable_database_version_test.cpp b/src/mongo/s/comparable_database_version_test.cpp
index 3b2486a5ebd..d4201d56564 100644
--- a/src/mongo/s/comparable_database_version_test.cpp
+++ b/src/mongo/s/comparable_database_version_test.cpp
@@ -82,9 +82,15 @@ TEST(ComparableDatabaseVersionTest, VersionLessSameUuid) {
ASSERT_FALSE(version1 > version2);
}
+TEST(ComparableDatabaseVersionTest, DefaultConstructedVersionsAreEqual) {
+ const ComparableDatabaseVersion defaultVersion1{}, defaultVersion2{};
+ ASSERT(defaultVersion1 == defaultVersion2);
+ ASSERT_FALSE(defaultVersion1 < defaultVersion2);
+ ASSERT_FALSE(defaultVersion1 > defaultVersion2);
+}
+
TEST(ComparableDatabaseVersionTest, DefaultConstructedVersionIsAlwaysLess) {
const ComparableDatabaseVersion defaultVersion{};
- ASSERT_EQ(defaultVersion.getLocalSequenceNum(), 0);
const auto version1 =
ComparableDatabaseVersion::makeComparableDatabaseVersion(DatabaseVersion(UUID::gen(), 0));
ASSERT(defaultVersion != version1);
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 3996e01c326..12073508642 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -504,18 +504,18 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
// Re-target and re-send the initial find command to the shards until we have established the
// shard version.
for (size_t retries = 1; retries <= kMaxRetries; ++retries) {
- auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, query.nss());
- if (routingInfoStatus == ErrorCodes::NamespaceNotFound) {
+ auto swCM = getCollectionRoutingInfoForTxnCmd(opCtx, query.nss());
+ if (swCM == ErrorCodes::NamespaceNotFound) {
// If the database doesn't exist, we successfully return an empty result set without
// creating a cursor.
return CursorId(0);
}
- auto routingInfo = uassertStatusOK(routingInfoStatus);
+ const auto cm = uassertStatusOK(std::move(swCM));
try {
return runQueryWithoutRetrying(
- opCtx, query, readPref, routingInfo, results, partialResultsReturned);
+ opCtx, query, readPref, cm, results, partialResultsReturned);
} catch (ExceptionFor<ErrorCodes::StaleDbVersion>& ex) {
if (retries >= kMaxRetries) {
// Check if there are no retries remaining, so the last received error can be
@@ -577,13 +577,9 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
if (ex.code() != ErrorCodes::ShardInvalidatedForTargeting) {
if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
- opCtx,
- query.nss(),
- staleInfo->getVersionWanted(),
- staleInfo->getVersionReceived(),
- staleInfo->getShardId());
+ query.nss(), staleInfo->getVersionWanted(), staleInfo->getShardId());
} else {
- catalogCache->onEpochChange(query.nss());
+ catalogCache->invalidateCollectionEntry_LINEARIZABLE(query.nss());
}
}
diff --git a/src/mongo/s/request_types/set_shard_version_request.h b/src/mongo/s/request_types/set_shard_version_request.h
index bfd7385ffae..44cacff0415 100644
--- a/src/mongo/s/request_types/set_shard_version_request.h
+++ b/src/mongo/s/request_types/set_shard_version_request.h
@@ -98,6 +98,7 @@ private:
SetShardVersionRequest();
bool _isAuthoritative{false};
+ // TODO (SERVER-50812) remove this flag that isn't used anymore
bool _forceRefresh{false};
boost::optional<NamespaceString> _nss;
diff --git a/src/mongo/s/sessions_collection_sharded.cpp b/src/mongo/s/sessions_collection_sharded.cpp
index 060c1158dbd..22915bd2c0a 100644
--- a/src/mongo/s/sessions_collection_sharded.cpp
+++ b/src/mongo/s/sessions_collection_sharded.cpp
@@ -123,8 +123,6 @@ void SessionsCollectionSharded::checkSessionsCollectionExists(OperationContext*
const auto cm = uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(
opCtx, NamespaceString::kLogicalSessionsNamespace));
-
- uassert(ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist", cm.isSharded());
}
void SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
diff --git a/src/mongo/s/sharding_test_fixture_common.cpp b/src/mongo/s/sharding_test_fixture_common.cpp
index 95dd505687b..2ac936d3977 100644
--- a/src/mongo/s/sharding_test_fixture_common.cpp
+++ b/src/mongo/s/sharding_test_fixture_common.cpp
@@ -47,9 +47,11 @@ ShardingTestFixtureCommon::ShardingTestFixtureCommon() {
ShardingTestFixtureCommon::~ShardingTestFixtureCommon() = default;
-std::shared_ptr<RoutingTableHistory> ShardingTestFixtureCommon::makeStandaloneRoutingTableHistory(
+RoutingTableHistoryValueHandle ShardingTestFixtureCommon::makeStandaloneRoutingTableHistory(
RoutingTableHistory rt) {
- return std::make_shared<RoutingTableHistory>(std::move(rt));
+ const auto version = rt.getVersion();
+ return RoutingTableHistoryValueHandle(
+ std::move(rt), ComparableChunkVersion::makeComparableChunkVersion(version));
}
void ShardingTestFixtureCommon::onCommand(NetworkTestEnv::OnCommandFunction func) {
diff --git a/src/mongo/s/sharding_test_fixture_common.h b/src/mongo/s/sharding_test_fixture_common.h
index 0ecbbb30695..52377d7fbc5 100644
--- a/src/mongo/s/sharding_test_fixture_common.h
+++ b/src/mongo/s/sharding_test_fixture_common.h
@@ -55,8 +55,7 @@ public:
* which can be used to pass to ChunkManager for tests, which specifically target the behaviour
* of the ChunkManager.
*/
- static std::shared_ptr<RoutingTableHistory> makeStandaloneRoutingTableHistory(
- RoutingTableHistory rt);
+ static RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt);
protected:
ShardingTestFixtureCommon();
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index f7189efdfe9..6794dabc3ca 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -791,7 +791,7 @@ int ChunkManagerTargeter::getNShardsOwningChunks() const {
void ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) {
uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss, true));
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss));
_init(opCtx);
}