diff options
Diffstat (limited to 'src/mongo/db/s/shard_server_catalog_cache_loader.cpp')
-rw-r--r-- | src/mongo/db/s/shard_server_catalog_cache_loader.cpp | 229 |
1 files changed, 105 insertions, 124 deletions
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 70b0c445ec3..36700332edb 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -616,99 +616,94 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( return getPersistedMaxChunkVersion(opCtx, nss); }(); - auto remoteRefreshCallbackFn = [this, - nss, - catalogCacheSinceVersion, - maxLoaderVersion, - termScheduled, - callbackFn, - notify]( + auto remoteRefreshFn = [this, nss, catalogCacheSinceVersion, maxLoaderVersion, termScheduled]( OperationContext* opCtx, - StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) { + StatusWith<CollectionAndChangedChunks> + swCollectionAndChangedChunks) -> StatusWith<CollectionAndChangedChunks> { if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) { - Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask( + _ensureMajorityPrimaryAndScheduleCollAndChunksTask( opCtx, nss, collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); - if (!scheduleStatus.isOK()) { - callbackFn(opCtx, scheduleStatus); - notify->set(); - return; - } LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for collection " << nss << " from version " << maxLoaderVersion << " and no metadata was found."; - } else if (swCollectionAndChangedChunks.isOK()) { - auto& collAndChunks = swCollectionAndChangedChunks.getValue(); - - if (collAndChunks.changedChunks.back().getVersion().epoch() != collAndChunks.epoch) { - swCollectionAndChangedChunks = - Status{ErrorCodes::ConflictingOperationInProgress, - str::stream() - << "Invalid chunks found when reloading '" - << nss.toString() - << "' Previous collection epoch was '" - << collAndChunks.epoch.toString() - << "', but found a new epoch '" - << collAndChunks.changedChunks.back().getVersion().epoch().toString() - << "'. Collection was dropped and recreated."}; - } else { - if ((collAndChunks.epoch != maxLoaderVersion.epoch()) || - (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) { - Status scheduleStatus = _ensureMajorityPrimaryAndScheduleCollAndChunksTask( - opCtx, - nss, - collAndChunkTask{ - swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); - if (!scheduleStatus.isOK()) { - callbackFn(opCtx, scheduleStatus); - notify->set(); - return; - } - } + return swCollectionAndChangedChunks; + } + + if (!swCollectionAndChangedChunks.isOK()) { + return swCollectionAndChangedChunks; + } + + auto& collAndChunks = swCollectionAndChangedChunks.getValue(); + + if (collAndChunks.changedChunks.back().getVersion().epoch() != collAndChunks.epoch) { + return Status{ + ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Invalid chunks found when reloading '" << nss.toString() + << "' Previous collection epoch was '" + << collAndChunks.epoch.toString() + << "', but found a new epoch '" + << collAndChunks.changedChunks.back().getVersion().epoch().toString() + << "'. Collection was dropped and recreated."}; + } + + if ((collAndChunks.epoch != maxLoaderVersion.epoch()) || + (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) { + _ensureMajorityPrimaryAndScheduleCollAndChunksTask( + opCtx, + nss, + collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); + } - LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for collection " << nss - << " from collection version " << maxLoaderVersion - << " and found collection version " - << collAndChunks.changedChunks.back().getVersion(); + LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for collection " << nss + << " from collection version " << maxLoaderVersion + << " and found collection version " + << collAndChunks.changedChunks.back().getVersion(); - // Metadata was found remotely -- otherwise would have received NamespaceNotFound - // rather than Status::OK(). Return metadata for CatalogCache that's GTE - // catalogCacheSinceVersion, from the loader's persisted and enqueued metadata. + // Metadata was found remotely + // -- otherwise we would have received NamespaceNotFound rather than Status::OK(). + // Return metadata for CatalogCache that's GTE catalogCacheSinceVersion, + // from the loader's persisted and enqueued metadata. - swCollectionAndChangedChunks = - _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled); + swCollectionAndChangedChunks = + _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled); + if (!swCollectionAndChangedChunks.isOK()) { + return swCollectionAndChangedChunks; + } - const auto termAfterRefresh = [&] { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _term; - }(); - - if (termAfterRefresh != termScheduled) { - // Raising a ConflictingOperationInProgress error here will cause the - // CatalogCache to attempt the refresh as secondary instead of failing the - // operation - swCollectionAndChangedChunks = Status( - ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Replication stepdown occurred during refresh for '" - << nss.toString()); - } else if (swCollectionAndChangedChunks.isOK()) { - // After finding metadata remotely, we must have found metadata locally. - invariant(!collAndChunks.changedChunks.empty()); - } - } + const auto termAfterRefresh = [&] { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _term; + }(); + + if (termAfterRefresh != termScheduled) { + // Raising a ConflictingOperationInProgress error here will cause the + // CatalogCache to attempt the refresh as secondary instead of failing the + // operation + return Status(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Replication stepdown occurred during refresh for '" + << nss.toString()); } - // Complete the callbackFn work. - callbackFn(opCtx, std::move(swCollectionAndChangedChunks)); - notify->set(); + // After finding metadata remotely, we must have found metadata locally. + invariant(!collAndChunks.changedChunks.empty()); + + return swCollectionAndChangedChunks; }; // Refresh the loader's metadata from the config server. The caller's request will // then be serviced from the loader's up-to-date metadata. - _configServerLoader->getChunksSince(nss, maxLoaderVersion, remoteRefreshCallbackFn); + _configServerLoader->getChunksSince( + nss, + maxLoaderVersion, + [this, remoteRefreshFn, callbackFn, notify](auto opCtx, auto swCollectionAndChangedChunks) { + // Complete the callbackFn work. + callbackFn(opCtx, remoteRefreshFn(opCtx, std::move(swCollectionAndChangedChunks))); + notify->set(); + }); } void ShardServerCatalogCacheLoader::_runSecondaryGetDatabase( @@ -728,36 +723,33 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase( StringData dbName, long long termScheduled, stdx::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - auto remoteRefreshCallbackFn = [ this, name = dbName.toString(), termScheduled, callbackFn ]( + auto remoteRefreshFn = [ this, name = dbName.toString(), termScheduled ]( OperationContext * opCtx, StatusWith<DatabaseType> swDatabaseType) { if (swDatabaseType == ErrorCodes::NamespaceNotFound) { - Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask( + _ensureMajorityPrimaryAndScheduleDbTask( opCtx, name, DBTask{swDatabaseType, termScheduled}); - if (!scheduleStatus.isOK()) { - callbackFn(opCtx, scheduleStatus); - return; - } LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for database " << name << " and found the database has been dropped."; + return swDatabaseType; + } - } else if (swDatabaseType.isOK()) { - Status scheduleStatus = _ensureMajorityPrimaryAndScheduleDbTask( - opCtx, name, DBTask{swDatabaseType, termScheduled}); - if (!scheduleStatus.isOK()) { - callbackFn(opCtx, scheduleStatus); - return; - } - - LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for database " << name - << " and found " << swDatabaseType.getValue().toBSON(); + if (!swDatabaseType.isOK()) { + return swDatabaseType; } - // Complete the callbackFn work. - callbackFn(opCtx, std::move(swDatabaseType)); + _ensureMajorityPrimaryAndScheduleDbTask(opCtx, name, DBTask{swDatabaseType, termScheduled}); + + LOG_CATALOG_REFRESH(1) << "Cache loader remotely refreshed for database " << name + << " and found " << swDatabaseType.getValue().toBSON(); + + return swDatabaseType; }; - _configServerLoader->getDatabase(dbName, remoteRefreshCallbackFn); + _configServerLoader->getDatabase( + dbName, [this, remoteRefreshFn, callbackFn](auto opCtx, auto swDatabaseType) { + callbackFn(opCtx, remoteRefreshFn(opCtx, std::move(swDatabaseType))); + }); } StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata( @@ -871,51 +863,40 @@ std::pair<bool, CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getE return std::make_pair(true, collAndChunks); } -Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask( +void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChunksTask( OperationContext* opCtx, const NamespaceString& nss, collAndChunkTask task) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - const bool wasEmpty = _collAndChunkTaskLists[nss].empty(); - _collAndChunkTaskLists[nss].addTask(std::move(task)); - if (!wasEmpty) { - return Status::OK(); - } + { + stdx::lock_guard<stdx::mutex> lock(_mutex); - Status status = _threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); }); - if (!status.isOK()) { - LOG(0) << "Cache loader failed to schedule persisted metadata update" - << " task for namespace '" << nss << "' due to '" << redact(status) - << "'. Clearing task list so that scheduling" - << " will be attempted by the next caller to refresh this namespace."; + auto& list = _collAndChunkTaskLists[nss]; + auto wasEmpty = list.empty(); + list.addTask(std::move(task)); - _collAndChunkTaskLists.erase(nss); + if (!wasEmpty) + return; } - return status; + invariant(_threadPool.schedule([this, nss]() { _runCollAndChunksTasks(nss); })); } -Status ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask( - OperationContext* opCtx, StringData dbName, DBTask task) { +void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(OperationContext* opCtx, + StringData dbName, + DBTask task) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - const bool wasEmpty = _dbTaskLists[dbName.toString()].empty(); - _dbTaskLists[dbName.toString()].addTask(std::move(task)); - if (!wasEmpty) { - return Status::OK(); - } + { + stdx::lock_guard<stdx::mutex> lock(_mutex); - Status status = - _threadPool.schedule([ this, name = dbName.toString() ]() { _runDbTasks(name); }); - if (!status.isOK()) { - LOG(0) << "Cache loader failed to schedule persisted metadata update" - << " task for db '" << dbName << "' due to '" << redact(status) - << "'. Clearing task list so that scheduling" - << " will be attempted by the next caller to refresh this namespace."; + auto& list = _dbTaskLists[dbName.toString()]; + auto wasEmpty = list.empty(); + list.addTask(std::move(task)); - _dbTaskLists.erase(dbName.toString()); + if (!wasEmpty) + return; } - return status; + auto name = dbName.toString(); + invariant(_threadPool.schedule([this, name]() { _runDbTasks(name); })); } void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) { |