summaryrefslogtreecommitdiff
path: root/src/mongo/s/catalog_cache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/catalog_cache.cpp')
-rw-r--r--src/mongo/s/catalog_cache.cpp120
1 files changed, 94 insertions, 26 deletions
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 3980317ea00..3d2825198b6 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -122,34 +122,40 @@ CatalogCache::~CatalogCache() = default;
StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx,
StringData dbName) {
try {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
+ while (true) {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
- auto& dbEntry = _databases[dbName];
- if (!dbEntry) {
- dbEntry = std::make_shared<DatabaseInfoEntry>();
- }
+ auto& dbEntry = _databases[dbName];
+ if (!dbEntry) {
+ dbEntry = std::make_shared<DatabaseInfoEntry>();
+ }
- if (dbEntry->needsRefresh) {
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ if (dbEntry->needsRefresh) {
+ auto refreshNotification = dbEntry->refreshCompletionNotification;
+ if (!refreshNotification) {
+ refreshNotification = (dbEntry->refreshCompletionNotification =
+ std::make_shared<Notification<Status>>());
+ _scheduleDatabaseRefresh(ul, dbName, dbEntry);
+ }
- const auto dbNameCopy = dbName.toString();
+ // Wait on the notification outside of the mutex.
+ ul.unlock();
+ uassertStatusOK(refreshNotification->get(opCtx));
- // Load the database entry
- const auto opTimeWithDb = uassertStatusOK(catalogClient->getDatabase(
- opCtx, dbNameCopy, repl::ReadConcernLevel::kMajorityReadConcern));
- const auto& dbDesc = opTimeWithDb.value;
+ // Once the refresh is complete, loop around to get the refreshed cache entry.
+ continue;
+ }
- if (!dbEntry->dbt) {
+ if (dbEntry->mustLoadShardedCollections) {
// If this is the first time we are loading info for this database, also load the
// sharded collections.
- // TODO (SERVER-34061): Stop loading sharded collections here.
- repl::OpTime collLoadConfigOptime;
- const std::vector<CollectionType> collections = uassertStatusOK(
- catalogClient->getCollections(opCtx, &dbNameCopy, &collLoadConfigOptime));
+ // TODO (SERVER-34061): Stop loading sharded collections when loading a database.
- const auto refreshCallbackFn = [](OperationContext* opCtx,
- StatusWith<DatabaseType>) {};
- _cacheLoader.getDatabase(dbName, refreshCallbackFn);
+ const auto dbNameCopy = dbName.toString();
+ repl::OpTime collLoadConfigOptime;
+ const std::vector<CollectionType> collections =
+ uassertStatusOK(Grid::get(opCtx)->catalogClient()->getCollections(
+ opCtx, &dbNameCopy, &collLoadConfigOptime));
CollectionInfoMap collectionEntries;
for (const auto& coll : collections) {
@@ -160,15 +166,13 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
std::make_shared<CollectionRoutingInfoEntry>();
}
_collectionsByDb[dbName] = std::move(collectionEntries);
+ dbEntry->mustLoadShardedCollections = false;
}
- dbEntry->needsRefresh = false;
- dbEntry->dbt = std::move(dbDesc);
+ auto primaryShard = uassertStatusOK(
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->dbt->getPrimary()));
+ return {CachedDatabaseInfo(*dbEntry->dbt, std::move(primaryShard))};
}
-
- auto primaryShard = uassertStatusOK(
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbEntry->dbt->getPrimary()));
- return {CachedDatabaseInfo(*dbEntry->dbt, std::move(primaryShard))};
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -400,6 +404,70 @@ void CatalogCache::report(BSONObjBuilder* builder) const {
_stats.report(&cacheStatsBuilder);
}
+void CatalogCache::_scheduleDatabaseRefresh(WithLock,
+ const StringData dbName,
+ std::shared_ptr<DatabaseInfoEntry> dbEntry) {
+
+ log() << "Refreshing cached database entry for " << dbName
+ << "; current cached database info is "
+ << (dbEntry->dbt ? dbEntry->dbt->toBSON() : BSONObj());
+
+ const auto onRefreshCompleted =
+ [ this, t = Timer(), dbName ](const StatusWith<DatabaseType>& swDbt) {
+ // TODO (SERVER-34164): Track and increment stats for database refreshes.
+ if (!swDbt.isOK()) {
+ log() << "Refresh for database " << dbName << " took " << t.millis() << " ms and failed"
+ << causedBy(redact(swDbt.getStatus()));
+ return;
+ }
+ log() << "Refresh for database " << dbName << " took " << t.millis() << " ms and found "
+ << swDbt.getValue().toBSON();
+ };
+
+ const auto onRefreshFailed =
+ [ this, dbName, dbEntry ](WithLock lk, const Status& status) noexcept {
+ // Clear the notification so the next 'getDatabase' kicks off a new refresh attempt.
+ dbEntry->refreshCompletionNotification->set(status);
+ dbEntry->refreshCompletionNotification = nullptr;
+
+ if (status == ErrorCodes::NamespaceNotFound) {
+ // The refresh found that the database was dropped, so remove its entry from the cache.
+ _databases.erase(dbName);
+ _collectionsByDb.erase(dbName);
+ return;
+ }
+ };
+
+ const auto onRefreshSucceeded = [this, dbName, dbEntry](WithLock lk, DatabaseType dbt) {
+ // Update the cached entry with the refreshed metadata and mark the entry as fresh.
+ dbEntry->dbt = std::move(dbt);
+ dbEntry->needsRefresh = false;
+ dbEntry->refreshCompletionNotification->set(Status::OK());
+ dbEntry->refreshCompletionNotification = nullptr;
+ };
+
+ const auto updateCatalogCacheFn =
+ [ this, dbName, dbEntry, onRefreshFailed, onRefreshSucceeded, onRefreshCompleted ](
+ OperationContext * opCtx, StatusWith<DatabaseType> swDbt) noexcept {
+ onRefreshCompleted(swDbt);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ if (!swDbt.isOK()) {
+ onRefreshFailed(lg, swDbt.getStatus());
+ return;
+ }
+ onRefreshSucceeded(lg, std::move(swDbt.getValue()));
+ };
+
+ try {
+ _cacheLoader.getDatabase(dbName, updateCatalogCacheFn);
+ } catch (const DBException& ex) {
+ const auto status = ex.toStatus();
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ onRefreshCompleted(status);
+ onRefreshFailed(lg, status);
+ }
+}
+
void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
NamespaceString const& nss,