diff options
Diffstat (limited to 'src/mongo/s/catalog_cache.cpp')
-rw-r--r-- | src/mongo/s/catalog_cache.cpp | 120 |
1 files changed, 94 insertions, 26 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 3980317ea00..3d2825198b6 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -122,34 +122,40 @@ CatalogCache::~CatalogCache() = default; StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx, StringData dbName) { try { - stdx::lock_guard<stdx::mutex> lg(_mutex); + while (true) { + stdx::unique_lock<stdx::mutex> ul(_mutex); - auto& dbEntry = _databases[dbName]; - if (!dbEntry) { - dbEntry = std::make_shared<DatabaseInfoEntry>(); - } + auto& dbEntry = _databases[dbName]; + if (!dbEntry) { + dbEntry = std::make_shared<DatabaseInfoEntry>(); + } - if (dbEntry->needsRefresh) { - const auto catalogClient = Grid::get(opCtx)->catalogClient(); + if (dbEntry->needsRefresh) { + auto refreshNotification = dbEntry->refreshCompletionNotification; + if (!refreshNotification) { + refreshNotification = (dbEntry->refreshCompletionNotification = + std::make_shared<Notification<Status>>()); + _scheduleDatabaseRefresh(ul, dbName, dbEntry); + } - const auto dbNameCopy = dbName.toString(); + // Wait on the notification outside of the mutex. + ul.unlock(); + uassertStatusOK(refreshNotification->get(opCtx)); - // Load the database entry - const auto opTimeWithDb = uassertStatusOK(catalogClient->getDatabase( - opCtx, dbNameCopy, repl::ReadConcernLevel::kMajorityReadConcern)); - const auto& dbDesc = opTimeWithDb.value; + // Once the refresh is complete, loop around to get the refreshed cache entry. + continue; + } - if (!dbEntry->dbt) { + if (dbEntry->mustLoadShardedCollections) { // If this is the first time we are loading info for this database, also load the // sharded collections. - // TODO (SERVER-34061): Stop loading sharded collections here. - repl::OpTime collLoadConfigOptime; - const std::vector<CollectionType> collections = uassertStatusOK( - catalogClient->getCollections(opCtx, &dbNameCopy, &collLoadConfigOptime)); + // TODO (SERVER-34061): Stop loading sharded collections when loading a database. - const auto refreshCallbackFn = [](OperationContext* opCtx, - StatusWith<DatabaseType>) {}; - _cacheLoader.getDatabase(dbName, refreshCallbackFn); + const auto dbNameCopy = dbName.toString(); + repl::OpTime collLoadConfigOptime; + const std::vector<CollectionType> collections = + uassertStatusOK(Grid::get(opCtx)->catalogClient()->getCollections( + opCtx, &dbNameCopy, &collLoadConfigOptime)); CollectionInfoMap collectionEntries; for (const auto& coll : collections) { @@ -160,15 +166,13 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx std::make_shared<CollectionRoutingInfoEntry>(); } _collectionsByDb[dbName] = std::move(collectionEntries); + dbEntry->mustLoadShardedCollections = false; } - dbEntry->needsRefresh = false; - dbEntry->dbt = std::move(dbDesc); + auto primaryShard = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->dbt->getPrimary())); + return {CachedDatabaseInfo(*dbEntry->dbt, std::move(primaryShard))}; } - - auto primaryShard = uassertStatusOK( - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->dbt->getPrimary())); - return {CachedDatabaseInfo(*dbEntry->dbt, std::move(primaryShard))}; } catch (const DBException& ex) { return ex.toStatus(); } @@ -400,6 +404,70 @@ void CatalogCache::report(BSONObjBuilder* builder) const { _stats.report(&cacheStatsBuilder); } +void CatalogCache::_scheduleDatabaseRefresh(WithLock, + const StringData dbName, + std::shared_ptr<DatabaseInfoEntry> dbEntry) { + + log() << "Refreshing cached database entry for " << dbName + << "; current cached database info is " + << (dbEntry->dbt ? dbEntry->dbt->toBSON() : BSONObj()); + + const auto onRefreshCompleted = + [ this, t = Timer(), dbName ](const StatusWith<DatabaseType>& swDbt) { + // TODO (SERVER-34164): Track and increment stats for database refreshes. + if (!swDbt.isOK()) { + log() << "Refresh for database " << dbName << " took " << t.millis() << " ms and failed" + << causedBy(redact(swDbt.getStatus())); + return; + } + log() << "Refresh for database " << dbName << " took " << t.millis() << " ms and found " + << swDbt.getValue().toBSON(); + }; + + const auto onRefreshFailed = + [ this, dbName, dbEntry ](WithLock lk, const Status& status) noexcept { + // Clear the notification so the next 'getDatabase' kicks off a new refresh attempt. + dbEntry->refreshCompletionNotification->set(status); + dbEntry->refreshCompletionNotification = nullptr; + + if (status == ErrorCodes::NamespaceNotFound) { + // The refresh found that the database was dropped, so remove its entry from the cache. + _databases.erase(dbName); + _collectionsByDb.erase(dbName); + return; + } + }; + + const auto onRefreshSucceeded = [this, dbName, dbEntry](WithLock lk, DatabaseType dbt) { + // Update the cached entry with the refreshed metadata and mark the entry as fresh. + dbEntry->dbt = std::move(dbt); + dbEntry->needsRefresh = false; + dbEntry->refreshCompletionNotification->set(Status::OK()); + dbEntry->refreshCompletionNotification = nullptr; + }; + + const auto updateCatalogCacheFn = + [ this, dbName, dbEntry, onRefreshFailed, onRefreshSucceeded, onRefreshCompleted ]( + OperationContext * opCtx, StatusWith<DatabaseType> swDbt) noexcept { + onRefreshCompleted(swDbt); + stdx::lock_guard<stdx::mutex> lg(_mutex); + if (!swDbt.isOK()) { + onRefreshFailed(lg, swDbt.getStatus()); + return; + } + onRefreshSucceeded(lg, std::move(swDbt.getValue())); + }; + + try { + _cacheLoader.getDatabase(dbName, updateCatalogCacheFn); + } catch (const DBException& ex) { + const auto status = ex.toStatus(); + stdx::lock_guard<stdx::mutex> lg(_mutex); + onRefreshCompleted(status); + onRefreshFailed(lg, status); + } +} + void CatalogCache::_scheduleCollectionRefresh(WithLock lk, std::shared_ptr<CollectionRoutingInfoEntry> collEntry, NamespaceString const& nss, |