summaryrefslogtreecommitdiff
path: root/src/mongo/s/catalog_cache.cpp
diff options
context:
space:
mode:
author: Marcos José Grillo Ramírez <marcos.grillo@mongodb.com>, 2020-09-05 15:01:13 -0400
committer: Evergreen Agent <no-reply@evergreen.mongodb.com>, 2020-09-09 22:27:23 +0000
commit: 377b8fe43916ff2c4e2ed35cb80548aeb8ba8c8d (patch)
tree: e2036dd117f4b5f2255faedc1ef02853d879ae2c /src/mongo/s/catalog_cache.cpp
parent: f9d4a15397585a8f00ea0afa9864531e1f4ed5fb (diff)
download: mongo-377b8fe43916ff2c4e2ed35cb80548aeb8ba8c8d.tar.gz
SERVER-46199 Make the collection CatalogCache support causal consistency
This change implements the collection CatalogCache on top of the ReadThroughCache, giving it the ability to support causal consistency. As part of this change, the 'forceRefreshFromThisThread' flag has been removed from the shard's refresh methods.

Co-authored-by: Tommaso Tocci <tommaso.tocci@mongodb.com>
Co-authored-by: Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>
Co-authored-by: Kaloian Manassiev <kaloian.manassiev@mongodb.com>
Diffstat (limited to 'src/mongo/s/catalog_cache.cpp')
-rw-r--r-- src/mongo/s/catalog_cache.cpp 762
1 files changed, 282 insertions, 480 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 19846e62b48..d9c2500f2d3 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -55,6 +55,7 @@
#include "mongo/util/timer.h"
namespace mongo {
+
const OperationContext::Decoration<bool> operationShouldBlockBehindCatalogCacheRefresh =
OperationContext::declareDecoration<bool>();
@@ -68,81 +69,8 @@ namespace {
const int kMaxInconsistentRoutingInfoRefreshAttempts = 3;
const int kDatabaseCacheSize = 10000;
-/**
- * Returns whether two shard versions have a matching epoch.
- */
-bool shardVersionsHaveMatchingEpoch(boost::optional<ChunkVersion> wanted,
- const ChunkVersion& received) {
- return wanted && wanted->epoch() == received.epoch();
-};
-
-/**
- * Given an (optional) initial routing table and a set of changed chunks returned by the catalog
- * cache loader, produces a new routing table with the changes applied.
- *
- * If the collection is no longer sharded returns nullptr. If the epoch has changed, expects that
- * the 'collectionChunksList' contains the full contents of the chunks collection for that namespace
- * so that the routing table can be built from scratch.
- *
- * Throws ConflictingOperationInProgress if the chunk metadata was found to be inconsistent (not
- * containing all the necessary chunks, contains overlaps or chunks' epoch values are not the same
- * as that of the collection). Since this situation may be transient, due to the collection being
- * dropped or having its shard key refined concurrently, the caller must retry the reload up to some
- * configurable number of attempts.
- */
-std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo(
- OperationContext* opCtx,
- const NamespaceString& nss,
- std::shared_ptr<RoutingTableHistory> existingRoutingInfo,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollectionAndChangedChunks) {
- if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
- return nullptr;
- }
- const auto collectionAndChunks = uassertStatusOK(std::move(swCollectionAndChangedChunks));
-
- auto chunkManager = [&] {
- // If we have routing info already and it's for the same collection epoch, we're updating.
- // Otherwise, we're making a whole new routing table.
- if (existingRoutingInfo &&
- existingRoutingInfo->getVersion().epoch() == collectionAndChunks.epoch) {
- if (collectionAndChunks.changedChunks.size() == 1 &&
- collectionAndChunks.changedChunks[0].getVersion() ==
- existingRoutingInfo->getVersion())
- return existingRoutingInfo;
-
- return std::make_shared<RoutingTableHistory>(
- existingRoutingInfo->makeUpdated(std::move(collectionAndChunks.reshardingFields),
- collectionAndChunks.changedChunks));
- }
-
- auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
- if (!collectionAndChunks.defaultCollation.isEmpty()) {
- // The collation should have been validated upon collection creation
- return uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
- ->makeFromBSON(collectionAndChunks.defaultCollation));
- }
- return nullptr;
- }();
-
- return std::make_shared<RoutingTableHistory>(
- RoutingTableHistory::makeNew(nss,
- collectionAndChunks.uuid,
- KeyPattern(collectionAndChunks.shardKeyPattern),
- std::move(defaultCollator),
- collectionAndChunks.shardKeyIsUnique,
- collectionAndChunks.epoch,
- std::move(collectionAndChunks.reshardingFields),
- collectionAndChunks.changedChunks));
- }();
-
- std::set<ShardId> shardIds;
- chunkManager->getAllShardIds(&shardIds);
- for (const auto& shardId : shardIds) {
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- }
- return chunkManager;
-}
+const int kCollectionCacheSize = 10000;
} // namespace
@@ -155,7 +83,8 @@ CatalogCache::CatalogCache(ServiceContext* const service, CatalogCacheLoader& ca
options.maxThreads = 6;
return options;
}())),
- _databaseCache(service, *_executor, _cacheLoader) {
+ _databaseCache(service, *_executor, _cacheLoader),
+ _collectionCache(service, *_executor, _cacheLoader) {
_executor->startup();
}
@@ -190,111 +119,89 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
}
}
-StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss) {
- return _getCollectionRoutingInfo(opCtx, nss).statusWithInfo;
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoWithForcedRefresh(
- OperationContext* opCtx, const NamespaceString& nss) {
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
- _createOrGetCollectionEntryAndMarkAsNeedsRefresh(nss);
- return _getCollectionRoutingInfo(opCtx, nss);
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfo(OperationContext* opCtx,
- const NamespaceString& nss) {
- return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
-}
-
-
-StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx,
- const NamespaceString& nss,
- Timestamp atClusterTime) {
- return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime).statusWithInfo;
-}
-
-CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt(
+StatusWith<ChunkManager> CatalogCache::_getCollectionRoutingInfoAt(
OperationContext* opCtx, const NamespaceString& nss, boost::optional<Timestamp> atClusterTime) {
- invariant(!opCtx->lockState() || !opCtx->lockState()->isLocked(),
- "Do not hold a lock while refreshing the catalog cache. Doing so would potentially "
- "hold the lock during a network call, and can lead to a deadlock as described in "
- "SERVER-37398.");
- // This default value can cause a single unnecessary extra refresh if this thread did do the
- // refresh but the refresh failed, or if the database or collection was not found, but only if
- // the caller is getCollectionRoutingInfoWithRefresh with the parameter
- // forceRefreshFromThisThread set to true
- RefreshAction refreshActionTaken(RefreshAction::kDidNotPerformRefresh);
- while (true) {
+ invariant(
+ !opCtx->lockState() || !opCtx->lockState()->isLocked(),
+ "Do not hold a lock while refreshing the catalog cache. Doing so would potentially hold "
+ "the lock during a network call, and can lead to a deadlock as described in SERVER-37398.");
+
+ try {
const auto swDbInfo = getDatabase(opCtx, nss.db());
+
if (!swDbInfo.isOK()) {
if (swDbInfo == ErrorCodes::NamespaceNotFound) {
LOGV2_FOR_CATALOG_REFRESH(
- 4947102,
+ 4947103,
2,
"Invalidating cached collection entry because its database has been dropped",
"namespace"_attr = nss);
- purgeCollection(nss);
+ invalidateCollectionEntry_LINEARIZABLE(nss);
}
- return {swDbInfo.getStatus(), refreshActionTaken};
+ return swDbInfo.getStatus();
}
const auto dbInfo = std::move(swDbInfo.getValue());
- stdx::unique_lock<Latch> ul(_mutex);
-
- auto collEntry = _createOrGetCollectionEntry(ul, nss);
+ const auto cacheConsistency = gEnableFinerGrainedCatalogCacheRefresh &&
+ !operationShouldBlockBehindCatalogCacheRefresh(opCtx)
+ ? CacheCausalConsistency::kLatestCached
+ : CacheCausalConsistency::kLatestKnown;
- if (collEntry->needsRefresh &&
- (!gEnableFinerGrainedCatalogCacheRefresh || collEntry->epochHasChanged ||
- operationShouldBlockBehindCatalogCacheRefresh(opCtx))) {
+ auto collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
- operationBlockedBehindCatalogCacheRefresh(opCtx) = true;
+ // If the entry is in the cache return immediately.
+ if (collEntryFuture.isReady()) {
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
+ return ChunkManager(dbInfo.primaryId(),
+ dbInfo.databaseVersion(),
+ collEntryFuture.get(opCtx),
+ atClusterTime);
+ }
- auto refreshNotification = collEntry->refreshCompletionNotification;
- if (!refreshNotification) {
- refreshNotification = (collEntry->refreshCompletionNotification =
- std::make_shared<Notification<Status>>());
- _scheduleCollectionRefresh(ul, opCtx->getServiceContext(), collEntry, nss, 1);
- refreshActionTaken = RefreshAction::kPerformedRefresh;
- }
+ operationBlockedBehindCatalogCacheRefresh(opCtx) = true;
- // Wait on the notification outside of the mutex
- ul.unlock();
+ size_t acquireTries = 0;
+ Timer t;
- auto refreshStatus = [&]() {
- Timer t;
- ON_BLOCK_EXIT([&] { _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros()); });
+ while (true) {
+ try {
+ auto collEntry = collEntryFuture.get(opCtx);
+ _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- try {
- const Milliseconds kReportingInterval{250};
- while (!refreshNotification->waitFor(opCtx, kReportingInterval)) {
- _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
- t.reset();
- }
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
- return refreshNotification->get(opCtx);
- } catch (const DBException& ex) {
+ return ChunkManager(dbInfo.primaryId(),
+ dbInfo.databaseVersion(),
+ std::move(collEntry),
+ atClusterTime);
+ } catch (ExceptionFor<ErrorCodes::ConflictingOperationInProgress>& ex) {
+ _stats.totalRefreshWaitTimeMicros.addAndFetch(t.micros());
+ acquireTries++;
+ if (acquireTries == kMaxInconsistentRoutingInfoRefreshAttempts) {
return ex.toStatus();
}
- }();
-
- if (!refreshStatus.isOK()) {
- return {refreshStatus, refreshActionTaken};
}
- // Once the refresh is complete, loop around to get the latest value
- continue;
+ collEntryFuture = _collectionCache.acquireAsync(nss, cacheConsistency);
+ t.reset();
}
-
- return {ChunkManager(dbInfo.primaryId(),
- dbInfo.databaseVersion(),
- collEntry->routingInfo,
- atClusterTime),
- refreshActionTaken};
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
}
+StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, boost::none);
+}
+
+StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoAt(OperationContext* opCtx,
+ const NamespaceString& nss,
+ Timestamp atClusterTime) {
+ return _getCollectionRoutingInfoAt(opCtx, nss, atClusterTime);
+}
+
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationContext* opCtx,
StringData dbName) {
// TODO SERVER-49724: Make ReadThroughCache support StringData keys
@@ -303,32 +210,20 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabaseWithRefresh(OperationCon
}
StatusWith<ChunkManager> CatalogCache::getCollectionRoutingInfoWithRefresh(
- OperationContext* opCtx, const NamespaceString& nss, bool forceRefreshFromThisThread) {
- auto refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss);
- // We want to ensure that we don't join an in-progress refresh because that
- // could violate causal consistency for this client. We don't need to actually perform the
- // refresh ourselves but we do need the refresh to begin *after* this function is
- // called, so calling it twice is enough regardless of what happens the
- // second time. See SERVER-33954 for reasoning.
- if (forceRefreshFromThisThread &&
- refreshResult.actionTaken == RefreshAction::kDidNotPerformRefresh) {
- refreshResult = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss);
- }
- return refreshResult.statusWithInfo;
+ OperationContext* opCtx, const NamespaceString& nss) {
+ _collectionCache.invalidate(nss);
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, true);
+ return getCollectionRoutingInfo(opCtx, nss);
}
StatusWith<ChunkManager> CatalogCache::getShardedCollectionRoutingInfoWithRefresh(
OperationContext* opCtx, const NamespaceString& nss) {
- auto swRoutingInfo = _getCollectionRoutingInfoWithForcedRefresh(opCtx, nss).statusWithInfo;
- if (!swRoutingInfo.isOK())
- return swRoutingInfo;
-
- auto cri(std::move(swRoutingInfo.getValue()));
- if (!cri.isSharded())
+ auto routingInfoStatus = getCollectionRoutingInfoWithRefresh(opCtx, nss);
+ if (routingInfoStatus.isOK() && !routingInfoStatus.getValue().isSharded()) {
return {ErrorCodes::NamespaceNotSharded,
str::stream() << "Collection " << nss.ns() << " is not sharded."};
-
- return cri;
+ }
+ return routingInfoStatus;
}
void CatalogCache::onStaleDatabaseVersion(const StringData dbName,
@@ -350,48 +245,49 @@ void CatalogCache::setOperationShouldBlockBehindCatalogCacheRefresh(OperationCon
if (gEnableFinerGrainedCatalogCacheRefresh) {
operationShouldBlockBehindCatalogCacheRefresh(opCtx) = shouldBlock;
}
-};
+}
void CatalogCache::invalidateShardOrEntireCollectionEntryForShardedCollection(
- OperationContext* opCtx,
const NamespaceString& nss,
- boost::optional<ChunkVersion> wantedVersion,
- const ChunkVersion& receivedVersion,
- ShardId shardId) {
- if (shardVersionsHaveMatchingEpoch(wantedVersion, receivedVersion)) {
- _createOrGetCollectionEntryAndMarkShardStale(nss, shardId);
- } else {
- _createOrGetCollectionEntryAndMarkEpochStale(nss);
+ const boost::optional<ChunkVersion>& wantedVersion,
+ const ShardId& shardId) {
+ _stats.countStaleConfigErrors.addAndFetch(1);
+
+ auto collectionEntry = _collectionCache.peekLatestCached(nss);
+ if (collectionEntry && collectionEntry->optRt) {
+ collectionEntry->optRt->setShardStale(shardId);
}
-};
-void CatalogCache::onEpochChange(const NamespaceString& nss) {
- _createOrGetCollectionEntryAndMarkEpochStale(nss);
-};
+ if (wantedVersion) {
+ _collectionCache.advanceTimeInStore(
+ nss, ComparableChunkVersion::makeComparableChunkVersion(*wantedVersion));
+ } else {
+ _collectionCache.advanceTimeInStore(
+ nss, ComparableChunkVersion::makeComparableChunkVersionForForcedRefresh());
+ }
+}
void CatalogCache::checkEpochOrThrow(const NamespaceString& nss,
- ChunkVersion targetCollectionVersion,
- const ShardId& shardId) const {
- stdx::lock_guard<Latch> lg(_mutex);
- const auto itDb = _collectionsByDb.find(nss.db());
+ const ChunkVersion& targetCollectionVersion,
+ const ShardId& shardId) {
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns()
<< ", no entry for database " << nss.db(),
- itDb != _collectionsByDb.end());
+ _databaseCache.peekLatestCached(nss.db().toString()));
- auto itColl = itDb->second.find(nss.ns());
+ auto collectionValueHandle = _collectionCache.peekLatestCached(nss);
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns()
<< ", no entry for collection.",
- itColl != itDb->second.end());
+ collectionValueHandle);
uassert(StaleConfigInfo(nss, targetCollectionVersion, boost::none, shardId),
str::stream() << "could not act as router for " << nss.ns() << ", wanted "
<< targetCollectionVersion.toString()
<< ", but found the collection was unsharded",
- itColl->second->routingInfo);
+ collectionValueHandle->optRt);
- auto foundVersion = itColl->second->routingInfo->getVersion();
+ auto foundVersion = collectionValueHandle->optRt->getVersion();
uassert(StaleConfigInfo(nss, targetCollectionVersion, foundVersion, shardId),
str::stream() << "could not act as router for " << nss.ns() << ", wanted "
<< targetCollectionVersion.toString() << ", but found "
@@ -399,11 +295,6 @@ void CatalogCache::checkEpochOrThrow(const NamespaceString& nss,
foundVersion.epoch() == targetCollectionVersion.epoch());
}
-void CatalogCache::invalidateShardForShardedCollection(const NamespaceString& nss,
- const ShardId& staleShardId) {
- _createOrGetCollectionEntryAndMarkShardStale(nss, staleShardId);
-}
-
void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
LOGV2_DEBUG(4997600,
1,
@@ -413,32 +304,24 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
_databaseCache.invalidateCachedValueIf(
[&](const DatabaseType& dbt) { return dbt.getPrimary() == shardId; });
- stdx::lock_guard<Latch> lg(_mutex);
-
// Invalidate collections which contain data on this shard.
- for (const auto& [db, collInfoMap] : _collectionsByDb) {
- for (const auto& [collNs, collRoutingInfoEntry] : collInfoMap) {
- if (!collRoutingInfoEntry->needsRefresh && collRoutingInfoEntry->routingInfo) {
- // The set of shards on which this collection contains chunks.
- std::set<ShardId> shardsOwningDataForCollection;
- collRoutingInfoEntry->routingInfo->getAllShardIds(&shardsOwningDataForCollection);
-
- if (shardsOwningDataForCollection.find(shardId) !=
- shardsOwningDataForCollection.end()) {
- LOGV2_DEBUG(22647,
- 3,
- "Invalidating cached collection {namespace} that has data "
- "on shard {shardId}",
- "Invalidating cached collection",
- "namespace"_attr = collNs,
- "shardId"_attr = shardId);
-
- collRoutingInfoEntry->needsRefresh = true;
- collRoutingInfoEntry->routingInfo->setShardStale(shardId);
- }
- }
- }
- }
+ _collectionCache.invalidateCachedValueIf([&](const OptionalRoutingTableHistory& ort) {
+ if (!ort.optRt)
+ return false;
+ const auto& rt = *ort.optRt;
+
+ std::set<ShardId> shardIds;
+ rt.getAllShardIds(&shardIds);
+
+ LOGV2_DEBUG(22647,
+ 3,
+ "Invalidating cached collection {namespace} that has data "
+ "on shard {shardId}",
+ "Invalidating cached collection",
+ "namespace"_attr = rt.nss(),
+ "shardId"_attr = shardId);
+ return shardIds.find(shardId) != shardIds.end();
+ });
LOGV2(22648,
"Finished invalidating databases and collections with data on shard: {shardId}",
@@ -446,46 +329,28 @@ void CatalogCache::invalidateEntriesThatReferenceShard(const ShardId& shardId) {
"shardId"_attr = shardId);
}
-void CatalogCache::purgeCollection(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
-
- auto itDb = _collectionsByDb.find(nss.db());
- if (itDb == _collectionsByDb.end()) {
- return;
- }
-
- itDb->second.erase(nss.ns());
-}
-
void CatalogCache::purgeDatabase(StringData dbName) {
_databaseCache.invalidate(dbName.toString());
- stdx::lock_guard<Latch> lg(_mutex);
- _collectionsByDb.erase(dbName);
+ _collectionCache.invalidateKeyIf(
+ [&](const NamespaceString& nss) { return nss.db() == dbName; });
}
void CatalogCache::purgeAllDatabases() {
_databaseCache.invalidateAll();
- stdx::lock_guard<Latch> lg(_mutex);
- _collectionsByDb.clear();
+ _collectionCache.invalidateAll();
}
void CatalogCache::report(BSONObjBuilder* builder) const {
BSONObjBuilder cacheStatsBuilder(builder->subobjStart("catalogCache"));
- size_t numDatabaseEntries;
- size_t numCollectionEntries{0};
- {
- numDatabaseEntries = _databaseCache.getCacheInfo().size();
- stdx::lock_guard<Latch> ul(_mutex);
- for (const auto& entry : _collectionsByDb) {
- numCollectionEntries += entry.second.size();
- }
- }
+ const size_t numDatabaseEntries = _databaseCache.getCacheInfo().size();
+ const size_t numCollectionEntries = _collectionCache.getCacheInfo().size();
cacheStatsBuilder.append("numDatabaseEntries", static_cast<long long>(numDatabaseEntries));
cacheStatsBuilder.append("numCollectionEntries", static_cast<long long>(numCollectionEntries));
_stats.report(&cacheStatsBuilder);
+ _collectionCache.reportStats(&cacheStatsBuilder);
}
void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx,
@@ -519,188 +384,8 @@ void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opC
}
}
-void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
- ServiceContext* service,
- std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
- NamespaceString const& nss,
- int refreshAttempt) {
- const auto existingRoutingInfo = collEntry->routingInfo;
-
- // If we have an existing chunk manager, the refresh is considered "incremental", regardless of
- // how many chunks are in the differential
- const bool isIncremental(existingRoutingInfo);
-
- if (isIncremental) {
- _stats.numActiveIncrementalRefreshes.addAndFetch(1);
- _stats.countIncrementalRefreshesStarted.addAndFetch(1);
- } else {
- _stats.numActiveFullRefreshes.addAndFetch(1);
- _stats.countFullRefreshesStarted.addAndFetch(1);
- }
-
- // Invoked when one iteration of getChunksSince has completed, whether with success or error
- const auto onRefreshCompleted = [this, t = Timer(), nss, isIncremental, existingRoutingInfo](
- const Status& status,
- RoutingTableHistory* routingInfoAfterRefresh) {
- if (isIncremental) {
- _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
- } else {
- _stats.numActiveFullRefreshes.subtractAndFetch(1);
- }
-
- if (!status.isOK()) {
- _stats.countFailedRefreshes.addAndFetch(1);
-
- LOGV2_OPTIONS(24103,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Error refreshing cached collection {namespace}; Took {duration} and "
- "failed due to {error}",
- "Error refreshing cached collection",
- "namespace"_attr = nss,
- "duration"_attr = Milliseconds(t.millis()),
- "error"_attr = redact(status));
- } else if (routingInfoAfterRefresh) {
- const int logLevel =
- (!existingRoutingInfo ||
- (existingRoutingInfo &&
- routingInfoAfterRefresh->getVersion() != existingRoutingInfo->getVersion()))
- ? 0
- : 1;
- LOGV2_FOR_CATALOG_REFRESH(
- 24104,
- logLevel,
- "Refreshed cached collection {namespace} to version {newVersion} from version "
- "{oldVersion}. Took {duration}",
- "Refreshed cached collection",
- "namespace"_attr = nss,
- "newVersion"_attr = routingInfoAfterRefresh->getVersion(),
- "oldVersion"_attr =
- (existingRoutingInfo
- ? (" from version " + existingRoutingInfo->getVersion().toString())
- : ""),
- "duration"_attr = Milliseconds(t.millis()));
- } else {
- LOGV2_OPTIONS(24105,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Collection {namespace} was found to be unsharded after refresh that "
- "took {duration}",
- "Collection has found to be unsharded after refresh",
- "namespace"_attr = nss,
- "duration"_attr = Milliseconds(t.millis()));
- }
- };
-
- // Invoked if getChunksSince resulted in error or threw an exception
- const auto onRefreshFailed =
- [ this, service, collEntry, nss, refreshAttempt,
- onRefreshCompleted ](WithLock lk, const Status& status) noexcept {
- onRefreshCompleted(status, nullptr);
-
- // It is possible that the metadata is being changed concurrently, so retry the
- // refresh again
- if (status == ErrorCodes::ConflictingOperationInProgress &&
- refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) {
- _scheduleCollectionRefresh(lk, service, collEntry, nss, refreshAttempt + 1);
- } else {
- // Leave needsRefresh to true so that any subsequent get attempts will kick off
- // another round of refresh
- collEntry->refreshCompletionNotification->set(status);
- collEntry->refreshCompletionNotification = nullptr;
- }
- };
-
- const auto refreshCallback =
- [ this, service, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ](
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
-
- ThreadClient tc("CatalogCache::collectionRefresh", service);
- auto opCtx = tc->makeOperationContext();
-
- std::shared_ptr<RoutingTableHistory> newRoutingInfo;
- try {
- newRoutingInfo = refreshCollectionRoutingInfo(
- opCtx.get(), nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
-
- onRefreshCompleted(Status::OK(), newRoutingInfo.get());
- } catch (const DBException& ex) {
- stdx::lock_guard<Latch> lg(_mutex);
- onRefreshFailed(lg, ex.toStatus());
- return;
- }
-
- stdx::lock_guard<Latch> lg(_mutex);
-
- collEntry->epochHasChanged = false;
- collEntry->needsRefresh = false;
- collEntry->refreshCompletionNotification->set(Status::OK());
- collEntry->refreshCompletionNotification = nullptr;
-
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx.get(), false);
-
- // TODO(SERVER-49876): remove clang-tidy NOLINT comments.
- if (existingRoutingInfo && newRoutingInfo && // NOLINT(bugprone-use-after-move)
- existingRoutingInfo->getVersion() == // NOLINT(bugprone-use-after-move)
- newRoutingInfo->getVersion()) { // NOLINT(bugprone-use-after-move)
- // If the routingInfo hasn't changed, we need to manually reset stale shards.
- newRoutingInfo->setAllShardsRefreshed();
- }
-
- collEntry->routingInfo = std::move(newRoutingInfo);
- };
-
- const ChunkVersion startingCollectionVersion =
- (existingRoutingInfo ? existingRoutingInfo->getVersion() : ChunkVersion::UNSHARDED());
-
- LOGV2_FOR_CATALOG_REFRESH(
- 24106,
- 1,
- "Refreshing cached collection {namespace} with version {currentCollectionVersion}",
- "namespace"_attr = nss,
- "currentCollectionVersion"_attr = startingCollectionVersion);
-
- _cacheLoader.getChunksSince(nss, startingCollectionVersion)
- .thenRunOn(_executor)
- .getAsync(refreshCallback);
-
- // The routing info for this collection shouldn't change, as other threads may try to use the
- // CatalogCache while we are waiting for the refresh to complete.
- invariant(collEntry->routingInfo.get() == existingRoutingInfo.get());
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkEpochStale(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
- collRoutingInfoEntry->epochHasChanged = true;
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkShardStale(const NamespaceString& nss,
- const ShardId& staleShardId) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
- if (collRoutingInfoEntry->routingInfo) {
- collRoutingInfoEntry->routingInfo->setShardStale(staleShardId);
- }
-}
-
-void CatalogCache::_createOrGetCollectionEntryAndMarkAsNeedsRefresh(const NamespaceString& nss) {
- stdx::lock_guard<Latch> lg(_mutex);
- auto collRoutingInfoEntry = _createOrGetCollectionEntry(lg, nss);
- collRoutingInfoEntry->needsRefresh = true;
-}
-
-std::shared_ptr<CatalogCache::CollectionRoutingInfoEntry> CatalogCache::_createOrGetCollectionEntry(
- WithLock wl, const NamespaceString& nss) {
- auto& collectionsForDb = _collectionsByDb[nss.db()];
- if (!collectionsForDb.contains(nss.ns())) {
- // TODO SERVER-46199: ensure collections cache size is capped
- // currently no routine except for dropDatabase is removing cached collection entries and
- // the cache for a specific DB can grow indefinitely.
- collectionsForDb[nss.ns()] = std::make_shared<CollectionRoutingInfoEntry>();
- }
-
- return collectionsForDb[nss.ns()];
+void CatalogCache::invalidateCollectionEntry_LINEARIZABLE(const NamespaceString& nss) {
+ _collectionCache.invalidate(nss);
}
void CatalogCache::Stats::report(BSONObjBuilder* builder) const {
@@ -708,14 +393,6 @@ void CatalogCache::Stats::report(BSONObjBuilder* builder) const {
builder->append("totalRefreshWaitTimeMicros", totalRefreshWaitTimeMicros.load());
- builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load());
- builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load());
-
- builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load());
- builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load());
-
- builder->append("countFailedRefreshes", countFailedRefreshes.load());
-
if (isMongos()) {
BSONObjBuilder operationsBlockedByRefreshBuilder(
builder->subobjStart("operationsBlockedByRefresh"));
@@ -756,7 +433,6 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa
OperationContext* opCtx,
const std::string& dbName,
const ComparableDatabaseVersion& previousDbVersion) {
-
// TODO (SERVER-34164): Track and increment stats for database refreshes
LOGV2_FOR_CATALOG_REFRESH(24102, 2, "Refreshing cached database entry", "db"_attr = dbName);
@@ -788,73 +464,199 @@ CatalogCache::DatabaseCache::LookupResult CatalogCache::DatabaseCache::_lookupDa
}
}
-AtomicWord<uint64_t> ComparableDatabaseVersion::_localSequenceNumSource{1ULL};
+CatalogCache::CollectionCache::CollectionCache(ServiceContext* service,
+ ThreadPoolInterface& threadPool,
+ CatalogCacheLoader& catalogCacheLoader)
+ : ReadThroughCache(_mutex,
+ service,
+ threadPool,
+ [this](OperationContext* opCtx, // Lookup callback handed to the ReadThroughCache base.
+ const NamespaceString& nss,
+ const ValueHandle& collectionHistory,
+ const ComparableChunkVersion& previousChunkVersion) {
+ return _lookupCollection( // The callback only delegates to the member lookup routine.
+ opCtx, nss, collectionHistory, previousChunkVersion);
+ },
+ kCollectionCacheSize), // Defined outside this view; presumably the cache capacity - confirm.
+ _catalogCacheLoader(catalogCacheLoader) {} // Kept so _lookupCollection can call getChunksSince().
-ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion(
- const DatabaseVersion& version) {
- return ComparableDatabaseVersion(version, _localSequenceNumSource.fetchAndAdd(1));
+void CatalogCache::CollectionCache::reportStats(BSONObjBuilder* builder) const {
+ _stats.report(builder); // Appends this cache's refresh counters to 'builder'.
}
-const DatabaseVersion& ComparableDatabaseVersion::getVersion() const {
- return _dbVersion;
+void CatalogCache::CollectionCache::_updateRefreshesStats(const bool isIncremental,
+ const bool add) {
+ if (add) { // Refresh starting: bump both the active gauge and the cumulative "started" counter.
+ if (isIncremental) {
+ _stats.numActiveIncrementalRefreshes.addAndFetch(1);
+ _stats.countIncrementalRefreshesStarted.addAndFetch(1);
+ } else {
+ _stats.numActiveFullRefreshes.addAndFetch(1);
+ _stats.countFullRefreshesStarted.addAndFetch(1);
+ }
+ } else { // Refresh ending: only the active gauge drops; "started" counters are not decremented.
+ if (isIncremental) {
+ _stats.numActiveIncrementalRefreshes.subtractAndFetch(1);
+ } else {
+ _stats.numActiveFullRefreshes.subtractAndFetch(1);
+ }
+ }
}
-uint64_t ComparableDatabaseVersion::getLocalSequenceNum() const {
- return _localSequenceNum;
-}
+void CatalogCache::CollectionCache::Stats::report(BSONObjBuilder* builder) const { // Appends each counter's current value.
+ builder->append("numActiveIncrementalRefreshes", numActiveIncrementalRefreshes.load());
+ builder->append("countIncrementalRefreshesStarted", countIncrementalRefreshesStarted.load());
-BSONObj ComparableDatabaseVersion::toBSON() const {
- BSONObjBuilder builder;
- _dbVersion.getUuid().appendToBuilder(&builder, "uuid");
- builder.append("lastMod", _dbVersion.getLastMod());
- builder.append("localSequenceNum", std::to_string(_localSequenceNum));
- return builder.obj();
-}
+ builder->append("numActiveFullRefreshes", numActiveFullRefreshes.load());
+ builder->append("countFullRefreshesStarted", countFullRefreshesStarted.load());
-std::string ComparableDatabaseVersion::toString() const {
- return toBSON().toString();
+ builder->append("countFailedRefreshes", countFailedRefreshes.load()); // Incremented in _lookupCollection's catch block.
}
+CatalogCache::CollectionCache::LookupResult CatalogCache::CollectionCache::_lookupCollection(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const RoutingTableHistoryValueHandle& existingHistory,
+ const ComparableChunkVersion& previousVersion) {
+ const bool isIncremental(existingHistory && existingHistory->optRt); // Incremental only if a cached routing table exists.
+ _updateRefreshesStats(isIncremental, true); // Marks the refresh active; balanced by the 'false' calls below.
-CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard)
- : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {}
+ Timer t{};
+ try {
+ auto lookupVersion =
+ isIncremental ? existingHistory->optRt->getVersion() : ChunkVersion::UNSHARDED(); // No cached table: look up from UNSHARDED.
-const ShardId& CachedDatabaseInfo::primaryId() const {
- return _dbt.getPrimary();
+ LOGV2_FOR_CATALOG_REFRESH(4619900,
+ 1,
+ "Refreshing cached collection",
+ "namespace"_attr = nss,
+ "currentVersion"_attr = previousVersion);
+
+ auto collectionAndChunks = _catalogCacheLoader.getChunksSince(nss, lookupVersion).get(); // Presumably blocks on the loader's result - confirm.
+
+ auto newRoutingHistory = [&] {
+ // If we have routing info already and it's for the same collection epoch, we're
+ // updating. Otherwise, we're making a whole new routing table.
+ if (isIncremental &&
+ existingHistory->optRt->getVersion().epoch() == collectionAndChunks.epoch) {
+ return existingHistory->optRt->makeUpdated(collectionAndChunks.reshardingFields,
+ collectionAndChunks.changedChunks);
+ }
+
+ auto defaultCollator = [&]() -> std::unique_ptr<CollatorInterface> {
+ if (!collectionAndChunks.defaultCollation.isEmpty()) {
+ // The collation should have been validated upon collection creation
+ return uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(collectionAndChunks.defaultCollation));
+ }
+ return nullptr; // Empty default collation: no collator is built.
+ }();
+
+ return RoutingTableHistory::makeNew(nss,
+ collectionAndChunks.uuid,
+ KeyPattern(collectionAndChunks.shardKeyPattern),
+ std::move(defaultCollator),
+ collectionAndChunks.shardKeyIsUnique,
+ collectionAndChunks.epoch,
+ std::move(collectionAndChunks.reshardingFields),
+ collectionAndChunks.changedChunks);
+ }();
+
+ newRoutingHistory.setAllShardsRefreshed();
+
+ // Check that the shards all match with what is on the config server
+ std::set<ShardId> shardIds;
+ newRoutingHistory.getAllShardIds(&shardIds);
+ for (const auto& shardId : shardIds) {
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); // Result unused; the lookup is done only for its status check.
+ }
+
+ const auto newVersion =
+ ComparableChunkVersion::makeComparableChunkVersion(newRoutingHistory.getVersion());
+
+ LOGV2_FOR_CATALOG_REFRESH(4619901,
+ isIncremental || newVersion != previousVersion ? 0 : 1, // Parses as (isIncremental || newVersion != previousVersion) ? 0 : 1.
+ "Refreshed cached collection",
+ "namespace"_attr = nss,
+ "newVersion"_attr = newVersion,
+ "oldVersion"_attr = previousVersion,
+ "duration"_attr = Milliseconds(t.millis()));
+ _updateRefreshesStats(isIncremental, false); // Success path: the refresh is no longer active.
+
+ return LookupResult(OptionalRoutingTableHistory(std::move(newRoutingHistory)), newVersion);
+ } catch (const DBException& ex) {
+ _stats.countFailedRefreshes.addAndFetch(1); // Counted as failed even for NamespaceNotFound, handled just below.
+ _updateRefreshesStats(isIncremental, false);
+
+ if (ex.code() == ErrorCodes::NamespaceNotFound) { // Not rethrown: yields an UNSHARDED result with no routing table.
+ LOGV2_FOR_CATALOG_REFRESH(4619902,
+ 0,
+ "Collection has found to be unsharded after refresh", // NOTE(review): wording - probably meant "was found".
+ "namespace"_attr = nss,
+ "duration"_attr = Milliseconds(t.millis()));
+
+ return LookupResult(
+ OptionalRoutingTableHistory(),
+ ComparableChunkVersion::makeComparableChunkVersion(ChunkVersion::UNSHARDED()));
+ }
+
+ LOGV2_FOR_CATALOG_REFRESH(4619903,
+ 0,
+ "Error refreshing cached collection",
+ "namespace"_attr = nss,
+ "duration"_attr = Milliseconds(t.millis()),
+ "error"_attr = redact(ex));
+
+ throw; // Any other DBException is rethrown to the caller.
+ }
}
-bool CachedDatabaseInfo::shardingEnabled() const {
- return _dbt.getSharded();
+AtomicWord<uint64_t> ComparableDatabaseVersion::_uuidDisambiguatingSequenceNumSource{1ULL}; // Shared sequence source, starts at 1.
+
+ComparableDatabaseVersion ComparableDatabaseVersion::makeComparableDatabaseVersion(
+ const DatabaseVersion& version) { // Wraps 'version' together with a number drawn from the shared source.
+ return ComparableDatabaseVersion(version, _uuidDisambiguatingSequenceNumSource.fetchAndAdd(1)); // Each call advances the source by one.
}
-DatabaseVersion CachedDatabaseInfo::databaseVersion() const {
- return _dbt.getVersion();
+std::string ComparableDatabaseVersion::toString() const {
+ return str::stream() << (_dbVersion ? _dbVersion->toBSON().toString() : "NONE") << "|" // "NONE" when no version is set.
+ << _uuidDisambiguatingSequenceNum; // Result: <version BSON or NONE>|<sequence number>.
}
-AtomicWord<uint64_t> ComparableChunkVersion::_localSequenceNumSource{1ULL};
+bool ComparableDatabaseVersion::operator==(const ComparableDatabaseVersion& other) const {
+ if (!_dbVersion && !other._dbVersion)
+ return true; // Default constructed value
+ if (_dbVersion.is_initialized() != other._dbVersion.is_initialized())
+ return false; // One side is default constructed value
-ComparableChunkVersion ComparableChunkVersion::makeComparableChunkVersion(
- const ChunkVersion& version) {
- return ComparableChunkVersion(version, _localSequenceNumSource.fetchAndAdd(1));
+ return sameUuid(other) && (_dbVersion->getLastMod() == other._dbVersion->getLastMod()); // Both set: sameUuid() and equal lastMod; the sequence number is not compared.
}
-const ChunkVersion& ComparableChunkVersion::getVersion() const {
- return _chunkVersion;
+bool ComparableDatabaseVersion::operator<(const ComparableDatabaseVersion& other) const {
+ if (!_dbVersion && !other._dbVersion)
+ return false; // Default constructed value
+
+ if (_dbVersion && other._dbVersion && sameUuid(other)) {
+ return _dbVersion->getLastMod() < other._dbVersion->getLastMod(); // Same UUID on both sides: order by lastMod.
+ } else {
+ return _uuidDisambiguatingSequenceNum < other._uuidDisambiguatingSequenceNum; // Different UUID or one side unset: order by sequence number.
+ }
}
-uint64_t ComparableChunkVersion::getLocalSequenceNum() const {
- return _localSequenceNum;
+CachedDatabaseInfo::CachedDatabaseInfo(DatabaseType dbt, std::shared_ptr<Shard> primaryShard)
+ : _dbt(std::move(dbt)), _primaryShard(std::move(primaryShard)) {} // Both by-value arguments are moved into the members.
+
+const ShardId& CachedDatabaseInfo::primaryId() const {
+ return _dbt.getPrimary(); // Primary shard id as held by the stored DatabaseType.
}
-BSONObj ComparableChunkVersion::toBSON() const {
- BSONObjBuilder builder;
- _chunkVersion.appendToCommand(&builder);
- builder.append("localSequenceNum", std::to_string(_localSequenceNum));
- return builder.obj();
+bool CachedDatabaseInfo::shardingEnabled() const {
+ return _dbt.getSharded(); // Reports the 'sharded' flag of the stored DatabaseType.
}
-std::string ComparableChunkVersion::toString() const {
- return toBSON().toString();
+DatabaseVersion CachedDatabaseInfo::databaseVersion() const {
+ return _dbt.getVersion(); // Returned by value from the stored DatabaseType.
}
} // namespace mongo