diff options
22 files changed, 462 insertions, 774 deletions
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp index 9bd095b9f6b..ed80a5df4ac 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.cpp +++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp @@ -40,35 +40,8 @@ namespace mongo { using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks; -namespace { - -/** - * Constructs the options for the loader thread pool. - */ -ThreadPool::Options makeDefaultThreadPoolOptions() { - ThreadPool::Options options; - options.poolName = "CatalogCacheLoaderMock"; - options.minThreads = 0; - options.maxThreads = 1; - - // Ensure all threads have a client. - options.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - - return options; -} - -} // namespace - -CatalogCacheLoaderMock::CatalogCacheLoaderMock() : _threadPool(makeDefaultThreadPoolOptions()) { - _threadPool.startup(); -} - -CatalogCacheLoaderMock::~CatalogCacheLoaderMock() { - _threadPool.shutdown(); - _threadPool.join(); -} +CatalogCacheLoaderMock::CatalogCacheLoaderMock(std::shared_ptr<ThreadPool> executor) + : _executor(executor) {} void CatalogCacheLoaderMock::initializeReplicaSetRole(bool isPrimary) { MONGO_UNREACHABLE; @@ -97,68 +70,41 @@ void CatalogCacheLoaderMock::waitForDatabaseFlush(OperationContext* opCtx, Strin MONGO_UNREACHABLE; } -std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince( - const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { - auto notify = std::make_shared<Notification<void>>(); - - _threadPool.schedule([ this, notify, callbackFn ](auto status) noexcept { - invariant(status); - - auto opCtx = Client::getCurrent()->makeOperationContext(); - - auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> { - try { - uassertStatusOK(_swCollectionReturnValue); - uassertStatusOK(_swChunksReturnValue); - - // We swap the chunks out of _swChunksReturnValue to 
ensure if this task is - // scheduled multiple times that we don't inform the ChunkManager about a chunk it - // has already updated. - std::vector<ChunkType> chunks; - _swChunksReturnValue.getValue().swap(chunks); - - return CollectionAndChangedChunks( - _swCollectionReturnValue.getValue().getUUID(), - _swCollectionReturnValue.getValue().getEpoch(), - _swCollectionReturnValue.getValue().getKeyPattern().toBSON(), - _swCollectionReturnValue.getValue().getDefaultCollation(), - _swCollectionReturnValue.getValue().getUnique(), - std::move(chunks)); - } catch (const DBException& ex) { - return ex.toStatus(); - } - }(); - - callbackFn(opCtx.get(), std::move(swCollAndChunks)); - notify->set(); - }); - - return notify; +SemiFuture<CollectionAndChangedChunks> CatalogCacheLoaderMock::getChunksSince( + const NamespaceString& nss, ChunkVersion version) { + + return ExecutorFuture<void>(_executor) + .then([this] { + uassertStatusOK(_swCollectionReturnValue); + uassertStatusOK(_swChunksReturnValue); + + // We swap the chunks out of _swChunksReturnValue to ensure if this task is + // scheduled multiple times that we don't inform the ChunkManager about a chunk it + // has already updated. 
+ std::vector<ChunkType> chunks; + _swChunksReturnValue.getValue().swap(chunks); + + return CollectionAndChangedChunks( + _swCollectionReturnValue.getValue().getUUID(), + _swCollectionReturnValue.getValue().getEpoch(), + _swCollectionReturnValue.getValue().getKeyPattern().toBSON(), + _swCollectionReturnValue.getValue().getDefaultCollation(), + _swCollectionReturnValue.getValue().getUnique(), + std::move(chunks)); + }) + .semi(); } -void CatalogCacheLoaderMock::getDatabase( - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - _threadPool.schedule([ this, callbackFn ](auto status) noexcept { - invariant(status); - - auto opCtx = Client::getCurrent()->makeOperationContext(); - - auto swDatabase = [&]() -> StatusWith<DatabaseType> { - try { - uassertStatusOK(_swDatabaseReturnValue); - - return DatabaseType(_swDatabaseReturnValue.getValue().getName(), - _swDatabaseReturnValue.getValue().getPrimary(), - _swDatabaseReturnValue.getValue().getSharded(), - _swDatabaseReturnValue.getValue().getVersion()); - } catch (const DBException& ex) { - return ex.toStatus(); - } - }(); - - callbackFn(opCtx.get(), std::move(swDatabase)); - }); +SemiFuture<DatabaseType> CatalogCacheLoaderMock::getDatabase(StringData dbName) { + return ExecutorFuture<void>(_executor) + .then([this] { + uassertStatusOK(_swDatabaseReturnValue); + return DatabaseType(_swDatabaseReturnValue.getValue().getName(), + _swDatabaseReturnValue.getValue().getPrimary(), + _swDatabaseReturnValue.getValue().getSharded(), + _swDatabaseReturnValue.getValue().getVersion()); + }) + .semi(); } void CatalogCacheLoaderMock::setCollectionRefreshReturnValue( diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h index ce566a02fd7..dfc43a34867 100644 --- a/src/mongo/db/s/catalog_cache_loader_mock.h +++ b/src/mongo/db/s/catalog_cache_loader_mock.h @@ -43,8 +43,8 @@ class CatalogCacheLoaderMock final : public CatalogCacheLoader { 
CatalogCacheLoaderMock& operator=(const CatalogCacheLoaderMock&) = delete; public: - CatalogCacheLoaderMock(); - ~CatalogCacheLoaderMock(); + CatalogCacheLoaderMock(std::shared_ptr<ThreadPool> executor); + ~CatalogCacheLoaderMock() = default; /** * These functions should never be called. They trigger invariants if called. @@ -57,14 +57,10 @@ public: void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override; void waitForDatabaseFlush(OperationContext* opCtx, StringData dbName) override; - std::shared_ptr<Notification<void>> getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - GetChunksSinceCallbackFn callbackFn) override; + SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss, + ChunkVersion version) override; - void getDatabase( - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) override; + SemiFuture<DatabaseType> getDatabase(StringData dbName) override; /** * Sets the mocked collection entry result that getChunksSince will use to construct its return @@ -96,7 +92,7 @@ private: Status(ErrorCodes::InternalError, "config loader mock chunks response is uninitialized")}; // Thread pool on which to mock load chunk metadata. 
- ThreadPool _threadPool; + std::shared_ptr<ThreadPool> _executor; }; } // namespace mongo diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp index de55abb2be4..1470e79bbe2 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.cpp +++ b/src/mongo/db/s/config/config_server_test_fixture.cpp @@ -142,9 +142,10 @@ void ConfigServerTestFixture::_setUp(std::function<void()> onPreInitGlobalStateF _addShardNetworkTestEnv = std::make_unique<NetworkTestEnv>(_executorForAddShard, _mockNetworkForAddShard); - - CatalogCacheLoader::set(getServiceContext(), - std::make_unique<ConfigServerCatalogCacheLoader>()); + _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool(); + CatalogCacheLoader::set( + getServiceContext(), + std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor())); onPreInitGlobalStateFn(); diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp index 8e8e6e3f52c..82ae3514a64 100644 --- a/src/mongo/db/s/migration_util_test.cpp +++ b/src/mongo/db/s/migration_util_test.cpp @@ -346,8 +346,8 @@ public: _clusterId = OID::gen(); ShardingState::get(getServiceContext())->setInitialized(_myShardName, _clusterId); - std::unique_ptr<CatalogCacheLoaderMock> mockLoader = - std::make_unique<CatalogCacheLoaderMock>(); + _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool(); + auto mockLoader = std::make_unique<CatalogCacheLoaderMock>(catalogCacheExecutor()); _mockCatalogCacheLoader = mockLoader.get(); CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader)); diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp index b381c7e596d..7744a517048 100644 --- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp +++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp @@ -52,15 +52,13 @@ void ReadOnlyCatalogCacheLoader::shutDown() { _configServerLoader.shutDown(); } 
-std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { - return _configServerLoader.getChunksSince(nss, version, callbackFn); +SemiFuture<CollectionAndChangedChunks> ReadOnlyCatalogCacheLoader::getChunksSince( + const NamespaceString& nss, ChunkVersion version) { + return _configServerLoader.getChunksSince(nss, version); } -void ReadOnlyCatalogCacheLoader::getDatabase( - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - return _configServerLoader.getDatabase(dbName, callbackFn); +SemiFuture<DatabaseType> ReadOnlyCatalogCacheLoader::getDatabase(StringData dbName) { + return _configServerLoader.getDatabase(dbName); } } // namespace mongo diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h index 83df1fd5114..e5f95f75faa 100644 --- a/src/mongo/db/s/read_only_catalog_cache_loader.h +++ b/src/mongo/db/s/read_only_catalog_cache_loader.h @@ -40,6 +40,8 @@ namespace mongo { */ class ReadOnlyCatalogCacheLoader final : public CatalogCacheLoader { public: + ReadOnlyCatalogCacheLoader(std::shared_ptr<ThreadPool> executor) + : _configServerLoader(executor){}; ~ReadOnlyCatalogCacheLoader(); void initializeReplicaSetRole(bool isPrimary) override {} @@ -50,14 +52,9 @@ public: void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override; void waitForDatabaseFlush(OperationContext* opCtx, StringData dbName) override; - std::shared_ptr<Notification<void>> getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - GetChunksSinceCallbackFn callbackFn) override; - - void getDatabase( - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) override; + SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss, + ChunkVersion version) override; + 
SemiFuture<DatabaseType> getDatabase(StringData dbName) override; private: ConfigServerCatalogCacheLoader _configServerLoader; diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp index 6b880b788cc..5575361c770 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp @@ -66,23 +66,6 @@ MONGO_FAIL_POINT_DEFINE(hangPersistCollectionAndChangedChunksAfterDropChunks); AtomicWord<unsigned long long> taskIdGenerator{0}; -/** - * Constructs the options for the loader thread pool. - */ -ThreadPool::Options makeDefaultThreadPoolOptions() { - ThreadPool::Options options; - options.poolName = "ShardServerCatalogCacheLoader"; - options.minThreads = 0; - options.maxThreads = 6; - - // Ensure all threads have a client. - options.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - - return options; -} - void dropChunksIfEpochChanged(OperationContext* opCtx, const NamespaceString& nss, const CollectionAndChangedChunks& collAndChunks, @@ -369,11 +352,8 @@ ChunkVersion getLocalVersion(OperationContext* opCtx, const NamespaceString& nss } // namespace ShardServerCatalogCacheLoader::ShardServerCatalogCacheLoader( - std::unique_ptr<CatalogCacheLoader> configServerLoader) - : _configServerLoader(std::move(configServerLoader)), - _threadPool(makeDefaultThreadPoolOptions()) { - _threadPool.startup(); -} + std::unique_ptr<CatalogCacheLoader> configServerLoader, std::shared_ptr<ThreadPool> executor) + : _configServerLoader(std::move(configServerLoader)), _executor(executor) {} ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() { shutDown(); @@ -420,22 +400,21 @@ void ShardServerCatalogCacheLoader::shutDown() { } // Prevent further scheduling, then interrupt ongoing tasks. 
- _threadPool.shutdown(); + _executor->shutdown(); { stdx::lock_guard<Latch> lock(_mutex); _contexts.interrupt(ErrorCodes::InterruptedAtShutdown); ++_term; } - _threadPool.join(); + _executor->join(); invariant(_contexts.isEmpty()); _configServerLoader->shutDown(); } -std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { - auto notify = std::make_shared<Notification<void>>(); +SemiFuture<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::getChunksSince( + const NamespaceString& nss, ChunkVersion version) { bool isPrimary; long long term; @@ -444,57 +423,46 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc return std::make_tuple(_role == ReplicaSetRole::Primary, _term); }(); - _threadPool.schedule( - [ this, nss, version, callbackFn, notify, isPrimary, term ](auto status) noexcept { - invariant(status); - - auto context = _contexts.makeOperationContext(*Client::getCurrent()); - auto const opCtx = context.opCtx(); - - try { - { - // We may have missed an OperationContextGroup interrupt since this operation - // began but before the OperationContext was added to the group. So we'll check - // that we're still in the same _term. 
- stdx::lock_guard<Latch> lock(_mutex); - uassert(ErrorCodes::InterruptedDueToReplStateChange, - "Unable to refresh routing table because replica set state changed or " - "the node is shutting down.", - _term == term); - } - - if (isPrimary) { - _schedulePrimaryGetChunksSince(opCtx, nss, version, term, callbackFn, notify); - } else { - _runSecondaryGetChunksSince(opCtx, nss, version, callbackFn, notify); - } - } catch (const DBException& ex) { - callbackFn(opCtx, ex.toStatus()); - notify->set(); + return ExecutorFuture<void>(_executor) + .then([=]() { + ThreadClient tc("ShardServerCatalogCacheLoader::getChunksSince", + getGlobalServiceContext()); + auto context = _contexts.makeOperationContext(*tc); + { + // We may have missed an OperationContextGroup interrupt since this operation + // began but before the OperationContext was added to the group. So we'll check + // that we're still in the same _term. + stdx::lock_guard<Latch> lock(_mutex); + uassert(ErrorCodes::InterruptedDueToReplStateChange, + "Unable to refresh routing table because replica set state changed or " + "the node is shutting down.", + _term == term); } - }); - return notify; + if (isPrimary) { + return _schedulePrimaryGetChunksSince(context.opCtx(), nss, version, term); + } else { + return _runSecondaryGetChunksSince(context.opCtx(), nss, version); + } + }) + .semi(); } -void ShardServerCatalogCacheLoader::getDatabase( - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - bool isPrimary; - long long term; - std::tie(isPrimary, term) = [&] { +SemiFuture<DatabaseType> ShardServerCatalogCacheLoader::getDatabase(StringData dbName) { + const auto [isPrimary, term] = [&] { stdx::lock_guard<Latch> lock(_mutex); return std::make_tuple(_role == ReplicaSetRole::Primary, _term); }(); - _threadPool.schedule([ this, name = dbName.toString(), callbackFn, isPrimary, - term ](auto status) noexcept { - invariant(status); - - auto context = 
_contexts.makeOperationContext(*Client::getCurrent()); - auto const opCtx = context.opCtx(); + return ExecutorFuture<void>(_executor) + .then([this, + dbName = std::move(dbName), + isPrimary = std::move(isPrimary), + term = std::move(term)]() { + ThreadClient tc("ShardServerCatalogCacheLoader::getDatabase", + getGlobalServiceContext()); + auto context = _contexts.makeOperationContext(*tc); - try { { // We may have missed an OperationContextGroup interrupt since this operation began // but before the OperationContext was added to the group. So we'll check that we're @@ -507,14 +475,12 @@ void ShardServerCatalogCacheLoader::getDatabase( } if (isPrimary) { - _schedulePrimaryGetDatabase(opCtx, name, term, callbackFn); + return _schedulePrimaryGetDatabase(context.opCtx(), dbName, term); } else { - _runSecondaryGetDatabase(opCtx, name, callbackFn); + return _runSecondaryGetDatabase(context.opCtx(), dbName); } - } catch (const DBException& ex) { - callbackFn(context.opCtx(), ex.toStatus()); - } - }); + }) + .semi(); } void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx, @@ -620,12 +586,11 @@ void ShardServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx } } -void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince( +StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince( OperationContext* opCtx, const NamespaceString& nss, - const ChunkVersion& catalogCacheSinceVersion, - std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, - std::shared_ptr<Notification<void>> notify) { + const ChunkVersion& catalogCacheSinceVersion) { + forcePrimaryCollectionRefreshAndWaitForReplication(opCtx, nss); // Read the local metadata. @@ -635,19 +600,18 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince( // CollectionVersionLogOpHandler. 
BlockSecondaryReadsDuringBatchApplication_DONT_USE secondaryReadsBlockBehindReplication(opCtx); - auto swCollAndChunks = - _getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion); - callbackFn(opCtx, std::move(swCollAndChunks)); - notify->set(); + return _getCompletePersistedMetadataForSecondarySinceVersion( + opCtx, nss, catalogCacheSinceVersion); } -void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( +StatusWith<CollectionAndChangedChunks> +ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, - long long termScheduled, - std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, - std::shared_ptr<Notification<void>> notify) { + long long termScheduled) { + + // TODO (SERVER-49755): breake up this function into smaller private or anonymous functions. // Get the max version the loader has. const ChunkVersion maxLoaderVersion = [&] { @@ -666,159 +630,131 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince( return getPersistedMaxChunkVersion(opCtx, nss); }(); - auto remoteRefreshFn = [this, nss, catalogCacheSinceVersion, maxLoaderVersion, termScheduled]( - OperationContext* opCtx, - StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks) - -> StatusWith<CollectionAndChangedChunks> { - if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) { - _ensureMajorityPrimaryAndScheduleCollAndChunksTask( - opCtx, - nss, - collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); - - LOGV2_FOR_CATALOG_REFRESH( - 24107, - 1, - "Cache loader remotely refreshed for collection {namespace} from version " - "{oldCollectionVersion} and no metadata was found", - "Cache loader remotely refreshed for collection and no metadata was found", - "namespace"_attr = nss, - "oldCollectionVersion"_attr = maxLoaderVersion); - return 
swCollectionAndChangedChunks; - } - - if (!swCollectionAndChangedChunks.isOK()) { - return swCollectionAndChangedChunks; - } + // Refresh the loader's metadata from the config server. The caller's request will + // then be serviced from the loader's up-to-date metadata. + auto swCollectionAndChangedChunks = + _configServerLoader->getChunksSince(nss, maxLoaderVersion).getNoThrow(); - auto& collAndChunks = swCollectionAndChangedChunks.getValue(); - - if (collAndChunks.changedChunks.back().getVersion().epoch() != collAndChunks.epoch) { - return Status{ErrorCodes::ConflictingOperationInProgress, - str::stream() - << "Invalid chunks found when reloading '" << nss.toString() - << "' Previous collection epoch was '" - << collAndChunks.epoch.toString() << "', but found a new epoch '" - << collAndChunks.changedChunks.back().getVersion().epoch().toString() - << "'."}; - } - - if ((collAndChunks.epoch != maxLoaderVersion.epoch()) || - (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) { - _ensureMajorityPrimaryAndScheduleCollAndChunksTask( - opCtx, - nss, - collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); - } + if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) { + _ensureMajorityPrimaryAndScheduleCollAndChunksTask( + opCtx, + nss, + collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); LOGV2_FOR_CATALOG_REFRESH( - 24108, + 24107, 1, - "Cache loader remotely refreshed for collection {namespace} from collection version " - "{oldCollectionVersion} and found collection version {refreshedCollectionVersion}", - "Cache loader remotely refreshed for collection", + "Cache loader remotely refreshed for collection {namespace} from version " + "{oldCollectionVersion} and no metadata was found", + "Cache loader remotely refreshed for collection and no metadata was found", "namespace"_attr = nss, - "oldCollectionVersion"_attr = maxLoaderVersion, - "refreshedCollectionVersion"_attr = 
collAndChunks.changedChunks.back().getVersion()); - - // Metadata was found remotely - // -- otherwise we would have received NamespaceNotFound rather than Status::OK(). - // Return metadata for CatalogCache that's GTE catalogCacheSinceVersion, - // from the loader's persisted and enqueued metadata. - - swCollectionAndChangedChunks = - _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled); - if (!swCollectionAndChangedChunks.isOK()) { - return swCollectionAndChangedChunks; - } + "oldCollectionVersion"_attr = maxLoaderVersion); + return swCollectionAndChangedChunks; + } - const auto termAfterRefresh = [&] { - stdx::lock_guard<Latch> lock(_mutex); - return _term; - }(); + if (!swCollectionAndChangedChunks.isOK()) { + return swCollectionAndChangedChunks; + } - if (termAfterRefresh != termScheduled) { - // Raising a ConflictingOperationInProgress error here will cause the CatalogCache to - // attempt the refresh as secondary instead of failing the operation - return Status(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Replication stepdown occurred during refresh for '" - << nss.toString()); - } + auto& collAndChunks = swCollectionAndChangedChunks.getValue(); - // After finding metadata remotely, we must have found metadata locally. 
- invariant(!collAndChunks.changedChunks.empty()); + if (collAndChunks.changedChunks.back().getVersion().epoch() != collAndChunks.epoch) { + return Status{ErrorCodes::ConflictingOperationInProgress, + str::stream() + << "Invalid chunks found when reloading '" << nss.toString() + << "' Previous collection epoch was '" << collAndChunks.epoch.toString() + << "', but found a new epoch '" + << collAndChunks.changedChunks.back().getVersion().epoch().toString() + << "'."}; + } + + if ((collAndChunks.epoch != maxLoaderVersion.epoch()) || + (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) { + _ensureMajorityPrimaryAndScheduleCollAndChunksTask( + opCtx, + nss, + collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled}); + } + LOGV2_FOR_CATALOG_REFRESH( + 24108, + 1, + "Cache loader remotely refreshed for collection {namespace} from collection version " + "{oldCollectionVersion} and found collection version {refreshedCollectionVersion}", + "Cache loader remotely refreshed for collection", + "namespace"_attr = nss, + "oldCollectionVersion"_attr = maxLoaderVersion, + "refreshedCollectionVersion"_attr = collAndChunks.changedChunks.back().getVersion()); + + // Metadata was found remotely + // -- otherwise we would have received NamespaceNotFound rather than Status::OK(). + // Return metadata for CatalogCache that's GTE catalogCacheSinceVersion, + // from the loader's persisted and enqueued metadata. + + swCollectionAndChangedChunks = + _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled); + if (!swCollectionAndChangedChunks.isOK()) { return swCollectionAndChangedChunks; - }; + } - // Refresh the loader's metadata from the config server. The caller's request will - // then be serviced from the loader's up-to-date metadata. - _configServerLoader->getChunksSince( - nss, - maxLoaderVersion, - [this, remoteRefreshFn, callbackFn, notify](auto opCtx, auto swCollectionAndChangedChunks) { - // Complete the callbackFn work. 
- callbackFn(opCtx, remoteRefreshFn(opCtx, std::move(swCollectionAndChangedChunks))); - notify->set(); - }); -} + const auto termAfterRefresh = [&] { + stdx::lock_guard<Latch> lock(_mutex); + return _term; + }(); -void ShardServerCatalogCacheLoader::_runSecondaryGetDatabase( - OperationContext* opCtx, - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { + if (termAfterRefresh != termScheduled) { + // Raising a ConflictingOperationInProgress error here will cause the CatalogCache + // to attempt the refresh as secondary instead of failing the operation + return Status(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Replication stepdown occurred during refresh for '" + << nss.toString()); + } - forcePrimaryDatabaseRefreshAndWaitForReplication(opCtx, dbName); + // After finding metadata remotely, we must have found metadata locally. + invariant(!collAndChunks.changedChunks.empty()); - // Read the local metadata. - auto swDatabaseType = getPersistedDbMetadata(opCtx, dbName); - callbackFn(opCtx, std::move(swDatabaseType)); -} + return swCollectionAndChangedChunks; +}; -void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase( - OperationContext* opCtx, - StringData dbName, - long long termScheduled, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - auto remoteRefreshFn = [this, name = dbName.toString(), termScheduled]( - OperationContext* opCtx, StatusWith<DatabaseType> swDatabaseType) { - if (swDatabaseType == ErrorCodes::NamespaceNotFound) { - _ensureMajorityPrimaryAndScheduleDbTask( - opCtx, name, DBTask{swDatabaseType, termScheduled}); - - LOGV2_FOR_CATALOG_REFRESH( - 24109, - 1, - "Cache loader remotely refreshed for database {db} and found the database has " - "been dropped", - "Cache loader remotely refreshed for database and found the database has been " - "dropped", - "db"_attr = name); - return swDatabaseType; - } - if (!swDatabaseType.isOK()) { - return 
swDatabaseType; - } +StatusWith<DatabaseType> ShardServerCatalogCacheLoader::_runSecondaryGetDatabase( + OperationContext* opCtx, StringData dbName) { + forcePrimaryDatabaseRefreshAndWaitForReplication(opCtx, dbName); + return getPersistedDbMetadata(opCtx, dbName); +} - _ensureMajorityPrimaryAndScheduleDbTask(opCtx, name, DBTask{swDatabaseType, termScheduled}); +StatusWith<DatabaseType> ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase( + OperationContext* opCtx, StringData dbName, long long termScheduled) { + auto swDatabaseType = _configServerLoader->getDatabase(dbName).getNoThrow(); + if (swDatabaseType == ErrorCodes::NamespaceNotFound) { + _ensureMajorityPrimaryAndScheduleDbTask( + opCtx, dbName, DBTask{swDatabaseType, termScheduled}); - LOGV2_FOR_CATALOG_REFRESH(24110, - 1, - "Cache loader remotely refreshed for database {db} and found " - "{refreshedDatabaseType}", - "Cache loader remotely refreshed for database", - "db"_attr = name, - "refreshedDatabaseType"_attr = - swDatabaseType.getValue().toBSON()); + LOGV2_FOR_CATALOG_REFRESH( + 24109, + 1, + "Cache loader remotely refreshed for database {db} " + "and found the database has been dropped", + "Cache loader remotely refreshed for database and found the database has been dropped", + "db"_attr = dbName); + return swDatabaseType; + } + if (!swDatabaseType.isOK()) { return swDatabaseType; - }; + } + + _ensureMajorityPrimaryAndScheduleDbTask(opCtx, dbName, DBTask{swDatabaseType, termScheduled}); + + LOGV2_FOR_CATALOG_REFRESH(24110, + 1, + "Cache loader remotely refreshed for database {db} " + "and found {refreshedDatabaseType}", + "Cache loader remotely refreshed for database", + "db"_attr = dbName, + "refreshedDatabaseType"_attr = swDatabaseType.getValue().toBSON()); - _configServerLoader->getDatabase( - dbName, [this, remoteRefreshFn, callbackFn](auto opCtx, auto swDatabaseType) { - callbackFn(opCtx, remoteRefreshFn(opCtx, std::move(swDatabaseType))); - }); + return swDatabaseType; } 
StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata( @@ -952,7 +888,7 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun return; } - _threadPool.schedule([this, nss](auto status) { + _executor->schedule([this, nss](auto status) { invariant(status); _runCollAndChunksTasks(nss); @@ -974,7 +910,7 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper return; } - _threadPool.schedule([this, name = dbName.toString()](auto status) { + _executor->schedule([this, name = dbName.toString()](auto status) { invariant(status); _runDbTasks(name); @@ -982,7 +918,9 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper } void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) { - auto context = _contexts.makeOperationContext(*Client::getCurrent()); + ThreadClient tc("ShardServerCatalogCacheLoader::runCollAndChunksTasks", + getGlobalServiceContext()); + auto context = _contexts.makeOperationContext(*tc); bool taskFinished = false; bool inShutdown = false; @@ -1027,7 +965,7 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString } } - _threadPool.schedule([this, nss](auto status) { + _executor->schedule([this, nss](auto status) { if (ErrorCodes::isCancelationError(status.code())) { LOGV2(22096, "Cache loader failed to schedule a persisted metadata update task for namespace " @@ -1052,7 +990,8 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString } void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { - auto context = _contexts.makeOperationContext(*Client::getCurrent()); + ThreadClient tc("ShardServerCatalogCacheLoader::runDbTasks", getGlobalServiceContext()); + auto context = _contexts.makeOperationContext(*tc); bool taskFinished = false; bool inShutdown = false; @@ -1097,7 +1036,7 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) { } 
} - _threadPool.schedule([this, name = dbName.toString()](auto status) { + _executor->schedule([this, name = dbName.toString()](auto status) { if (ErrorCodes::isCancelationError(status.code())) { LOGV2(22099, "Cache loader failed to schedule a persisted metadata update task for namespace " diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h index 2adfe15d9b7..ee9b211b777 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader.h +++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h @@ -50,7 +50,8 @@ class ShardServerCatalogCacheLoader : public CatalogCacheLoader { ShardServerCatalogCacheLoader& operator=(const ShardServerCatalogCacheLoader&) = delete; public: - ShardServerCatalogCacheLoader(std::unique_ptr<CatalogCacheLoader> configServerLoader); + ShardServerCatalogCacheLoader(std::unique_ptr<CatalogCacheLoader> configServerLoader, + std::shared_ptr<ThreadPool> executor); ~ShardServerCatalogCacheLoader(); /** @@ -77,14 +78,10 @@ public: */ void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override; - std::shared_ptr<Notification<void>> getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - GetChunksSinceCallbackFn callbackFn) override; + SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss, + ChunkVersion version) override; - void getDatabase( - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) override; + SemiFuture<DatabaseType> getDatabase(StringData dbName) override; void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override; @@ -337,61 +334,51 @@ private: /** * Forces the primary to refresh its metadata for 'nss' and waits until this node's metadata * has caught up to the primary's. - * Then retrieves chunk metadata from this node's persisted metadata store and passes it to - * 'callbackFn'. 
+ * + * Returns chunk metadata from this node's persisted metadata store. */ - void _runSecondaryGetChunksSince( + StatusWith<CollectionAndChangedChunks> _runSecondaryGetChunksSince( OperationContext* opCtx, const NamespaceString& nss, - const ChunkVersion& catalogCacheSinceVersion, - std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, - std::shared_ptr<Notification<void>> notify); + const ChunkVersion& catalogCacheSinceVersion); /** * Refreshes chunk metadata from the config server's metadata store, and schedules maintenance * of the shard's persisted metadata store with the latest updates retrieved from the config * server. * - * Then calls 'callbackFn' with metadata retrieved locally from the shard persisted metadata - * store and any in-memory tasks with terms matching 'currentTerm' enqueued to update that - * store, GTE to 'catalogCacheSinceVersion'. + * Returns the metadata retrieved locally from the shard persisted metadata + * store and any in-memory enqueued tasks to update that store that match the given term, + * greater than or equal to the given chunk version. * * Only run on the shard primary. */ - void _schedulePrimaryGetChunksSince( + StatusWith<CollectionAndChangedChunks> _schedulePrimaryGetChunksSince( OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& catalogCacheSinceVersion, - long long currentTerm, - std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn, - std::shared_ptr<Notification<void>> notify); + long long termScheduled); /** * Forces the primary to refresh its metadata for 'dbName' and waits until this node's metadata * has caught up to the primary's. - * Then retrieves the db version from this node's persisted metadata store and passes it to - * 'callbackFn'. + * Returns the database version from this node's persisted metadata store. 
*/ - void _runSecondaryGetDatabase( - OperationContext* opCtx, - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn); + StatusWith<DatabaseType> _runSecondaryGetDatabase(OperationContext* opCtx, StringData dbName); /** * Refreshes db version from the config server's metadata store, and schedules maintenance * of the shard's persisted metadata store with the latest updates retrieved from the config * server. * - * Then calls 'callbackFn' with metadata retrieved locally from the shard persisted metadata - * to update that store. + * Returns the metadata retrieved locally from the shard persisted metadata to update that + * store. * * Only run on the shard primary. */ - void _schedulePrimaryGetDatabase( - OperationContext* opCtx, - StringData dbName, - long long termScheduled, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn); + StatusWith<DatabaseType> _schedulePrimaryGetDatabase(OperationContext* opCtx, + StringData dbName, + long long termScheduled); /** * Loads chunk metadata from the shard persisted metadata store and any in-memory tasks with @@ -478,7 +465,7 @@ private: std::unique_ptr<CatalogCacheLoader> _configServerLoader; // Thread pool used to run blocking tasks which perform disk reads and writes - ThreadPool _threadPool; + std::shared_ptr<ThreadPool> _executor; // Registry of notifications for changes happening to the shard's on-disk routing information NamespaceMetadataChangeNotifications _namespaceNotifications; diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp index 5cfc7ce4124..445f9320b3d 100644 --- a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp +++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp @@ -78,10 +78,6 @@ public: vector<ChunkType> setUpChunkLoaderWithFiveChunks(); const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1)); - const 
std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> - kDoNothingCallbackFn = []( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {}; CatalogCacheLoaderMock* _remoteLoaderMock; std::unique_ptr<ShardServerCatalogCacheLoader> _shardLoader; @@ -96,9 +92,11 @@ void ShardServerCatalogCacheLoaderTest::setUp() { // Create mock remote and real shard loader, retaining a pointer to the mock remote loader so // that unit tests can manipulate it to return certain responses. - std::unique_ptr<CatalogCacheLoaderMock> mockLoader = std::make_unique<CatalogCacheLoaderMock>(); + _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool(); + auto mockLoader = std::make_unique<CatalogCacheLoaderMock>(catalogCacheExecutor()); _remoteLoaderMock = mockLoader.get(); - _shardLoader = std::make_unique<ShardServerCatalogCacheLoader>(std::move(mockLoader)); + _shardLoader = std::make_unique<ShardServerCatalogCacheLoader>(std::move(mockLoader), + catalogCacheExecutor()); // Set the shard loader to primary mode, and set it for testing. _shardLoader->initializeReplicaSetRole(true); @@ -207,25 +205,12 @@ vector<ChunkType> ShardServerCatalogCacheLoaderTest::setUpChunkLoaderWithFiveChu _remoteLoaderMock->setCollectionRefreshReturnValue(collectionType); _remoteLoaderMock->setChunkRefreshReturnValue(chunks); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto refreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto notification = - _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn); - notification->get(); - - // Check refreshCallbackFn thread results where we can safely throw. 
- ASSERT_OK(results.getStatus()); - auto collAndChunkRes = results.getValue(); - ASSERT_EQUALS(collAndChunkRes.epoch, collectionType.getEpoch()); - ASSERT_EQUALS(collAndChunkRes.changedChunks.size(), 5UL); - for (unsigned int i = 0; i < collAndChunkRes.changedChunks.size(); ++i) { - ASSERT_BSONOBJ_EQ(collAndChunkRes.changedChunks[i].toShardBSON(), chunks[i].toShardBSON()); + auto collAndChunksRes = _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get(); + + ASSERT_EQUALS(collAndChunksRes.epoch, collectionType.getEpoch()); + ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL); + for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { + ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].toShardBSON(), chunks[i].toShardBSON()); } return chunks; @@ -237,19 +222,10 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromUnshardedToUnsharded) { Status errorStatus = Status(ErrorCodes::NamespaceNotFound, "collection not found"); _remoteLoaderMock->setCollectionRefreshReturnValue(errorStatus); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto refreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto notification = - _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn); - notification->get(); - - ASSERT_EQUALS(results.getStatus(), errorStatus); + ASSERT_THROWS_CODE_AND_WHAT(_shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get(), + DBException, + errorStatus.code(), + errorStatus.reason()); } TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedToUnsharded) { @@ -263,19 +239,11 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedToUnsharded) { Status errorStatus = Status(ErrorCodes::NamespaceNotFound, "collection not found"); 
_remoteLoaderMock->setCollectionRefreshReturnValue(errorStatus); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto nextRefreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto notification = - _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), nextRefreshCallbackFn); - notification->get(); - - ASSERT_EQUALS(results.getStatus(), errorStatus); + ASSERT_THROWS_CODE_AND_WHAT( + _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(), + DBException, + errorStatus.code(), + errorStatus.reason()); } TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiff) { @@ -290,22 +258,10 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiff) { lastChunk.push_back(chunks.back()); _remoteLoaderMock->setChunkRefreshReturnValue(lastChunk); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto refreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto notification = - _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn); - notification->get(); + auto collAndChunksRes = _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(); // Check that refreshing from the latest version returned a single document matching that // version. 
- ASSERT_OK(results.getStatus()); - auto collAndChunksRes = results.getValue(); ASSERT_EQUALS(collAndChunksRes.epoch, chunks.back().getVersion().epoch()); ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 1UL); ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks.back().toShardBSON(), @@ -326,21 +282,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiffReq lastChunk.push_back(chunks.back()); _remoteLoaderMock->setChunkRefreshReturnValue(lastChunk); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto completeRefreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto notification = - _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), completeRefreshCallbackFn); - notification->get(); - - // Check that the complete routing table was returned successfully. 
- ASSERT_OK(results.getStatus()); - auto collAndChunksRes = results.getValue(); + auto collAndChunksRes = _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get(); ASSERT_EQUALS(collAndChunksRes.epoch, chunks.back().getVersion().epoch()); ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL); for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { @@ -361,21 +303,9 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiff) { vector<ChunkType> updatedChunksDiff = makeThreeUpdatedChunksDiff(collVersion); _remoteLoaderMock->setChunkRefreshReturnValue(updatedChunksDiff); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto refreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto notification = - _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn); - notification->get(); + auto collAndChunksRes = _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(); // Check that the diff was returned successfull. 
- ASSERT_OK(results.getStatus()); - auto collAndChunksRes = results.getValue(); ASSERT_EQUALS(collAndChunksRes.epoch, updatedChunksDiff.front().getVersion().epoch()); ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 4UL); for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { @@ -400,9 +330,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiffReque vector<ChunkType> updatedChunksDiff = makeThreeUpdatedChunksDiff(chunks.back().getVersion()); _remoteLoaderMock->setChunkRefreshReturnValue(updatedChunksDiff); - auto notification = - _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), kDoNothingCallbackFn); - notification->get(); + _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(); // Wait for persistence of update _shardLoader->waitForCollectionFlush(operationContext(), kNss); @@ -416,21 +344,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiffReque vector<ChunkType> completeRoutingTableWithDiffApplied = makeCombinedOriginalFiveChunksAndThreeNewChunksDiff(chunks, updatedChunksDiff); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto refreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto nextNotification = - _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn); - nextNotification->get(); - - // Check that the complete routing table, with diff applied, was returned. 
- ASSERT_OK(results.getStatus()); - auto collAndChunksRes = results.getValue(); + auto collAndChunksRes = _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get(); ASSERT_EQUALS(collAndChunksRes.epoch, completeRoutingTableWithDiffApplied.front().getVersion().epoch()); ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL); @@ -457,21 +371,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNewEpoch) _remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithNewEpoch); _remoteLoaderMock->setChunkRefreshReturnValue(chunksWithNewEpoch); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto refreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto notification = - _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn); - notification->get(); - - // Check that the complete routing table for the new epoch was returned. 
- ASSERT_OK(results.getStatus()); - auto collAndChunksRes = results.getValue(); + auto collAndChunksRes = _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(); ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithNewEpoch.getEpoch()); ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL); for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { @@ -502,19 +402,9 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindMixedChun mixedChunks.insert(mixedChunks.end(), chunksWithNewEpoch.begin(), chunksWithNewEpoch.end()); _remoteLoaderMock->setChunkRefreshReturnValue(mixedChunks); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> mixedResults{ - Status(ErrorCodes::InternalError, "")}; - const auto mixedRefreshCallbackFn = [&mixedResults]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - mixedResults = std::move(swCollAndChunks); - }; - - auto mixedNotification = - _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), mixedRefreshCallbackFn); - mixedNotification->get(); - - ASSERT_EQUALS(mixedResults.getStatus().code(), ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE(_shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(), + DBException, + ErrorCodes::ConflictingOperationInProgress); // Now make sure the newly recreated collection is cleanly loaded. 
We cannot ensure a // non-variable response until the loader has remotely retrieved the new metadata and applied @@ -524,9 +414,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindMixedChun _remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithNewEpoch); _remoteLoaderMock->setChunkRefreshReturnValue(chunksWithNewEpoch); - auto cleanNotification = - _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), kDoNothingCallbackFn); - cleanNotification->get(); + _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(); // Wait for persistence of update. _shardLoader->waitForCollectionFlush(operationContext(), kNss); @@ -535,20 +423,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindMixedChun lastChunkWithNewEpoch.push_back(chunksWithNewEpoch.back()); _remoteLoaderMock->setChunkRefreshReturnValue(lastChunkWithNewEpoch); - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{ - Status(ErrorCodes::InternalError, "")}; - const auto refreshCallbackFn = [&results]( - OperationContext * opCtx, - StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { - results = std::move(swCollAndChunks); - }; - - auto notification = - _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn); - notification->get(); - - ASSERT_OK(results.getStatus()); - auto collAndChunksRes = results.getValue(); + auto collAndChunksRes = _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get(); ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithNewEpoch.getEpoch()); ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL); for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) { diff --git a/src/mongo/db/s/shard_server_test_fixture.cpp b/src/mongo/db/s/shard_server_test_fixture.cpp index b916c674792..42c1935badc 100644 --- a/src/mongo/db/s/shard_server_test_fixture.cpp +++ b/src/mongo/db/s/shard_server_test_fixture.cpp @@ -65,9 
+65,12 @@ void ShardServerTestFixture::setUp() { _clusterId = OID::gen(); ShardingState::get(getServiceContext())->setInitialized(_myShardName, _clusterId); - CatalogCacheLoader::set(getServiceContext(), - std::make_unique<ShardServerCatalogCacheLoader>( - std::make_unique<ConfigServerCatalogCacheLoader>())); + _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool(); + CatalogCacheLoader::set( + getServiceContext(), + std::make_unique<ShardServerCatalogCacheLoader>( + std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor()), + catalogCacheExecutor())); uassertStatusOK( initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort))); diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 0ab2d96fab7..f8a6c671e5c 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -535,17 +535,21 @@ void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, std::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); auto const service = opCtx->getServiceContext(); - + auto catalogCacheExecutor = CatalogCache::makeDefaultThreadPool(); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { if (storageGlobalParams.readOnly) { - CatalogCacheLoader::set(service, std::make_unique<ReadOnlyCatalogCacheLoader>()); + CatalogCacheLoader::set( + service, std::make_unique<ReadOnlyCatalogCacheLoader>(catalogCacheExecutor)); } else { - CatalogCacheLoader::set(service, - std::make_unique<ShardServerCatalogCacheLoader>( - std::make_unique<ConfigServerCatalogCacheLoader>())); + CatalogCacheLoader::set( + service, + std::make_unique<ShardServerCatalogCacheLoader>( + std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor), + catalogCacheExecutor)); } } else { - CatalogCacheLoader::set(service, std::make_unique<ConfigServerCatalogCacheLoader>()); + 
CatalogCacheLoader::set( + service, std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor)); } auto validator = LogicalTimeValidator::get(service); @@ -555,7 +559,8 @@ void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, globalConnPool.addHook(new ShardingConnectionHook(makeEgressHooksList(service))); - auto catalogCache = std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx)); + auto catalogCache = + std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx), catalogCacheExecutor); // List of hooks which will be called by the ShardRegistry when it discovers a shard has been // removed. diff --git a/src/mongo/db/s/sharding_initialization_mongod_test.cpp b/src/mongo/db/s/sharding_initialization_mongod_test.cpp index a724a245f84..30158e7c22a 100644 --- a/src/mongo/db/s/sharding_initialization_mongod_test.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod_test.cpp @@ -70,9 +70,12 @@ protected: // When sharding initialization is triggered, initialize sharding state as a shard server. 
serverGlobalParams.clusterRole = ClusterRole::ShardServer; - CatalogCacheLoader::set(getServiceContext(), - std::make_unique<ShardServerCatalogCacheLoader>( - std::make_unique<ConfigServerCatalogCacheLoader>())); + _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool(); + CatalogCacheLoader::set( + getServiceContext(), + std::make_unique<ShardServerCatalogCacheLoader>( + std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor()), + catalogCacheExecutor())); ShardingInitializationMongoD::get(getServiceContext()) ->setGlobalInitMethodForTest([&](OperationContext* opCtx, diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp index 103ecc582d1..911c4c1da07 100644 --- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp @@ -256,7 +256,8 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest( auto const grid = Grid::get(operationContext()); grid->init(makeShardingCatalogClient(std::move(distLockManagerPtr)), - std::make_unique<CatalogCache>(CatalogCacheLoader::get(getServiceContext())), + std::make_unique<CatalogCache>(CatalogCacheLoader::get(getServiceContext()), + catalogCacheExecutor()), makeShardRegistry(configConnStr), makeClusterCursorManager(), makeBalancerConfiguration(), diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index dd5239b776b..34760afb2df 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -249,6 +249,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/db/logical_time_metadata_hook', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', '$BUILD_DIR/mongo/executor/task_executor_pool', 'client/shard_interface', 'query/cluster_cursor_manager', diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 2eb6c3371eb..2fc6de7cb30 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -135,7 
+135,19 @@ std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo( } // namespace -CatalogCache::CatalogCache(CatalogCacheLoader& cacheLoader) : _cacheLoader(cacheLoader) {} +std::shared_ptr<ThreadPool> CatalogCache::makeDefaultThreadPool() { + ThreadPool::Options options; + options.poolName = "CatalogCache"; + options.minThreads = 0; + options.maxThreads = 6; + + auto executor = std::make_shared<ThreadPool>(std::move(options)); + executor->startup(); + return executor; +} + +CatalogCache::CatalogCache(CatalogCacheLoader& cacheLoader, std::shared_ptr<ThreadPool> executor) + : _cacheLoader(cacheLoader), _executor(executor){}; CatalogCache::~CatalogCache() = default; @@ -159,7 +171,7 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx if (!refreshNotification) { refreshNotification = (dbEntry->refreshCompletionNotification = std::make_shared<Notification<Status>>()); - _scheduleDatabaseRefresh(ul, dbName.toString(), dbEntry); + _scheduleDatabaseRefresh(ul, dbName, dbEntry); } // Wait on the notification outside of the mutex. @@ -236,7 +248,7 @@ CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt( if (!refreshNotification) { refreshNotification = (collEntry->refreshCompletionNotification = std::make_shared<Notification<Status>>()); - _scheduleCollectionRefresh(ul, collEntry, nss, 1); + _scheduleCollectionRefresh(ul, opCtx->getServiceContext(), collEntry, nss, 1); refreshActionTaken = RefreshAction::kPerformedRefresh; } @@ -584,76 +596,10 @@ void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opC } void CatalogCache::_scheduleDatabaseRefresh(WithLock lk, - const std::string& dbName, + StringData dbName, std::shared_ptr<DatabaseInfoEntry> dbEntry) { - const auto onRefreshCompleted = [this, t = Timer(), dbName, dbEntry]( - const StatusWith<DatabaseType>& swDbt) { - // TODO (SERVER-34164): Track and increment stats for database refreshes. 
- if (!swDbt.isOK()) { - LOGV2_OPTIONS(24100, - {logv2::LogComponent::kShardingCatalogRefresh}, - "Error refreshing cached database entry for {db}. Took {duration} and " - "failed due to {error}", - "Error refreshing cached database entry", - "db"_attr = dbName, - "duration"_attr = Milliseconds(t.millis()), - "error"_attr = redact(swDbt.getStatus())); - return; - } - - const auto dbVersionAfterRefresh = swDbt.getValue().getVersion(); - const int logLevel = - (!dbEntry->dbt || - (dbEntry->dbt && - !databaseVersion::equal(dbVersionAfterRefresh, dbEntry->dbt->getVersion()))) - ? 0 - : 1; - LOGV2_FOR_CATALOG_REFRESH( - 24101, - logLevel, - "Refreshed cached database entry for {db} to version {newDbVersion} from version " - "{oldDbVersion}. Took {duration}", - "Refreshed cached database entry", - "db"_attr = dbName, - "newDbVersion"_attr = dbVersionAfterRefresh.toBSON(), - "oldDbVersion"_attr = (dbEntry->dbt ? dbEntry->dbt->getVersion().toBSON() : BSONObj()), - "duration"_attr = Milliseconds(t.millis())); - }; - - // Invoked if getDatabase resulted in error or threw and exception - const auto onRefreshFailed = - [ this, dbName, dbEntry, onRefreshCompleted ](WithLock, const Status& status) noexcept { - onRefreshCompleted(status); - // Clear the notification so the next 'getDatabase' kicks off a new refresh attempt. - dbEntry->refreshCompletionNotification->set(status); - dbEntry->refreshCompletionNotification = nullptr; - - if (status == ErrorCodes::NamespaceNotFound) { - // The refresh found that the database was dropped, so remove its entry from the cache. 
- _databases.erase(dbName); - _collectionsByDb.erase(dbName); - return; - } - }; - - const auto refreshCallback = [ this, dbName, dbEntry, onRefreshFailed, onRefreshCompleted ]( - OperationContext * opCtx, StatusWith<DatabaseType> swDbt) noexcept { - stdx::lock_guard<Latch> lg(_mutex); - - if (!swDbt.isOK()) { - onRefreshFailed(lg, swDbt.getStatus()); - return; - } - - onRefreshCompleted(swDbt); - - dbEntry->needsRefresh = false; - dbEntry->refreshCompletionNotification->set(Status::OK()); - dbEntry->refreshCompletionNotification = nullptr; - - dbEntry->dbt = std::move(swDbt.getValue()); - }; + // TODO (SERVER-34164): Track and increment stats for database refreshes LOGV2_FOR_CATALOG_REFRESH(24102, 1, @@ -664,16 +610,65 @@ void CatalogCache::_scheduleDatabaseRefresh(WithLock lk, "currentDbInfo"_attr = (dbEntry->dbt ? dbEntry->dbt->toBSON() : BSONObj())); - try { - _cacheLoader.getDatabase(dbName, refreshCallback); - } catch (const DBException& ex) { - const auto status = ex.toStatus(); + Timer t{}; - onRefreshFailed(lk, status); - } + _cacheLoader.getDatabase(dbName) + .thenRunOn(_executor) + .then([=](const DatabaseType& dbt) noexcept { + const auto dbVersionAfterRefresh = dbt.getVersion(); + const auto dbVersionHasChanged = + (!dbEntry->dbt || + (dbEntry->dbt && + !databaseVersion::equal(dbVersionAfterRefresh, dbEntry->dbt->getVersion()))); + + stdx::lock_guard<Latch> lg(_mutex); + + LOGV2_FOR_CATALOG_REFRESH( + 24101, + dbVersionHasChanged ? 0 : 1, + "Refreshed cached database entry for {db} to version {newDbVersion}" + "from version {oldDbVersion}. Took {duration}", + "Refreshed cached database entry", + "db"_attr = dbName, + "newDbVersion"_attr = dbVersionAfterRefresh, + "oldDbVersion"_attr = + (dbEntry->dbt ? 
dbEntry->dbt->getVersion().toBSON() : BSONObj()), + "duration"_attr = Milliseconds(t.millis())); + + dbEntry->needsRefresh = false; + dbEntry->refreshCompletionNotification->set(Status::OK()); + dbEntry->refreshCompletionNotification = nullptr; + + dbEntry->dbt = std::move(dbt); + }) + .onError([=](Status errStatus) noexcept { + stdx::lock_guard<Latch> lg(_mutex); + + LOGV2_OPTIONS(24100, + {logv2::LogComponent::kShardingCatalogRefresh}, + "Error refreshing cached database entry for {db}. Took {duration} and " + "failed due to {error}", + "Error refreshing cached database entry", + "db"_attr = dbName, + "duration"_attr = Milliseconds(t.millis()), + "error"_attr = redact(errStatus)); + + // Clear the notification so the next 'getDatabase' kicks off a new refresh attempt. + dbEntry->refreshCompletionNotification->set(errStatus); + dbEntry->refreshCompletionNotification = nullptr; + + if (errStatus == ErrorCodes::NamespaceNotFound) { + // The refresh found that the database was dropped, so remove its entry + // from the cache. 
+ _databases.erase(dbName); + _collectionsByDb.erase(dbName); + } + }) + .getAsync([](auto) {}); } void CatalogCache::_scheduleCollectionRefresh(WithLock lk, + ServiceContext* service, std::shared_ptr<CollectionRoutingInfoEntry> collEntry, NamespaceString const& nss, int refreshAttempt) { @@ -744,15 +739,16 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk, }; // Invoked if getChunksSince resulted in error or threw an exception - const auto onRefreshFailed = [ this, collEntry, nss, refreshAttempt, onRefreshCompleted ]( - WithLock lk, const Status& status) noexcept { + const auto onRefreshFailed = + [ this, service, collEntry, nss, refreshAttempt, + onRefreshCompleted ](WithLock lk, const Status& status) noexcept { onRefreshCompleted(status, nullptr); // It is possible that the metadata is being changed concurrently, so retry the // refresh again if (status == ErrorCodes::ConflictingOperationInProgress && refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) { - _scheduleCollectionRefresh(lk, collEntry, nss, refreshAttempt + 1); + _scheduleCollectionRefresh(lk, service, collEntry, nss, refreshAttempt + 1); } else { // Leave needsRefresh to true so that any subsequent get attempts will kick off // another round of refresh @@ -762,13 +758,16 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk, }; const auto refreshCallback = - [ this, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ]( - OperationContext * opCtx, + [ this, service, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ]( StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept { + + ThreadClient tc("CatalogCache::collectionRefresh", service); + auto opCtx = tc->makeOperationContext(); + std::shared_ptr<RoutingTableHistory> newRoutingInfo; try { newRoutingInfo = refreshCollectionRoutingInfo( - opCtx, nss, std::move(existingRoutingInfo), std::move(swCollAndChunks)); + opCtx.get(), nss, 
std::move(existingRoutingInfo), std::move(swCollAndChunks)); onRefreshCompleted(Status::OK(), newRoutingInfo.get()); } catch (const DBException& ex) { @@ -784,7 +783,7 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk, collEntry->refreshCompletionNotification->set(Status::OK()); collEntry->refreshCompletionNotification = nullptr; - setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false); + setOperationShouldBlockBehindCatalogCacheRefresh(opCtx.get(), false); if (existingRoutingInfo && newRoutingInfo && existingRoutingInfo->getSequenceNumber() == newRoutingInfo->getSequenceNumber()) { @@ -805,18 +804,9 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk, "namespace"_attr = nss, "currentCollectionVersion"_attr = startingCollectionVersion); - try { - _cacheLoader.getChunksSince(nss, startingCollectionVersion, refreshCallback); - } catch (const DBException& ex) { - const auto status = ex.toStatus(); - - // ConflictingOperationInProgress errors trigger retry of the catalog cache reload logic. If - // we failed to schedule the asynchronous reload, there is no point in doing another - // attempt. - invariant(status != ErrorCodes::ConflictingOperationInProgress); - - onRefreshFailed(lk, status); - } + _cacheLoader.getChunksSince(nss, startingCollectionVersion) + .thenRunOn(_executor) + .getAsync(refreshCallback); // The routing info for this collection shouldn't change, as other threads may try to use the // CatalogCache while we are waiting for the refresh to complete. 
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h index 0ce48cd7ffa..0d8b639c68d 100644 --- a/src/mongo/s/catalog_cache.h +++ b/src/mongo/s/catalog_cache.h @@ -40,6 +40,7 @@ #include "mongo/s/client/shard.h" #include "mongo/s/database_version_gen.h" #include "mongo/util/concurrency/notification.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/string_map.h" @@ -128,7 +129,7 @@ class CatalogCache { CatalogCache& operator=(const CatalogCache&) = delete; public: - CatalogCache(CatalogCacheLoader& cacheLoader); + CatalogCache(CatalogCacheLoader& cacheLoader, std::shared_ptr<ThreadPool> executor); ~CatalogCache(); /** @@ -302,6 +303,13 @@ public: */ void checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx, mongo::LogicalOp opType); + /** + * Returns a ThreadPool with default options to be used for CatalogCache. + * + * The returned ThreadPool is already started up. + */ + static std::shared_ptr<ThreadPool> makeDefaultThreadPool(); + private: // Make the cache entries friends so they can access the private classes below friend class CachedDatabaseInfo; @@ -353,7 +361,7 @@ private: * database entry must be in the 'needsRefresh' state. */ void _scheduleDatabaseRefresh(WithLock, - const std::string& dbName, + StringData dbName, std::shared_ptr<DatabaseInfoEntry> dbEntry); /** @@ -361,6 +369,7 @@ private: * namespace must be in the 'needRefresh' state. 
*/ void _scheduleCollectionRefresh(WithLock, + ServiceContext* service, std::shared_ptr<CollectionRoutingInfoEntry> collEntry, NamespaceString const& nss, int refreshAttempt); @@ -487,6 +496,8 @@ private: } _stats; + std::shared_ptr<ThreadPool> _executor; + using DatabaseInfoMap = StringMap<std::shared_ptr<DatabaseInfoEntry>>; using CollectionInfoMap = StringMap<std::shared_ptr<CollectionRoutingInfoEntry>>; using CollectionsByDbMap = StringMap<CollectionInfoMap>; diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h index 967c68f3627..e451664eda1 100644 --- a/src/mongo/s/catalog_cache_loader.h +++ b/src/mongo/s/catalog_cache_loader.h @@ -117,34 +117,23 @@ public: virtual void notifyOfCollectionVersionUpdate(const NamespaceString& nss) = 0; /** - * Non-blocking call, which requests the chunks changed since the specified version to be - * fetched from the persistent metadata store and invokes the callback function with the result. - * The callback function must never throw - it is a fatal error to do so. + * Non-blocking call, which returns the chunks changed since the specified version to be + * fetched from the persistent metadata store. * * If for some reason the asynchronous fetch operation cannot be dispatched (for example on - * shutdown), throws a DBException. Otherwise it is guaranteed that the callback function will - * be invoked even on error and the returned notification will be signalled. - * - * The callbackFn object must not be destroyed until it has been called. The returned - * Notification object can be waited on in order to ensure that. + * shutdown), throws a DBException. 
 */
-    virtual std::shared_ptr<Notification<void>> getChunksSince(
-        const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) = 0;
+    virtual SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss,
+                                                                  ChunkVersion version) = 0;
 
     /**
-     * Non-blocking call, which requests the most recent db version for the given dbName from the
-     * the persistent metadata store and invokes the callback function with the result.
-     * The callback function must never throw - it is a fatal error to do so.
+     * Non-blocking call, which returns the most recent db version for the given dbName from the
+     * persistent metadata store.
      *
      * If for some reason the asynchronous fetch operation cannot be dispatched (for example on
-     * shutdown), throws a DBException. Otherwise it is guaranteed that the callback function will
-     * be invoked even on error.
-     *
-     * The callbackFn object must not be destroyed until it has been called.
+     * shutdown), throws a DBException.
      */
-    virtual void getDatabase(
-        StringData dbName,
-        std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) = 0;
+    virtual SemiFuture<DatabaseType> getDatabase(StringData dbName) = 0;
 
     /**
      * Waits for any pending changes for the specified collection to be persisted locally (not
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 7332baf5610..c40f11e661c 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -49,23 +49,6 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
 namespace {
 
 /**
- * Constructs the default options for the thread pool used by the cache loader.
- */ -ThreadPool::Options makeDefaultThreadPoolOptions() { - ThreadPool::Options options; - options.poolName = "ConfigServerCatalogCacheLoader"; - options.minThreads = 0; - options.maxThreads = 6; - - // Ensure all threads have a client - options.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - - return options; -} - -/** * Structure repsenting the generated query and sort order for a chunk diffing operation. */ struct QueryAndSort { @@ -136,14 +119,8 @@ CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx, } // namespace -ConfigServerCatalogCacheLoader::ConfigServerCatalogCacheLoader() - : _threadPool(makeDefaultThreadPoolOptions()) { - _threadPool.startup(); -} - -ConfigServerCatalogCacheLoader::~ConfigServerCatalogCacheLoader() { - shutDown(); -} +ConfigServerCatalogCacheLoader::ConfigServerCatalogCacheLoader(std::shared_ptr<ThreadPool> executor) + : _executor(executor) {} void ConfigServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) { MONGO_UNREACHABLE; @@ -157,19 +134,7 @@ void ConfigServerCatalogCacheLoader::onStepUp() { MONGO_UNREACHABLE; } -void ConfigServerCatalogCacheLoader::shutDown() { - { - stdx::lock_guard<Latch> lg(_mutex); - if (_inShutdown) { - return; - } - - _inShutdown = true; - } - - _threadPool.shutdown(); - _threadPool.join(); -} +void ConfigServerCatalogCacheLoader::shutDown() {} void ConfigServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) { MONGO_UNREACHABLE; @@ -185,53 +150,34 @@ void ConfigServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCt MONGO_UNREACHABLE; } -std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince( - const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) { - auto notify = std::make_shared<Notification<void>>(); - - _threadPool.schedule([ nss, version, notify, callbackFn ](auto status) noexcept { - invariant(status); - - auto 
opCtx = Client::getCurrent()->makeOperationContext(); - - auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> { - try { - return getChangedChunks(opCtx.get(), nss, version); - } catch (const DBException& ex) { - return ex.toStatus(); - } - }(); +SemiFuture<CollectionAndChangedChunks> ConfigServerCatalogCacheLoader::getChunksSince( + const NamespaceString& nss, ChunkVersion version) { - callbackFn(opCtx.get(), std::move(swCollAndChunks)); - notify->set(); - }); + return ExecutorFuture<void>(_executor) + .then([=]() { + ThreadClient tc("ConfigServerCatalogCacheLoader::getChunksSince", + getGlobalServiceContext()); + auto opCtx = tc->makeOperationContext(); - return notify; + return getChangedChunks(opCtx.get(), nss, version); + }) + .semi(); } -void ConfigServerCatalogCacheLoader::getDatabase( - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) { - _threadPool.schedule([ name = dbName.toString(), callbackFn ](auto status) noexcept { - invariant(status); - - auto opCtx = Client::getCurrent()->makeOperationContext(); - - auto swDbt = [&]() -> StatusWith<DatabaseType> { - try { - return uassertStatusOK( - Grid::get(opCtx.get()) - ->catalogClient() - ->getDatabase( - opCtx.get(), name, repl::ReadConcernLevel::kMajorityReadConcern)) - .value; - } catch (const DBException& ex) { - return ex.toStatus(); - } - }(); - - callbackFn(opCtx.get(), std::move(swDbt)); - }); +SemiFuture<DatabaseType> ConfigServerCatalogCacheLoader::getDatabase(StringData dbName) { + return ExecutorFuture<void>(_executor) + .then([name = dbName.toString()] { + ThreadClient tc("ConfigServerCatalogCacheLoader::getDatabase", + getGlobalServiceContext()); + auto opCtx = tc->makeOperationContext(); + return uassertStatusOK(Grid::get(opCtx.get()) + ->catalogClient() + ->getDatabase(opCtx.get(), + name, + repl::ReadConcernLevel::kMajorityReadConcern)) + .value; + }) + .semi(); } } // namespace mongo diff --git 
a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h index 2da4fb9a8e9..09311d7a2d0 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.h +++ b/src/mongo/s/config_server_catalog_cache_loader.h @@ -36,8 +36,8 @@ namespace mongo { class ConfigServerCatalogCacheLoader final : public CatalogCacheLoader { public: - ConfigServerCatalogCacheLoader(); - ~ConfigServerCatalogCacheLoader(); + ConfigServerCatalogCacheLoader(std::shared_ptr<ThreadPool> executor); + ~ConfigServerCatalogCacheLoader() = default; /** * These functions should never be called. They trigger invariants if called. @@ -50,24 +50,13 @@ public: void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override; void waitForDatabaseFlush(OperationContext* opCtx, StringData dbName) override; - std::shared_ptr<Notification<void>> getChunksSince( - const NamespaceString& nss, - ChunkVersion version, - GetChunksSinceCallbackFn callbackFn) override; - - void getDatabase( - StringData dbName, - std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) override; + SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss, + ChunkVersion version) override; + SemiFuture<DatabaseType> getDatabase(StringData dbName) override; private: // Thread pool to be used to perform metadata load - ThreadPool _threadPool; - - // Protects the class state below - Mutex _mutex = MONGO_MAKE_LATCH("ConfigServerCatalogCacheLoader::_mutex"); - - // True if shutDown was called. 
- bool _inShutdown{false}; + std::shared_ptr<ThreadPool> _executor; }; } // namespace mongo diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp index 92ea2adc996..06c6e3e4dd1 100644 --- a/src/mongo/s/mongos_main.cpp +++ b/src/mongo/s/mongos_main.cpp @@ -418,10 +418,12 @@ Status initializeSharding(OperationContext* opCtx) { auto shardFactory = std::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); + auto catalogCacheExecutor = CatalogCache::makeDefaultThreadPool(); CatalogCacheLoader::set(opCtx->getServiceContext(), - std::make_unique<ConfigServerCatalogCacheLoader>()); + std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor)); - auto catalogCache = std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx)); + auto catalogCache = + std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx), catalogCacheExecutor); // List of hooks which will be called by the ShardRegistry when it discovers a shard has been // removed. diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp index e7768f659ad..7142c90ae11 100644 --- a/src/mongo/s/sharding_router_test_fixture.cpp +++ b/src/mongo/s/sharding_router_test_fixture.cpp @@ -167,18 +167,21 @@ ShardingTestFixture::ShardingTestFixture() auto shardRegistry(std::make_unique<ShardRegistry>(std::move(shardFactory), configCS)); executorPool->startup(); - CatalogCacheLoader::set(service, std::make_unique<ConfigServerCatalogCacheLoader>()); + _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool(); + CatalogCacheLoader::set( + service, std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor())); // For now initialize the global grid object. All sharding objects will be accessible from there // until we get rid of it. 
auto const grid = Grid::get(operationContext()); - grid->init(makeShardingCatalogClient(std::move(uniqueDistLockManager)), - std::make_unique<CatalogCache>(CatalogCacheLoader::get(service)), - std::move(shardRegistry), - std::make_unique<ClusterCursorManager>(service->getPreciseClockSource()), - std::make_unique<BalancerConfiguration>(), - std::move(executorPool), - _mockNetwork); + grid->init( + makeShardingCatalogClient(std::move(uniqueDistLockManager)), + std::make_unique<CatalogCache>(CatalogCacheLoader::get(service), catalogCacheExecutor()), + std::move(shardRegistry), + std::make_unique<ClusterCursorManager>(service->getPreciseClockSource()), + std::make_unique<BalancerConfiguration>(), + std::move(executorPool), + _mockNetwork); if (grid->catalogClient()) { grid->catalogClient()->startup(); diff --git a/src/mongo/s/sharding_test_fixture_common.h b/src/mongo/s/sharding_test_fixture_common.h index 317e7f03b50..0f7d3730d6d 100644 --- a/src/mongo/s/sharding_test_fixture_common.h +++ b/src/mongo/s/sharding_test_fixture_common.h @@ -83,6 +83,10 @@ protected: return _distLockManager; } + std::shared_ptr<ThreadPool> catalogCacheExecutor() const { + invariant(_catalogCacheExecutor); + return _catalogCacheExecutor; + } /** * Blocking methods, which receive one message from the network and respond using the responses * returned from the input function. This is a syntactic sugar for simple, single request + @@ -148,6 +152,8 @@ protected: // store a raw pointer to it here. DistLockManager* _distLockManager = nullptr; + std::shared_ptr<ThreadPool> _catalogCacheExecutor = nullptr; + private: // Keeps the lifetime of the operation context ServiceContext::UniqueOperationContext _opCtxHolder; |