summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/catalog_cache_loader_mock.cpp124
-rw-r--r--src/mongo/db/s/catalog_cache_loader_mock.h16
-rw-r--r--src/mongo/db/s/config/config_server_test_fixture.cpp7
-rw-r--r--src/mongo/db/s/migration_util_test.cpp4
-rw-r--r--src/mongo/db/s/read_only_catalog_cache_loader.cpp12
-rw-r--r--src/mongo/db/s/read_only_catalog_cache_loader.h13
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.cpp389
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader.h57
-rw-r--r--src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp185
-rw-r--r--src/mongo/db/s/shard_server_test_fixture.cpp9
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod.cpp19
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod_test.cpp9
-rw-r--r--src/mongo/db/s/sharding_mongod_test_fixture.cpp3
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/catalog_cache.cpp182
-rw-r--r--src/mongo/s/catalog_cache.h15
-rw-r--r--src/mongo/s/catalog_cache_loader.h29
-rw-r--r--src/mongo/s/config_server_catalog_cache_loader.cpp108
-rw-r--r--src/mongo/s/config_server_catalog_cache_loader.h23
-rw-r--r--src/mongo/s/mongos_main.cpp6
-rw-r--r--src/mongo/s/sharding_router_test_fixture.cpp19
-rw-r--r--src/mongo/s/sharding_test_fixture_common.h6
22 files changed, 462 insertions, 774 deletions
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.cpp b/src/mongo/db/s/catalog_cache_loader_mock.cpp
index 9bd095b9f6b..ed80a5df4ac 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.cpp
+++ b/src/mongo/db/s/catalog_cache_loader_mock.cpp
@@ -40,35 +40,8 @@ namespace mongo {
using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunks;
-namespace {
-
-/**
- * Constructs the options for the loader thread pool.
- */
-ThreadPool::Options makeDefaultThreadPoolOptions() {
- ThreadPool::Options options;
- options.poolName = "CatalogCacheLoaderMock";
- options.minThreads = 0;
- options.maxThreads = 1;
-
- // Ensure all threads have a client.
- options.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- };
-
- return options;
-}
-
-} // namespace
-
-CatalogCacheLoaderMock::CatalogCacheLoaderMock() : _threadPool(makeDefaultThreadPoolOptions()) {
- _threadPool.startup();
-}
-
-CatalogCacheLoaderMock::~CatalogCacheLoaderMock() {
- _threadPool.shutdown();
- _threadPool.join();
-}
+CatalogCacheLoaderMock::CatalogCacheLoaderMock(std::shared_ptr<ThreadPool> executor)
+ : _executor(executor) {}
void CatalogCacheLoaderMock::initializeReplicaSetRole(bool isPrimary) {
MONGO_UNREACHABLE;
@@ -97,68 +70,41 @@ void CatalogCacheLoaderMock::waitForDatabaseFlush(OperationContext* opCtx, Strin
MONGO_UNREACHABLE;
}
-std::shared_ptr<Notification<void>> CatalogCacheLoaderMock::getChunksSince(
- const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
- auto notify = std::make_shared<Notification<void>>();
-
- _threadPool.schedule([ this, notify, callbackFn ](auto status) noexcept {
- invariant(status);
-
- auto opCtx = Client::getCurrent()->makeOperationContext();
-
- auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> {
- try {
- uassertStatusOK(_swCollectionReturnValue);
- uassertStatusOK(_swChunksReturnValue);
-
- // We swap the chunks out of _swChunksReturnValue to ensure if this task is
- // scheduled multiple times that we don't inform the ChunkManager about a chunk it
- // has already updated.
- std::vector<ChunkType> chunks;
- _swChunksReturnValue.getValue().swap(chunks);
-
- return CollectionAndChangedChunks(
- _swCollectionReturnValue.getValue().getUUID(),
- _swCollectionReturnValue.getValue().getEpoch(),
- _swCollectionReturnValue.getValue().getKeyPattern().toBSON(),
- _swCollectionReturnValue.getValue().getDefaultCollation(),
- _swCollectionReturnValue.getValue().getUnique(),
- std::move(chunks));
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
- }();
-
- callbackFn(opCtx.get(), std::move(swCollAndChunks));
- notify->set();
- });
-
- return notify;
+SemiFuture<CollectionAndChangedChunks> CatalogCacheLoaderMock::getChunksSince(
+ const NamespaceString& nss, ChunkVersion version) {
+
+ return ExecutorFuture<void>(_executor)
+ .then([this] {
+ uassertStatusOK(_swCollectionReturnValue);
+ uassertStatusOK(_swChunksReturnValue);
+
+ // We swap the chunks out of _swChunksReturnValue to ensure if this task is
+ // scheduled multiple times that we don't inform the ChunkManager about a chunk it
+ // has already updated.
+ std::vector<ChunkType> chunks;
+ _swChunksReturnValue.getValue().swap(chunks);
+
+ return CollectionAndChangedChunks(
+ _swCollectionReturnValue.getValue().getUUID(),
+ _swCollectionReturnValue.getValue().getEpoch(),
+ _swCollectionReturnValue.getValue().getKeyPattern().toBSON(),
+ _swCollectionReturnValue.getValue().getDefaultCollation(),
+ _swCollectionReturnValue.getValue().getUnique(),
+ std::move(chunks));
+ })
+ .semi();
}
-void CatalogCacheLoaderMock::getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- _threadPool.schedule([ this, callbackFn ](auto status) noexcept {
- invariant(status);
-
- auto opCtx = Client::getCurrent()->makeOperationContext();
-
- auto swDatabase = [&]() -> StatusWith<DatabaseType> {
- try {
- uassertStatusOK(_swDatabaseReturnValue);
-
- return DatabaseType(_swDatabaseReturnValue.getValue().getName(),
- _swDatabaseReturnValue.getValue().getPrimary(),
- _swDatabaseReturnValue.getValue().getSharded(),
- _swDatabaseReturnValue.getValue().getVersion());
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
- }();
-
- callbackFn(opCtx.get(), std::move(swDatabase));
- });
+SemiFuture<DatabaseType> CatalogCacheLoaderMock::getDatabase(StringData dbName) {
+ return ExecutorFuture<void>(_executor)
+ .then([this] {
+ uassertStatusOK(_swDatabaseReturnValue);
+ return DatabaseType(_swDatabaseReturnValue.getValue().getName(),
+ _swDatabaseReturnValue.getValue().getPrimary(),
+ _swDatabaseReturnValue.getValue().getSharded(),
+ _swDatabaseReturnValue.getValue().getVersion());
+ })
+ .semi();
}
void CatalogCacheLoaderMock::setCollectionRefreshReturnValue(
diff --git a/src/mongo/db/s/catalog_cache_loader_mock.h b/src/mongo/db/s/catalog_cache_loader_mock.h
index ce566a02fd7..dfc43a34867 100644
--- a/src/mongo/db/s/catalog_cache_loader_mock.h
+++ b/src/mongo/db/s/catalog_cache_loader_mock.h
@@ -43,8 +43,8 @@ class CatalogCacheLoaderMock final : public CatalogCacheLoader {
CatalogCacheLoaderMock& operator=(const CatalogCacheLoaderMock&) = delete;
public:
- CatalogCacheLoaderMock();
- ~CatalogCacheLoaderMock();
+ CatalogCacheLoaderMock(std::shared_ptr<ThreadPool> executor);
+ ~CatalogCacheLoaderMock() = default;
/**
* These functions should never be called. They trigger invariants if called.
@@ -57,14 +57,10 @@ public:
void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
void waitForDatabaseFlush(OperationContext* opCtx, StringData dbName) override;
- std::shared_ptr<Notification<void>> getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- GetChunksSinceCallbackFn callbackFn) override;
+ SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss,
+ ChunkVersion version) override;
- void getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) override;
+ SemiFuture<DatabaseType> getDatabase(StringData dbName) override;
/**
* Sets the mocked collection entry result that getChunksSince will use to construct its return
@@ -96,7 +92,7 @@ private:
Status(ErrorCodes::InternalError, "config loader mock chunks response is uninitialized")};
// Thread pool on which to mock load chunk metadata.
- ThreadPool _threadPool;
+ std::shared_ptr<ThreadPool> _executor;
};
} // namespace mongo
diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp
index de55abb2be4..1470e79bbe2 100644
--- a/src/mongo/db/s/config/config_server_test_fixture.cpp
+++ b/src/mongo/db/s/config/config_server_test_fixture.cpp
@@ -142,9 +142,10 @@ void ConfigServerTestFixture::_setUp(std::function<void()> onPreInitGlobalStateF
_addShardNetworkTestEnv =
std::make_unique<NetworkTestEnv>(_executorForAddShard, _mockNetworkForAddShard);
-
- CatalogCacheLoader::set(getServiceContext(),
- std::make_unique<ConfigServerCatalogCacheLoader>());
+ _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool();
+ CatalogCacheLoader::set(
+ getServiceContext(),
+ std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor()));
onPreInitGlobalStateFn();
diff --git a/src/mongo/db/s/migration_util_test.cpp b/src/mongo/db/s/migration_util_test.cpp
index 8e8e6e3f52c..82ae3514a64 100644
--- a/src/mongo/db/s/migration_util_test.cpp
+++ b/src/mongo/db/s/migration_util_test.cpp
@@ -346,8 +346,8 @@ public:
_clusterId = OID::gen();
ShardingState::get(getServiceContext())->setInitialized(_myShardName, _clusterId);
- std::unique_ptr<CatalogCacheLoaderMock> mockLoader =
- std::make_unique<CatalogCacheLoaderMock>();
+ _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool();
+ auto mockLoader = std::make_unique<CatalogCacheLoaderMock>(catalogCacheExecutor());
_mockCatalogCacheLoader = mockLoader.get();
CatalogCacheLoader::set(getServiceContext(), std::move(mockLoader));
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.cpp b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
index b381c7e596d..7744a517048 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.cpp
@@ -52,15 +52,13 @@ void ReadOnlyCatalogCacheLoader::shutDown() {
_configServerLoader.shutDown();
}
-std::shared_ptr<Notification<void>> ReadOnlyCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
- return _configServerLoader.getChunksSince(nss, version, callbackFn);
+SemiFuture<CollectionAndChangedChunks> ReadOnlyCatalogCacheLoader::getChunksSince(
+ const NamespaceString& nss, ChunkVersion version) {
+ return _configServerLoader.getChunksSince(nss, version);
}
-void ReadOnlyCatalogCacheLoader::getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- return _configServerLoader.getDatabase(dbName, callbackFn);
+SemiFuture<DatabaseType> ReadOnlyCatalogCacheLoader::getDatabase(StringData dbName) {
+ return _configServerLoader.getDatabase(dbName);
}
} // namespace mongo
diff --git a/src/mongo/db/s/read_only_catalog_cache_loader.h b/src/mongo/db/s/read_only_catalog_cache_loader.h
index 83df1fd5114..e5f95f75faa 100644
--- a/src/mongo/db/s/read_only_catalog_cache_loader.h
+++ b/src/mongo/db/s/read_only_catalog_cache_loader.h
@@ -40,6 +40,8 @@ namespace mongo {
*/
class ReadOnlyCatalogCacheLoader final : public CatalogCacheLoader {
public:
+ ReadOnlyCatalogCacheLoader(std::shared_ptr<ThreadPool> executor)
+ : _configServerLoader(executor){};
~ReadOnlyCatalogCacheLoader();
void initializeReplicaSetRole(bool isPrimary) override {}
@@ -50,14 +52,9 @@ public:
void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
void waitForDatabaseFlush(OperationContext* opCtx, StringData dbName) override;
- std::shared_ptr<Notification<void>> getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- GetChunksSinceCallbackFn callbackFn) override;
-
- void getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) override;
+ SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss,
+ ChunkVersion version) override;
+ SemiFuture<DatabaseType> getDatabase(StringData dbName) override;
private:
ConfigServerCatalogCacheLoader _configServerLoader;
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
index 6b880b788cc..5575361c770 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.cpp
@@ -66,23 +66,6 @@ MONGO_FAIL_POINT_DEFINE(hangPersistCollectionAndChangedChunksAfterDropChunks);
AtomicWord<unsigned long long> taskIdGenerator{0};
-/**
- * Constructs the options for the loader thread pool.
- */
-ThreadPool::Options makeDefaultThreadPoolOptions() {
- ThreadPool::Options options;
- options.poolName = "ShardServerCatalogCacheLoader";
- options.minThreads = 0;
- options.maxThreads = 6;
-
- // Ensure all threads have a client.
- options.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- };
-
- return options;
-}
-
void dropChunksIfEpochChanged(OperationContext* opCtx,
const NamespaceString& nss,
const CollectionAndChangedChunks& collAndChunks,
@@ -369,11 +352,8 @@ ChunkVersion getLocalVersion(OperationContext* opCtx, const NamespaceString& nss
} // namespace
ShardServerCatalogCacheLoader::ShardServerCatalogCacheLoader(
- std::unique_ptr<CatalogCacheLoader> configServerLoader)
- : _configServerLoader(std::move(configServerLoader)),
- _threadPool(makeDefaultThreadPoolOptions()) {
- _threadPool.startup();
-}
+ std::unique_ptr<CatalogCacheLoader> configServerLoader, std::shared_ptr<ThreadPool> executor)
+ : _configServerLoader(std::move(configServerLoader)), _executor(executor) {}
ShardServerCatalogCacheLoader::~ShardServerCatalogCacheLoader() {
shutDown();
@@ -420,22 +400,21 @@ void ShardServerCatalogCacheLoader::shutDown() {
}
// Prevent further scheduling, then interrupt ongoing tasks.
- _threadPool.shutdown();
+ _executor->shutdown();
{
stdx::lock_guard<Latch> lock(_mutex);
_contexts.interrupt(ErrorCodes::InterruptedAtShutdown);
++_term;
}
- _threadPool.join();
+ _executor->join();
invariant(_contexts.isEmpty());
_configServerLoader->shutDown();
}
-std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
- auto notify = std::make_shared<Notification<void>>();
+SemiFuture<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::getChunksSince(
+ const NamespaceString& nss, ChunkVersion version) {
bool isPrimary;
long long term;
@@ -444,57 +423,46 @@ std::shared_ptr<Notification<void>> ShardServerCatalogCacheLoader::getChunksSinc
return std::make_tuple(_role == ReplicaSetRole::Primary, _term);
}();
- _threadPool.schedule(
- [ this, nss, version, callbackFn, notify, isPrimary, term ](auto status) noexcept {
- invariant(status);
-
- auto context = _contexts.makeOperationContext(*Client::getCurrent());
- auto const opCtx = context.opCtx();
-
- try {
- {
- // We may have missed an OperationContextGroup interrupt since this operation
- // began but before the OperationContext was added to the group. So we'll check
- // that we're still in the same _term.
- stdx::lock_guard<Latch> lock(_mutex);
- uassert(ErrorCodes::InterruptedDueToReplStateChange,
- "Unable to refresh routing table because replica set state changed or "
- "the node is shutting down.",
- _term == term);
- }
-
- if (isPrimary) {
- _schedulePrimaryGetChunksSince(opCtx, nss, version, term, callbackFn, notify);
- } else {
- _runSecondaryGetChunksSince(opCtx, nss, version, callbackFn, notify);
- }
- } catch (const DBException& ex) {
- callbackFn(opCtx, ex.toStatus());
- notify->set();
+ return ExecutorFuture<void>(_executor)
+ .then([=]() {
+ ThreadClient tc("ShardServerCatalogCacheLoader::getChunksSince",
+ getGlobalServiceContext());
+ auto context = _contexts.makeOperationContext(*tc);
+ {
+ // We may have missed an OperationContextGroup interrupt since this operation
+ // began but before the OperationContext was added to the group. So we'll check
+ // that we're still in the same _term.
+ stdx::lock_guard<Latch> lock(_mutex);
+ uassert(ErrorCodes::InterruptedDueToReplStateChange,
+ "Unable to refresh routing table because replica set state changed or "
+ "the node is shutting down.",
+ _term == term);
}
- });
- return notify;
+ if (isPrimary) {
+ return _schedulePrimaryGetChunksSince(context.opCtx(), nss, version, term);
+ } else {
+ return _runSecondaryGetChunksSince(context.opCtx(), nss, version);
+ }
+ })
+ .semi();
}
-void ShardServerCatalogCacheLoader::getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- bool isPrimary;
- long long term;
- std::tie(isPrimary, term) = [&] {
+SemiFuture<DatabaseType> ShardServerCatalogCacheLoader::getDatabase(StringData dbName) {
+ const auto [isPrimary, term] = [&] {
stdx::lock_guard<Latch> lock(_mutex);
return std::make_tuple(_role == ReplicaSetRole::Primary, _term);
}();
- _threadPool.schedule([ this, name = dbName.toString(), callbackFn, isPrimary,
- term ](auto status) noexcept {
- invariant(status);
-
- auto context = _contexts.makeOperationContext(*Client::getCurrent());
- auto const opCtx = context.opCtx();
+ return ExecutorFuture<void>(_executor)
+ .then([this,
+ dbName = std::move(dbName),
+ isPrimary = std::move(isPrimary),
+ term = std::move(term)]() {
+ ThreadClient tc("ShardServerCatalogCacheLoader::getDatabase",
+ getGlobalServiceContext());
+ auto context = _contexts.makeOperationContext(*tc);
- try {
{
// We may have missed an OperationContextGroup interrupt since this operation began
// but before the OperationContext was added to the group. So we'll check that we're
@@ -507,14 +475,12 @@ void ShardServerCatalogCacheLoader::getDatabase(
}
if (isPrimary) {
- _schedulePrimaryGetDatabase(opCtx, name, term, callbackFn);
+ return _schedulePrimaryGetDatabase(context.opCtx(), dbName, term);
} else {
- _runSecondaryGetDatabase(opCtx, name, callbackFn);
+ return _runSecondaryGetDatabase(context.opCtx(), dbName);
}
- } catch (const DBException& ex) {
- callbackFn(context.opCtx(), ex.toStatus());
- }
- });
+ })
+ .semi();
}
void ShardServerCatalogCacheLoader::waitForCollectionFlush(OperationContext* opCtx,
@@ -620,12 +586,11 @@ void ShardServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCtx
}
}
-void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
+StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
- const ChunkVersion& catalogCacheSinceVersion,
- std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
- std::shared_ptr<Notification<void>> notify) {
+ const ChunkVersion& catalogCacheSinceVersion) {
+
forcePrimaryCollectionRefreshAndWaitForReplication(opCtx, nss);
// Read the local metadata.
@@ -635,19 +600,18 @@ void ShardServerCatalogCacheLoader::_runSecondaryGetChunksSince(
// CollectionVersionLogOpHandler.
BlockSecondaryReadsDuringBatchApplication_DONT_USE secondaryReadsBlockBehindReplication(opCtx);
- auto swCollAndChunks =
- _getCompletePersistedMetadataForSecondarySinceVersion(opCtx, nss, catalogCacheSinceVersion);
- callbackFn(opCtx, std::move(swCollAndChunks));
- notify->set();
+ return _getCompletePersistedMetadataForSecondarySinceVersion(
+ opCtx, nss, catalogCacheSinceVersion);
}
-void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
+StatusWith<CollectionAndChangedChunks>
+ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
- long long termScheduled,
- std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
- std::shared_ptr<Notification<void>> notify) {
+ long long termScheduled) {
+
+    // TODO (SERVER-49755): break up this function into smaller private or anonymous functions.
// Get the max version the loader has.
const ChunkVersion maxLoaderVersion = [&] {
@@ -666,159 +630,131 @@ void ShardServerCatalogCacheLoader::_schedulePrimaryGetChunksSince(
return getPersistedMaxChunkVersion(opCtx, nss);
}();
- auto remoteRefreshFn = [this, nss, catalogCacheSinceVersion, maxLoaderVersion, termScheduled](
- OperationContext* opCtx,
- StatusWith<CollectionAndChangedChunks> swCollectionAndChangedChunks)
- -> StatusWith<CollectionAndChangedChunks> {
- if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
- _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
- opCtx,
- nss,
- collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
-
- LOGV2_FOR_CATALOG_REFRESH(
- 24107,
- 1,
- "Cache loader remotely refreshed for collection {namespace} from version "
- "{oldCollectionVersion} and no metadata was found",
- "Cache loader remotely refreshed for collection and no metadata was found",
- "namespace"_attr = nss,
- "oldCollectionVersion"_attr = maxLoaderVersion);
- return swCollectionAndChangedChunks;
- }
-
- if (!swCollectionAndChangedChunks.isOK()) {
- return swCollectionAndChangedChunks;
- }
+ // Refresh the loader's metadata from the config server. The caller's request will
+ // then be serviced from the loader's up-to-date metadata.
+ auto swCollectionAndChangedChunks =
+ _configServerLoader->getChunksSince(nss, maxLoaderVersion).getNoThrow();
- auto& collAndChunks = swCollectionAndChangedChunks.getValue();
-
- if (collAndChunks.changedChunks.back().getVersion().epoch() != collAndChunks.epoch) {
- return Status{ErrorCodes::ConflictingOperationInProgress,
- str::stream()
- << "Invalid chunks found when reloading '" << nss.toString()
- << "' Previous collection epoch was '"
- << collAndChunks.epoch.toString() << "', but found a new epoch '"
- << collAndChunks.changedChunks.back().getVersion().epoch().toString()
- << "'."};
- }
-
- if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
- (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) {
- _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
- opCtx,
- nss,
- collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
- }
+ if (swCollectionAndChangedChunks == ErrorCodes::NamespaceNotFound) {
+ _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ opCtx,
+ nss,
+ collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
LOGV2_FOR_CATALOG_REFRESH(
- 24108,
+ 24107,
1,
- "Cache loader remotely refreshed for collection {namespace} from collection version "
- "{oldCollectionVersion} and found collection version {refreshedCollectionVersion}",
- "Cache loader remotely refreshed for collection",
+ "Cache loader remotely refreshed for collection {namespace} from version "
+ "{oldCollectionVersion} and no metadata was found",
+ "Cache loader remotely refreshed for collection and no metadata was found",
"namespace"_attr = nss,
- "oldCollectionVersion"_attr = maxLoaderVersion,
- "refreshedCollectionVersion"_attr = collAndChunks.changedChunks.back().getVersion());
-
- // Metadata was found remotely
- // -- otherwise we would have received NamespaceNotFound rather than Status::OK().
- // Return metadata for CatalogCache that's GTE catalogCacheSinceVersion,
- // from the loader's persisted and enqueued metadata.
-
- swCollectionAndChangedChunks =
- _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled);
- if (!swCollectionAndChangedChunks.isOK()) {
- return swCollectionAndChangedChunks;
- }
+ "oldCollectionVersion"_attr = maxLoaderVersion);
+ return swCollectionAndChangedChunks;
+ }
- const auto termAfterRefresh = [&] {
- stdx::lock_guard<Latch> lock(_mutex);
- return _term;
- }();
+ if (!swCollectionAndChangedChunks.isOK()) {
+ return swCollectionAndChangedChunks;
+ }
- if (termAfterRefresh != termScheduled) {
- // Raising a ConflictingOperationInProgress error here will cause the CatalogCache to
- // attempt the refresh as secondary instead of failing the operation
- return Status(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Replication stepdown occurred during refresh for '"
- << nss.toString());
- }
+ auto& collAndChunks = swCollectionAndChangedChunks.getValue();
- // After finding metadata remotely, we must have found metadata locally.
- invariant(!collAndChunks.changedChunks.empty());
+ if (collAndChunks.changedChunks.back().getVersion().epoch() != collAndChunks.epoch) {
+ return Status{ErrorCodes::ConflictingOperationInProgress,
+ str::stream()
+ << "Invalid chunks found when reloading '" << nss.toString()
+ << "' Previous collection epoch was '" << collAndChunks.epoch.toString()
+ << "', but found a new epoch '"
+ << collAndChunks.changedChunks.back().getVersion().epoch().toString()
+ << "'."};
+ }
+
+ if ((collAndChunks.epoch != maxLoaderVersion.epoch()) ||
+ (collAndChunks.changedChunks.back().getVersion() > maxLoaderVersion)) {
+ _ensureMajorityPrimaryAndScheduleCollAndChunksTask(
+ opCtx,
+ nss,
+ collAndChunkTask{swCollectionAndChangedChunks, maxLoaderVersion, termScheduled});
+ }
+ LOGV2_FOR_CATALOG_REFRESH(
+ 24108,
+ 1,
+ "Cache loader remotely refreshed for collection {namespace} from collection version "
+ "{oldCollectionVersion} and found collection version {refreshedCollectionVersion}",
+ "Cache loader remotely refreshed for collection",
+ "namespace"_attr = nss,
+ "oldCollectionVersion"_attr = maxLoaderVersion,
+ "refreshedCollectionVersion"_attr = collAndChunks.changedChunks.back().getVersion());
+
+ // Metadata was found remotely
+ // -- otherwise we would have received NamespaceNotFound rather than Status::OK().
+ // Return metadata for CatalogCache that's GTE catalogCacheSinceVersion,
+ // from the loader's persisted and enqueued metadata.
+
+ swCollectionAndChangedChunks =
+ _getLoaderMetadata(opCtx, nss, catalogCacheSinceVersion, termScheduled);
+ if (!swCollectionAndChangedChunks.isOK()) {
return swCollectionAndChangedChunks;
- };
+ }
- // Refresh the loader's metadata from the config server. The caller's request will
- // then be serviced from the loader's up-to-date metadata.
- _configServerLoader->getChunksSince(
- nss,
- maxLoaderVersion,
- [this, remoteRefreshFn, callbackFn, notify](auto opCtx, auto swCollectionAndChangedChunks) {
- // Complete the callbackFn work.
- callbackFn(opCtx, remoteRefreshFn(opCtx, std::move(swCollectionAndChangedChunks)));
- notify->set();
- });
-}
+ const auto termAfterRefresh = [&] {
+ stdx::lock_guard<Latch> lock(_mutex);
+ return _term;
+ }();
-void ShardServerCatalogCacheLoader::_runSecondaryGetDatabase(
- OperationContext* opCtx,
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
+ if (termAfterRefresh != termScheduled) {
+ // Raising a ConflictingOperationInProgress error here will cause the CatalogCache
+ // to attempt the refresh as secondary instead of failing the operation
+ return Status(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Replication stepdown occurred during refresh for '"
+ << nss.toString());
+ }
- forcePrimaryDatabaseRefreshAndWaitForReplication(opCtx, dbName);
+ // After finding metadata remotely, we must have found metadata locally.
+ invariant(!collAndChunks.changedChunks.empty());
- // Read the local metadata.
- auto swDatabaseType = getPersistedDbMetadata(opCtx, dbName);
- callbackFn(opCtx, std::move(swDatabaseType));
-}
+ return swCollectionAndChangedChunks;
+};
-void ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase(
- OperationContext* opCtx,
- StringData dbName,
- long long termScheduled,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- auto remoteRefreshFn = [this, name = dbName.toString(), termScheduled](
- OperationContext* opCtx, StatusWith<DatabaseType> swDatabaseType) {
- if (swDatabaseType == ErrorCodes::NamespaceNotFound) {
- _ensureMajorityPrimaryAndScheduleDbTask(
- opCtx, name, DBTask{swDatabaseType, termScheduled});
-
- LOGV2_FOR_CATALOG_REFRESH(
- 24109,
- 1,
- "Cache loader remotely refreshed for database {db} and found the database has "
- "been dropped",
- "Cache loader remotely refreshed for database and found the database has been "
- "dropped",
- "db"_attr = name);
- return swDatabaseType;
- }
- if (!swDatabaseType.isOK()) {
- return swDatabaseType;
- }
+StatusWith<DatabaseType> ShardServerCatalogCacheLoader::_runSecondaryGetDatabase(
+ OperationContext* opCtx, StringData dbName) {
+ forcePrimaryDatabaseRefreshAndWaitForReplication(opCtx, dbName);
+ return getPersistedDbMetadata(opCtx, dbName);
+}
- _ensureMajorityPrimaryAndScheduleDbTask(opCtx, name, DBTask{swDatabaseType, termScheduled});
+StatusWith<DatabaseType> ShardServerCatalogCacheLoader::_schedulePrimaryGetDatabase(
+ OperationContext* opCtx, StringData dbName, long long termScheduled) {
+ auto swDatabaseType = _configServerLoader->getDatabase(dbName).getNoThrow();
+ if (swDatabaseType == ErrorCodes::NamespaceNotFound) {
+ _ensureMajorityPrimaryAndScheduleDbTask(
+ opCtx, dbName, DBTask{swDatabaseType, termScheduled});
- LOGV2_FOR_CATALOG_REFRESH(24110,
- 1,
- "Cache loader remotely refreshed for database {db} and found "
- "{refreshedDatabaseType}",
- "Cache loader remotely refreshed for database",
- "db"_attr = name,
- "refreshedDatabaseType"_attr =
- swDatabaseType.getValue().toBSON());
+ LOGV2_FOR_CATALOG_REFRESH(
+ 24109,
+ 1,
+ "Cache loader remotely refreshed for database {db} "
+ "and found the database has been dropped",
+ "Cache loader remotely refreshed for database and found the database has been dropped",
+ "db"_attr = dbName);
+ return swDatabaseType;
+ }
+ if (!swDatabaseType.isOK()) {
return swDatabaseType;
- };
+ }
+
+ _ensureMajorityPrimaryAndScheduleDbTask(opCtx, dbName, DBTask{swDatabaseType, termScheduled});
+
+ LOGV2_FOR_CATALOG_REFRESH(24110,
+ 1,
+ "Cache loader remotely refreshed for database {db} "
+ "and found {refreshedDatabaseType}",
+ "Cache loader remotely refreshed for database",
+ "db"_attr = dbName,
+ "refreshedDatabaseType"_attr = swDatabaseType.getValue().toBSON());
- _configServerLoader->getDatabase(
- dbName, [this, remoteRefreshFn, callbackFn](auto opCtx, auto swDatabaseType) {
- callbackFn(opCtx, remoteRefreshFn(opCtx, std::move(swDatabaseType)));
- });
+ return swDatabaseType;
}
StatusWith<CollectionAndChangedChunks> ShardServerCatalogCacheLoader::_getLoaderMetadata(
@@ -952,7 +888,7 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleCollAndChun
return;
}
- _threadPool.schedule([this, nss](auto status) {
+ _executor->schedule([this, nss](auto status) {
invariant(status);
_runCollAndChunksTasks(nss);
@@ -974,7 +910,7 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper
return;
}
- _threadPool.schedule([this, name = dbName.toString()](auto status) {
+ _executor->schedule([this, name = dbName.toString()](auto status) {
invariant(status);
_runDbTasks(name);
@@ -982,7 +918,9 @@ void ShardServerCatalogCacheLoader::_ensureMajorityPrimaryAndScheduleDbTask(Oper
}
void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString& nss) {
- auto context = _contexts.makeOperationContext(*Client::getCurrent());
+ ThreadClient tc("ShardServerCatalogCacheLoader::runCollAndChunksTasks",
+ getGlobalServiceContext());
+ auto context = _contexts.makeOperationContext(*tc);
bool taskFinished = false;
bool inShutdown = false;
@@ -1027,7 +965,7 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString
}
}
- _threadPool.schedule([this, nss](auto status) {
+ _executor->schedule([this, nss](auto status) {
if (ErrorCodes::isCancelationError(status.code())) {
LOGV2(22096,
"Cache loader failed to schedule a persisted metadata update task for namespace "
@@ -1052,7 +990,8 @@ void ShardServerCatalogCacheLoader::_runCollAndChunksTasks(const NamespaceString
}
void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) {
- auto context = _contexts.makeOperationContext(*Client::getCurrent());
+ ThreadClient tc("ShardServerCatalogCacheLoader::runDbTasks", getGlobalServiceContext());
+ auto context = _contexts.makeOperationContext(*tc);
bool taskFinished = false;
bool inShutdown = false;
@@ -1097,7 +1036,7 @@ void ShardServerCatalogCacheLoader::_runDbTasks(StringData dbName) {
}
}
- _threadPool.schedule([this, name = dbName.toString()](auto status) {
+ _executor->schedule([this, name = dbName.toString()](auto status) {
if (ErrorCodes::isCancelationError(status.code())) {
LOGV2(22099,
"Cache loader failed to schedule a persisted metadata update task for namespace "
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader.h b/src/mongo/db/s/shard_server_catalog_cache_loader.h
index 2adfe15d9b7..ee9b211b777 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader.h
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader.h
@@ -50,7 +50,8 @@ class ShardServerCatalogCacheLoader : public CatalogCacheLoader {
ShardServerCatalogCacheLoader& operator=(const ShardServerCatalogCacheLoader&) = delete;
public:
- ShardServerCatalogCacheLoader(std::unique_ptr<CatalogCacheLoader> configServerLoader);
+ ShardServerCatalogCacheLoader(std::unique_ptr<CatalogCacheLoader> configServerLoader,
+ std::shared_ptr<ThreadPool> executor);
~ShardServerCatalogCacheLoader();
/**
@@ -77,14 +78,10 @@ public:
*/
void notifyOfCollectionVersionUpdate(const NamespaceString& nss) override;
- std::shared_ptr<Notification<void>> getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- GetChunksSinceCallbackFn callbackFn) override;
+ SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss,
+ ChunkVersion version) override;
- void getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) override;
+ SemiFuture<DatabaseType> getDatabase(StringData dbName) override;
void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
@@ -337,61 +334,51 @@ private:
/**
* Forces the primary to refresh its metadata for 'nss' and waits until this node's metadata
* has caught up to the primary's.
- * Then retrieves chunk metadata from this node's persisted metadata store and passes it to
- * 'callbackFn'.
+ *
+ * Returns chunk metadata from this node's persisted metadata store.
*/
- void _runSecondaryGetChunksSince(
+ StatusWith<CollectionAndChangedChunks> _runSecondaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
- const ChunkVersion& catalogCacheSinceVersion,
- std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
- std::shared_ptr<Notification<void>> notify);
+ const ChunkVersion& catalogCacheSinceVersion);
/**
* Refreshes chunk metadata from the config server's metadata store, and schedules maintenance
* of the shard's persisted metadata store with the latest updates retrieved from the config
* server.
*
- * Then calls 'callbackFn' with metadata retrieved locally from the shard persisted metadata
- * store and any in-memory tasks with terms matching 'currentTerm' enqueued to update that
- * store, GTE to 'catalogCacheSinceVersion'.
+ * Returns the metadata retrieved locally from the shard persisted metadata
+ * store and any in-memory enqueued tasks to update that store that match the given term,
+ * greater than or equal to the given chunk version.
*
* Only run on the shard primary.
*/
- void _schedulePrimaryGetChunksSince(
+ StatusWith<CollectionAndChangedChunks> _schedulePrimaryGetChunksSince(
OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& catalogCacheSinceVersion,
- long long currentTerm,
- std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)> callbackFn,
- std::shared_ptr<Notification<void>> notify);
+ long long termScheduled);
/**
* Forces the primary to refresh its metadata for 'dbName' and waits until this node's metadata
* has caught up to the primary's.
- * Then retrieves the db version from this node's persisted metadata store and passes it to
- * 'callbackFn'.
+ * Returns the database version from this node's persisted metadata store.
*/
- void _runSecondaryGetDatabase(
- OperationContext* opCtx,
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn);
+ StatusWith<DatabaseType> _runSecondaryGetDatabase(OperationContext* opCtx, StringData dbName);
/**
* Refreshes db version from the config server's metadata store, and schedules maintenance
* of the shard's persisted metadata store with the latest updates retrieved from the config
* server.
*
- * Then calls 'callbackFn' with metadata retrieved locally from the shard persisted metadata
- * to update that store.
+ * Returns the metadata retrieved locally from the shard persisted metadata to update that
+ * store.
*
* Only run on the shard primary.
*/
- void _schedulePrimaryGetDatabase(
- OperationContext* opCtx,
- StringData dbName,
- long long termScheduled,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn);
+ StatusWith<DatabaseType> _schedulePrimaryGetDatabase(OperationContext* opCtx,
+ StringData dbName,
+ long long termScheduled);
/**
* Loads chunk metadata from the shard persisted metadata store and any in-memory tasks with
@@ -478,7 +465,7 @@ private:
std::unique_ptr<CatalogCacheLoader> _configServerLoader;
// Thread pool used to run blocking tasks which perform disk reads and writes
- ThreadPool _threadPool;
+ std::shared_ptr<ThreadPool> _executor;
// Registry of notifications for changes happening to the shard's on-disk routing information
NamespaceMetadataChangeNotifications _namespaceNotifications;
diff --git a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
index 5cfc7ce4124..445f9320b3d 100644
--- a/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
+++ b/src/mongo/db/s/shard_server_catalog_cache_loader_test.cpp
@@ -78,10 +78,6 @@ public:
vector<ChunkType> setUpChunkLoaderWithFiveChunks();
const KeyPattern kKeyPattern = KeyPattern(BSON(kPattern << 1));
- const std::function<void(OperationContext*, StatusWith<CollectionAndChangedChunks>)>
- kDoNothingCallbackFn = [](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {};
CatalogCacheLoaderMock* _remoteLoaderMock;
std::unique_ptr<ShardServerCatalogCacheLoader> _shardLoader;
@@ -96,9 +92,11 @@ void ShardServerCatalogCacheLoaderTest::setUp() {
// Create mock remote and real shard loader, retaining a pointer to the mock remote loader so
// that unit tests can manipulate it to return certain responses.
- std::unique_ptr<CatalogCacheLoaderMock> mockLoader = std::make_unique<CatalogCacheLoaderMock>();
+ _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool();
+ auto mockLoader = std::make_unique<CatalogCacheLoaderMock>(catalogCacheExecutor());
_remoteLoaderMock = mockLoader.get();
- _shardLoader = std::make_unique<ShardServerCatalogCacheLoader>(std::move(mockLoader));
+ _shardLoader = std::make_unique<ShardServerCatalogCacheLoader>(std::move(mockLoader),
+ catalogCacheExecutor());
// Set the shard loader to primary mode, and set it for testing.
_shardLoader->initializeReplicaSetRole(true);
@@ -207,25 +205,12 @@ vector<ChunkType> ShardServerCatalogCacheLoaderTest::setUpChunkLoaderWithFiveChu
_remoteLoaderMock->setCollectionRefreshReturnValue(collectionType);
_remoteLoaderMock->setChunkRefreshReturnValue(chunks);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto refreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto notification =
- _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn);
- notification->get();
-
- // Check refreshCallbackFn thread results where we can safely throw.
- ASSERT_OK(results.getStatus());
- auto collAndChunkRes = results.getValue();
- ASSERT_EQUALS(collAndChunkRes.epoch, collectionType.getEpoch());
- ASSERT_EQUALS(collAndChunkRes.changedChunks.size(), 5UL);
- for (unsigned int i = 0; i < collAndChunkRes.changedChunks.size(); ++i) {
- ASSERT_BSONOBJ_EQ(collAndChunkRes.changedChunks[i].toShardBSON(), chunks[i].toShardBSON());
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get();
+
+ ASSERT_EQUALS(collAndChunksRes.epoch, collectionType.getEpoch());
+ ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
+ for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
+ ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks[i].toShardBSON(), chunks[i].toShardBSON());
}
return chunks;
@@ -237,19 +222,10 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromUnshardedToUnsharded) {
Status errorStatus = Status(ErrorCodes::NamespaceNotFound, "collection not found");
_remoteLoaderMock->setCollectionRefreshReturnValue(errorStatus);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto refreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto notification =
- _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn);
- notification->get();
-
- ASSERT_EQUALS(results.getStatus(), errorStatus);
+ ASSERT_THROWS_CODE_AND_WHAT(_shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get(),
+ DBException,
+ errorStatus.code(),
+ errorStatus.reason());
}
TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedToUnsharded) {
@@ -263,19 +239,11 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedToUnsharded) {
Status errorStatus = Status(ErrorCodes::NamespaceNotFound, "collection not found");
_remoteLoaderMock->setCollectionRefreshReturnValue(errorStatus);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto nextRefreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto notification =
- _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), nextRefreshCallbackFn);
- notification->get();
-
- ASSERT_EQUALS(results.getStatus(), errorStatus);
+ ASSERT_THROWS_CODE_AND_WHAT(
+ _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(),
+ DBException,
+ errorStatus.code(),
+ errorStatus.reason());
}
TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiff) {
@@ -290,22 +258,10 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiff) {
lastChunk.push_back(chunks.back());
_remoteLoaderMock->setChunkRefreshReturnValue(lastChunk);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto refreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto notification =
- _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn);
- notification->get();
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get();
// Check that refreshing from the latest version returned a single document matching that
// version.
- ASSERT_OK(results.getStatus());
- auto collAndChunksRes = results.getValue();
ASSERT_EQUALS(collAndChunksRes.epoch, chunks.back().getVersion().epoch());
ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 1UL);
ASSERT_BSONOBJ_EQ(collAndChunksRes.changedChunks.back().toShardBSON(),
@@ -326,21 +282,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNoDiffReq
lastChunk.push_back(chunks.back());
_remoteLoaderMock->setChunkRefreshReturnValue(lastChunk);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto completeRefreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto notification =
- _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), completeRefreshCallbackFn);
- notification->get();
-
- // Check that the complete routing table was returned successfully.
- ASSERT_OK(results.getStatus());
- auto collAndChunksRes = results.getValue();
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get();
ASSERT_EQUALS(collAndChunksRes.epoch, chunks.back().getVersion().epoch());
ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
@@ -361,21 +303,9 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiff) {
vector<ChunkType> updatedChunksDiff = makeThreeUpdatedChunksDiff(collVersion);
_remoteLoaderMock->setChunkRefreshReturnValue(updatedChunksDiff);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto refreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto notification =
- _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn);
- notification->get();
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get();
// Check that the diff was returned successfully.
- ASSERT_OK(results.getStatus());
- auto collAndChunksRes = results.getValue();
ASSERT_EQUALS(collAndChunksRes.epoch, updatedChunksDiff.front().getVersion().epoch());
ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 4UL);
for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
@@ -400,9 +330,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiffReque
vector<ChunkType> updatedChunksDiff = makeThreeUpdatedChunksDiff(chunks.back().getVersion());
_remoteLoaderMock->setChunkRefreshReturnValue(updatedChunksDiff);
- auto notification =
- _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), kDoNothingCallbackFn);
- notification->get();
+ _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get();
// Wait for persistence of update
_shardLoader->waitForCollectionFlush(operationContext(), kNss);
@@ -416,21 +344,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindDiffReque
vector<ChunkType> completeRoutingTableWithDiffApplied =
makeCombinedOriginalFiveChunksAndThreeNewChunksDiff(chunks, updatedChunksDiff);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto refreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto nextNotification =
- _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn);
- nextNotification->get();
-
- // Check that the complete routing table, with diff applied, was returned.
- ASSERT_OK(results.getStatus());
- auto collAndChunksRes = results.getValue();
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get();
ASSERT_EQUALS(collAndChunksRes.epoch,
completeRoutingTableWithDiffApplied.front().getVersion().epoch());
ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
@@ -457,21 +371,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindNewEpoch)
_remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithNewEpoch);
_remoteLoaderMock->setChunkRefreshReturnValue(chunksWithNewEpoch);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto refreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto notification =
- _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), refreshCallbackFn);
- notification->get();
-
- // Check that the complete routing table for the new epoch was returned.
- ASSERT_OK(results.getStatus());
- auto collAndChunksRes = results.getValue();
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get();
ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithNewEpoch.getEpoch());
ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
@@ -502,19 +402,9 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindMixedChun
mixedChunks.insert(mixedChunks.end(), chunksWithNewEpoch.begin(), chunksWithNewEpoch.end());
_remoteLoaderMock->setChunkRefreshReturnValue(mixedChunks);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> mixedResults{
- Status(ErrorCodes::InternalError, "")};
- const auto mixedRefreshCallbackFn = [&mixedResults](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- mixedResults = std::move(swCollAndChunks);
- };
-
- auto mixedNotification =
- _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), mixedRefreshCallbackFn);
- mixedNotification->get();
-
- ASSERT_EQUALS(mixedResults.getStatus().code(), ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(_shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get(),
+ DBException,
+ ErrorCodes::ConflictingOperationInProgress);
// Now make sure the newly recreated collection is cleanly loaded. We cannot ensure a
// non-variable response until the loader has remotely retrieved the new metadata and applied
@@ -524,9 +414,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindMixedChun
_remoteLoaderMock->setCollectionRefreshReturnValue(collectionTypeWithNewEpoch);
_remoteLoaderMock->setChunkRefreshReturnValue(chunksWithNewEpoch);
- auto cleanNotification =
- _shardLoader->getChunksSince(kNss, chunks.back().getVersion(), kDoNothingCallbackFn);
- cleanNotification->get();
+ _shardLoader->getChunksSince(kNss, chunks.back().getVersion()).get();
// Wait for persistence of update.
_shardLoader->waitForCollectionFlush(operationContext(), kNss);
@@ -535,20 +423,7 @@ TEST_F(ShardServerCatalogCacheLoaderTest, PrimaryLoadFromShardedAndFindMixedChun
lastChunkWithNewEpoch.push_back(chunksWithNewEpoch.back());
_remoteLoaderMock->setChunkRefreshReturnValue(lastChunkWithNewEpoch);
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> results{
- Status(ErrorCodes::InternalError, "")};
- const auto refreshCallbackFn = [&results](
- OperationContext * opCtx,
- StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
- results = std::move(swCollAndChunks);
- };
-
- auto notification =
- _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED(), refreshCallbackFn);
- notification->get();
-
- ASSERT_OK(results.getStatus());
- auto collAndChunksRes = results.getValue();
+ auto collAndChunksRes = _shardLoader->getChunksSince(kNss, ChunkVersion::UNSHARDED()).get();
ASSERT_EQUALS(collAndChunksRes.epoch, collectionTypeWithNewEpoch.getEpoch());
ASSERT_EQUALS(collAndChunksRes.changedChunks.size(), 5UL);
for (unsigned int i = 0; i < collAndChunksRes.changedChunks.size(); ++i) {
diff --git a/src/mongo/db/s/shard_server_test_fixture.cpp b/src/mongo/db/s/shard_server_test_fixture.cpp
index b916c674792..42c1935badc 100644
--- a/src/mongo/db/s/shard_server_test_fixture.cpp
+++ b/src/mongo/db/s/shard_server_test_fixture.cpp
@@ -65,9 +65,12 @@ void ShardServerTestFixture::setUp() {
_clusterId = OID::gen();
ShardingState::get(getServiceContext())->setInitialized(_myShardName, _clusterId);
- CatalogCacheLoader::set(getServiceContext(),
- std::make_unique<ShardServerCatalogCacheLoader>(
- std::make_unique<ConfigServerCatalogCacheLoader>()));
+ _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool();
+ CatalogCacheLoader::set(
+ getServiceContext(),
+ std::make_unique<ShardServerCatalogCacheLoader>(
+ std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor()),
+ catalogCacheExecutor()));
uassertStatusOK(
initializeGlobalShardingStateForMongodForTest(ConnectionString(kConfigHostAndPort)));
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index 0ab2d96fab7..f8a6c671e5c 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -535,17 +535,21 @@ void initializeGlobalShardingStateForMongoD(OperationContext* opCtx,
std::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
auto const service = opCtx->getServiceContext();
-
+ auto catalogCacheExecutor = CatalogCache::makeDefaultThreadPool();
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
if (storageGlobalParams.readOnly) {
- CatalogCacheLoader::set(service, std::make_unique<ReadOnlyCatalogCacheLoader>());
+ CatalogCacheLoader::set(
+ service, std::make_unique<ReadOnlyCatalogCacheLoader>(catalogCacheExecutor));
} else {
- CatalogCacheLoader::set(service,
- std::make_unique<ShardServerCatalogCacheLoader>(
- std::make_unique<ConfigServerCatalogCacheLoader>()));
+ CatalogCacheLoader::set(
+ service,
+ std::make_unique<ShardServerCatalogCacheLoader>(
+ std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor),
+ catalogCacheExecutor));
}
} else {
- CatalogCacheLoader::set(service, std::make_unique<ConfigServerCatalogCacheLoader>());
+ CatalogCacheLoader::set(
+ service, std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor));
}
auto validator = LogicalTimeValidator::get(service);
@@ -555,7 +559,8 @@ void initializeGlobalShardingStateForMongoD(OperationContext* opCtx,
globalConnPool.addHook(new ShardingConnectionHook(makeEgressHooksList(service)));
- auto catalogCache = std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx));
+ auto catalogCache =
+ std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx), catalogCacheExecutor);
// List of hooks which will be called by the ShardRegistry when it discovers a shard has been
// removed.
diff --git a/src/mongo/db/s/sharding_initialization_mongod_test.cpp b/src/mongo/db/s/sharding_initialization_mongod_test.cpp
index a724a245f84..30158e7c22a 100644
--- a/src/mongo/db/s/sharding_initialization_mongod_test.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod_test.cpp
@@ -70,9 +70,12 @@ protected:
// When sharding initialization is triggered, initialize sharding state as a shard server.
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
- CatalogCacheLoader::set(getServiceContext(),
- std::make_unique<ShardServerCatalogCacheLoader>(
- std::make_unique<ConfigServerCatalogCacheLoader>()));
+ _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool();
+ CatalogCacheLoader::set(
+ getServiceContext(),
+ std::make_unique<ShardServerCatalogCacheLoader>(
+ std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor()),
+ catalogCacheExecutor()));
ShardingInitializationMongoD::get(getServiceContext())
->setGlobalInitMethodForTest([&](OperationContext* opCtx,
diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
index 103ecc582d1..911c4c1da07 100644
--- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
@@ -256,7 +256,8 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest(
auto const grid = Grid::get(operationContext());
grid->init(makeShardingCatalogClient(std::move(distLockManagerPtr)),
- std::make_unique<CatalogCache>(CatalogCacheLoader::get(getServiceContext())),
+ std::make_unique<CatalogCache>(CatalogCacheLoader::get(getServiceContext()),
+ catalogCacheExecutor()),
makeShardRegistry(configConnStr),
makeClusterCursorManager(),
makeBalancerConfiguration(),
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index dd5239b776b..34760afb2df 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -249,6 +249,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/logical_time_metadata_hook',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
'$BUILD_DIR/mongo/executor/task_executor_pool',
'client/shard_interface',
'query/cluster_cursor_manager',
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 2eb6c3371eb..2fc6de7cb30 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -135,7 +135,19 @@ std::shared_ptr<RoutingTableHistory> refreshCollectionRoutingInfo(
} // namespace
-CatalogCache::CatalogCache(CatalogCacheLoader& cacheLoader) : _cacheLoader(cacheLoader) {}
+std::shared_ptr<ThreadPool> CatalogCache::makeDefaultThreadPool() {
+ ThreadPool::Options options;
+ options.poolName = "CatalogCache";
+ options.minThreads = 0;
+ options.maxThreads = 6;
+
+ auto executor = std::make_shared<ThreadPool>(std::move(options));
+ executor->startup();
+ return executor;
+}
+
+CatalogCache::CatalogCache(CatalogCacheLoader& cacheLoader, std::shared_ptr<ThreadPool> executor)
+ : _cacheLoader(cacheLoader), _executor(executor){};
CatalogCache::~CatalogCache() = default;
@@ -159,7 +171,7 @@ StatusWith<CachedDatabaseInfo> CatalogCache::getDatabase(OperationContext* opCtx
if (!refreshNotification) {
refreshNotification = (dbEntry->refreshCompletionNotification =
std::make_shared<Notification<Status>>());
- _scheduleDatabaseRefresh(ul, dbName.toString(), dbEntry);
+ _scheduleDatabaseRefresh(ul, dbName, dbEntry);
}
// Wait on the notification outside of the mutex.
@@ -236,7 +248,7 @@ CatalogCache::RefreshResult CatalogCache::_getCollectionRoutingInfoAt(
if (!refreshNotification) {
refreshNotification = (collEntry->refreshCompletionNotification =
std::make_shared<Notification<Status>>());
- _scheduleCollectionRefresh(ul, collEntry, nss, 1);
+ _scheduleCollectionRefresh(ul, opCtx->getServiceContext(), collEntry, nss, 1);
refreshActionTaken = RefreshAction::kPerformedRefresh;
}
@@ -584,76 +596,10 @@ void CatalogCache::checkAndRecordOperationBlockedByRefresh(OperationContext* opC
}
void CatalogCache::_scheduleDatabaseRefresh(WithLock lk,
- const std::string& dbName,
+ StringData dbName,
std::shared_ptr<DatabaseInfoEntry> dbEntry) {
- const auto onRefreshCompleted = [this, t = Timer(), dbName, dbEntry](
- const StatusWith<DatabaseType>& swDbt) {
- // TODO (SERVER-34164): Track and increment stats for database refreshes.
- if (!swDbt.isOK()) {
- LOGV2_OPTIONS(24100,
- {logv2::LogComponent::kShardingCatalogRefresh},
- "Error refreshing cached database entry for {db}. Took {duration} and "
- "failed due to {error}",
- "Error refreshing cached database entry",
- "db"_attr = dbName,
- "duration"_attr = Milliseconds(t.millis()),
- "error"_attr = redact(swDbt.getStatus()));
- return;
- }
-
- const auto dbVersionAfterRefresh = swDbt.getValue().getVersion();
- const int logLevel =
- (!dbEntry->dbt ||
- (dbEntry->dbt &&
- !databaseVersion::equal(dbVersionAfterRefresh, dbEntry->dbt->getVersion())))
- ? 0
- : 1;
- LOGV2_FOR_CATALOG_REFRESH(
- 24101,
- logLevel,
- "Refreshed cached database entry for {db} to version {newDbVersion} from version "
- "{oldDbVersion}. Took {duration}",
- "Refreshed cached database entry",
- "db"_attr = dbName,
- "newDbVersion"_attr = dbVersionAfterRefresh.toBSON(),
- "oldDbVersion"_attr = (dbEntry->dbt ? dbEntry->dbt->getVersion().toBSON() : BSONObj()),
- "duration"_attr = Milliseconds(t.millis()));
- };
-
- // Invoked if getDatabase resulted in error or threw and exception
- const auto onRefreshFailed =
- [ this, dbName, dbEntry, onRefreshCompleted ](WithLock, const Status& status) noexcept {
- onRefreshCompleted(status);
- // Clear the notification so the next 'getDatabase' kicks off a new refresh attempt.
- dbEntry->refreshCompletionNotification->set(status);
- dbEntry->refreshCompletionNotification = nullptr;
-
- if (status == ErrorCodes::NamespaceNotFound) {
- // The refresh found that the database was dropped, so remove its entry from the cache.
- _databases.erase(dbName);
- _collectionsByDb.erase(dbName);
- return;
- }
- };
-
- const auto refreshCallback = [ this, dbName, dbEntry, onRefreshFailed, onRefreshCompleted ](
- OperationContext * opCtx, StatusWith<DatabaseType> swDbt) noexcept {
- stdx::lock_guard<Latch> lg(_mutex);
-
- if (!swDbt.isOK()) {
- onRefreshFailed(lg, swDbt.getStatus());
- return;
- }
-
- onRefreshCompleted(swDbt);
-
- dbEntry->needsRefresh = false;
- dbEntry->refreshCompletionNotification->set(Status::OK());
- dbEntry->refreshCompletionNotification = nullptr;
-
- dbEntry->dbt = std::move(swDbt.getValue());
- };
+ // TODO (SERVER-34164): Track and increment stats for database refreshes
LOGV2_FOR_CATALOG_REFRESH(24102,
1,
@@ -664,16 +610,65 @@ void CatalogCache::_scheduleDatabaseRefresh(WithLock lk,
"currentDbInfo"_attr =
(dbEntry->dbt ? dbEntry->dbt->toBSON() : BSONObj()));
- try {
- _cacheLoader.getDatabase(dbName, refreshCallback);
- } catch (const DBException& ex) {
- const auto status = ex.toStatus();
+ Timer t{};
- onRefreshFailed(lk, status);
- }
+ _cacheLoader.getDatabase(dbName)
+ .thenRunOn(_executor)
+ .then([=](const DatabaseType& dbt) noexcept {
+ const auto dbVersionAfterRefresh = dbt.getVersion();
+ const auto dbVersionHasChanged =
+ (!dbEntry->dbt ||
+ (dbEntry->dbt &&
+ !databaseVersion::equal(dbVersionAfterRefresh, dbEntry->dbt->getVersion())));
+
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ LOGV2_FOR_CATALOG_REFRESH(
+ 24101,
+ dbVersionHasChanged ? 0 : 1,
+ "Refreshed cached database entry for {db} to version {newDbVersion} "
+ "from version {oldDbVersion}. Took {duration}",
+ "Refreshed cached database entry",
+ "db"_attr = dbName,
+ "newDbVersion"_attr = dbVersionAfterRefresh,
+ "oldDbVersion"_attr =
+ (dbEntry->dbt ? dbEntry->dbt->getVersion().toBSON() : BSONObj()),
+ "duration"_attr = Milliseconds(t.millis()));
+
+ dbEntry->needsRefresh = false;
+ dbEntry->refreshCompletionNotification->set(Status::OK());
+ dbEntry->refreshCompletionNotification = nullptr;
+
+ dbEntry->dbt = std::move(dbt);
+ })
+ .onError([=](Status errStatus) noexcept {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ LOGV2_OPTIONS(24100,
+ {logv2::LogComponent::kShardingCatalogRefresh},
+ "Error refreshing cached database entry for {db}. Took {duration} and "
+ "failed due to {error}",
+ "Error refreshing cached database entry",
+ "db"_attr = dbName,
+ "duration"_attr = Milliseconds(t.millis()),
+ "error"_attr = redact(errStatus));
+
+ // Clear the notification so the next 'getDatabase' kicks off a new refresh attempt.
+ dbEntry->refreshCompletionNotification->set(errStatus);
+ dbEntry->refreshCompletionNotification = nullptr;
+
+ if (errStatus == ErrorCodes::NamespaceNotFound) {
+ // The refresh found that the database was dropped, so remove its entry
+ // from the cache.
+ _databases.erase(dbName);
+ _collectionsByDb.erase(dbName);
+ }
+ })
+ .getAsync([](auto) {});
}
void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
+ ServiceContext* service,
std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
NamespaceString const& nss,
int refreshAttempt) {
@@ -744,15 +739,16 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
};
// Invoked if getChunksSince resulted in error or threw an exception
- const auto onRefreshFailed = [ this, collEntry, nss, refreshAttempt, onRefreshCompleted ](
- WithLock lk, const Status& status) noexcept {
+ const auto onRefreshFailed =
+ [ this, service, collEntry, nss, refreshAttempt,
+ onRefreshCompleted ](WithLock lk, const Status& status) noexcept {
onRefreshCompleted(status, nullptr);
// It is possible that the metadata is being changed concurrently, so retry the
// refresh again
if (status == ErrorCodes::ConflictingOperationInProgress &&
refreshAttempt < kMaxInconsistentRoutingInfoRefreshAttempts) {
- _scheduleCollectionRefresh(lk, collEntry, nss, refreshAttempt + 1);
+ _scheduleCollectionRefresh(lk, service, collEntry, nss, refreshAttempt + 1);
} else {
// Leave needsRefresh to true so that any subsequent get attempts will kick off
// another round of refresh
@@ -762,13 +758,16 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
};
const auto refreshCallback =
- [ this, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ](
- OperationContext * opCtx,
+ [ this, service, collEntry, nss, existingRoutingInfo, onRefreshFailed, onRefreshCompleted ](
StatusWith<CatalogCacheLoader::CollectionAndChangedChunks> swCollAndChunks) noexcept {
+
+ ThreadClient tc("CatalogCache::collectionRefresh", service);
+ auto opCtx = tc->makeOperationContext();
+
std::shared_ptr<RoutingTableHistory> newRoutingInfo;
try {
newRoutingInfo = refreshCollectionRoutingInfo(
- opCtx, nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
+ opCtx.get(), nss, std::move(existingRoutingInfo), std::move(swCollAndChunks));
onRefreshCompleted(Status::OK(), newRoutingInfo.get());
} catch (const DBException& ex) {
@@ -784,7 +783,7 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
collEntry->refreshCompletionNotification->set(Status::OK());
collEntry->refreshCompletionNotification = nullptr;
- setOperationShouldBlockBehindCatalogCacheRefresh(opCtx, false);
+ setOperationShouldBlockBehindCatalogCacheRefresh(opCtx.get(), false);
if (existingRoutingInfo && newRoutingInfo &&
existingRoutingInfo->getSequenceNumber() == newRoutingInfo->getSequenceNumber()) {
@@ -805,18 +804,9 @@ void CatalogCache::_scheduleCollectionRefresh(WithLock lk,
"namespace"_attr = nss,
"currentCollectionVersion"_attr = startingCollectionVersion);
- try {
- _cacheLoader.getChunksSince(nss, startingCollectionVersion, refreshCallback);
- } catch (const DBException& ex) {
- const auto status = ex.toStatus();
-
- // ConflictingOperationInProgress errors trigger retry of the catalog cache reload logic. If
- // we failed to schedule the asynchronous reload, there is no point in doing another
- // attempt.
- invariant(status != ErrorCodes::ConflictingOperationInProgress);
-
- onRefreshFailed(lk, status);
- }
+ _cacheLoader.getChunksSince(nss, startingCollectionVersion)
+ .thenRunOn(_executor)
+ .getAsync(refreshCallback);
// The routing info for this collection shouldn't change, as other threads may try to use the
// CatalogCache while we are waiting for the refresh to complete.
diff --git a/src/mongo/s/catalog_cache.h b/src/mongo/s/catalog_cache.h
index 0ce48cd7ffa..0d8b639c68d 100644
--- a/src/mongo/s/catalog_cache.h
+++ b/src/mongo/s/catalog_cache.h
@@ -40,6 +40,7 @@
#include "mongo/s/client/shard.h"
#include "mongo/s/database_version_gen.h"
#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/string_map.h"
@@ -128,7 +129,7 @@ class CatalogCache {
CatalogCache& operator=(const CatalogCache&) = delete;
public:
- CatalogCache(CatalogCacheLoader& cacheLoader);
+ CatalogCache(CatalogCacheLoader& cacheLoader, std::shared_ptr<ThreadPool> executor);
~CatalogCache();
/**
@@ -302,6 +303,13 @@ public:
*/
void checkAndRecordOperationBlockedByRefresh(OperationContext* opCtx, mongo::LogicalOp opType);
+ /**
+ * Returns a ThreadPool with default options to be used for CatalogCache.
+ *
+ * The returned ThreadPool is already started up.
+ */
+ static std::shared_ptr<ThreadPool> makeDefaultThreadPool();
+
private:
// Make the cache entries friends so they can access the private classes below
friend class CachedDatabaseInfo;
@@ -353,7 +361,7 @@ private:
* database entry must be in the 'needsRefresh' state.
*/
void _scheduleDatabaseRefresh(WithLock,
- const std::string& dbName,
+ StringData dbName,
std::shared_ptr<DatabaseInfoEntry> dbEntry);
/**
@@ -361,6 +369,7 @@ private:
* namespace must be in the 'needRefresh' state.
*/
void _scheduleCollectionRefresh(WithLock,
+ ServiceContext* service,
std::shared_ptr<CollectionRoutingInfoEntry> collEntry,
NamespaceString const& nss,
int refreshAttempt);
@@ -487,6 +496,8 @@ private:
} _stats;
+ std::shared_ptr<ThreadPool> _executor;
+
using DatabaseInfoMap = StringMap<std::shared_ptr<DatabaseInfoEntry>>;
using CollectionInfoMap = StringMap<std::shared_ptr<CollectionRoutingInfoEntry>>;
using CollectionsByDbMap = StringMap<CollectionInfoMap>;
diff --git a/src/mongo/s/catalog_cache_loader.h b/src/mongo/s/catalog_cache_loader.h
index 967c68f3627..e451664eda1 100644
--- a/src/mongo/s/catalog_cache_loader.h
+++ b/src/mongo/s/catalog_cache_loader.h
@@ -117,34 +117,23 @@ public:
virtual void notifyOfCollectionVersionUpdate(const NamespaceString& nss) = 0;
/**
- * Non-blocking call, which requests the chunks changed since the specified version to be
- * fetched from the persistent metadata store and invokes the callback function with the result.
- * The callback function must never throw - it is a fatal error to do so.
+ * Non-blocking call, which returns the chunks changed since the specified version to be
+ * fetched from the persistent metadata store.
*
* If for some reason the asynchronous fetch operation cannot be dispatched (for example on
- * shutdown), throws a DBException. Otherwise it is guaranteed that the callback function will
- * be invoked even on error and the returned notification will be signalled.
- *
- * The callbackFn object must not be destroyed until it has been called. The returned
- * Notification object can be waited on in order to ensure that.
+ * shutdown), throws a DBException.
*/
- virtual std::shared_ptr<Notification<void>> getChunksSince(
- const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) = 0;
+ virtual SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss,
+ ChunkVersion version) = 0;
/**
- * Non-blocking call, which requests the most recent db version for the given dbName from the
- * the persistent metadata store and invokes the callback function with the result.
- * The callback function must never throw - it is a fatal error to do so.
+ * Non-blocking call, which returns the most recent db version for the given dbName from the
+ * the persistent metadata store.
*
* If for some reason the asynchronous fetch operation cannot be dispatched (for example on
- * shutdown), throws a DBException. Otherwise it is guaranteed that the callback function will
- * be invoked even on error.
- *
- * The callbackFn object must not be destroyed until it has been called.
+ * shutdown), throws a DBException.
*/
- virtual void getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) = 0;
+ virtual SemiFuture<DatabaseType> getDatabase(StringData dbName) = 0;
/**
* Waits for any pending changes for the specified collection to be persisted locally (not
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 7332baf5610..c40f11e661c 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -49,23 +49,6 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
namespace {
/**
- * Constructs the default options for the thread pool used by the cache loader.
- */
-ThreadPool::Options makeDefaultThreadPoolOptions() {
- ThreadPool::Options options;
- options.poolName = "ConfigServerCatalogCacheLoader";
- options.minThreads = 0;
- options.maxThreads = 6;
-
- // Ensure all threads have a client
- options.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- };
-
- return options;
-}
-
-/**
* Structure repsenting the generated query and sort order for a chunk diffing operation.
*/
struct QueryAndSort {
@@ -136,14 +119,8 @@ CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx,
} // namespace
-ConfigServerCatalogCacheLoader::ConfigServerCatalogCacheLoader()
- : _threadPool(makeDefaultThreadPoolOptions()) {
- _threadPool.startup();
-}
-
-ConfigServerCatalogCacheLoader::~ConfigServerCatalogCacheLoader() {
- shutDown();
-}
+ConfigServerCatalogCacheLoader::ConfigServerCatalogCacheLoader(std::shared_ptr<ThreadPool> executor)
+ : _executor(executor) {}
void ConfigServerCatalogCacheLoader::initializeReplicaSetRole(bool isPrimary) {
MONGO_UNREACHABLE;
@@ -157,19 +134,7 @@ void ConfigServerCatalogCacheLoader::onStepUp() {
MONGO_UNREACHABLE;
}
-void ConfigServerCatalogCacheLoader::shutDown() {
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_inShutdown) {
- return;
- }
-
- _inShutdown = true;
- }
-
- _threadPool.shutdown();
- _threadPool.join();
-}
+void ConfigServerCatalogCacheLoader::shutDown() {}
void ConfigServerCatalogCacheLoader::notifyOfCollectionVersionUpdate(const NamespaceString& nss) {
MONGO_UNREACHABLE;
@@ -185,53 +150,34 @@ void ConfigServerCatalogCacheLoader::waitForDatabaseFlush(OperationContext* opCt
MONGO_UNREACHABLE;
}
-std::shared_ptr<Notification<void>> ConfigServerCatalogCacheLoader::getChunksSince(
- const NamespaceString& nss, ChunkVersion version, GetChunksSinceCallbackFn callbackFn) {
- auto notify = std::make_shared<Notification<void>>();
-
- _threadPool.schedule([ nss, version, notify, callbackFn ](auto status) noexcept {
- invariant(status);
-
- auto opCtx = Client::getCurrent()->makeOperationContext();
-
- auto swCollAndChunks = [&]() -> StatusWith<CollectionAndChangedChunks> {
- try {
- return getChangedChunks(opCtx.get(), nss, version);
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
- }();
+SemiFuture<CollectionAndChangedChunks> ConfigServerCatalogCacheLoader::getChunksSince(
+ const NamespaceString& nss, ChunkVersion version) {
- callbackFn(opCtx.get(), std::move(swCollAndChunks));
- notify->set();
- });
+ return ExecutorFuture<void>(_executor)
+ .then([=]() {
+ ThreadClient tc("ConfigServerCatalogCacheLoader::getChunksSince",
+ getGlobalServiceContext());
+ auto opCtx = tc->makeOperationContext();
- return notify;
+ return getChangedChunks(opCtx.get(), nss, version);
+ })
+ .semi();
}
-void ConfigServerCatalogCacheLoader::getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) {
- _threadPool.schedule([ name = dbName.toString(), callbackFn ](auto status) noexcept {
- invariant(status);
-
- auto opCtx = Client::getCurrent()->makeOperationContext();
-
- auto swDbt = [&]() -> StatusWith<DatabaseType> {
- try {
- return uassertStatusOK(
- Grid::get(opCtx.get())
- ->catalogClient()
- ->getDatabase(
- opCtx.get(), name, repl::ReadConcernLevel::kMajorityReadConcern))
- .value;
- } catch (const DBException& ex) {
- return ex.toStatus();
- }
- }();
-
- callbackFn(opCtx.get(), std::move(swDbt));
- });
+SemiFuture<DatabaseType> ConfigServerCatalogCacheLoader::getDatabase(StringData dbName) {
+ return ExecutorFuture<void>(_executor)
+ .then([name = dbName.toString()] {
+ ThreadClient tc("ConfigServerCatalogCacheLoader::getDatabase",
+ getGlobalServiceContext());
+ auto opCtx = tc->makeOperationContext();
+ return uassertStatusOK(Grid::get(opCtx.get())
+ ->catalogClient()
+ ->getDatabase(opCtx.get(),
+ name,
+ repl::ReadConcernLevel::kMajorityReadConcern))
+ .value;
+ })
+ .semi();
}
} // namespace mongo
diff --git a/src/mongo/s/config_server_catalog_cache_loader.h b/src/mongo/s/config_server_catalog_cache_loader.h
index 2da4fb9a8e9..09311d7a2d0 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.h
+++ b/src/mongo/s/config_server_catalog_cache_loader.h
@@ -36,8 +36,8 @@ namespace mongo {
class ConfigServerCatalogCacheLoader final : public CatalogCacheLoader {
public:
- ConfigServerCatalogCacheLoader();
- ~ConfigServerCatalogCacheLoader();
+ ConfigServerCatalogCacheLoader(std::shared_ptr<ThreadPool> executor);
+ ~ConfigServerCatalogCacheLoader() = default;
/**
* These functions should never be called. They trigger invariants if called.
@@ -50,24 +50,13 @@ public:
void waitForCollectionFlush(OperationContext* opCtx, const NamespaceString& nss) override;
void waitForDatabaseFlush(OperationContext* opCtx, StringData dbName) override;
- std::shared_ptr<Notification<void>> getChunksSince(
- const NamespaceString& nss,
- ChunkVersion version,
- GetChunksSinceCallbackFn callbackFn) override;
-
- void getDatabase(
- StringData dbName,
- std::function<void(OperationContext*, StatusWith<DatabaseType>)> callbackFn) override;
+ SemiFuture<CollectionAndChangedChunks> getChunksSince(const NamespaceString& nss,
+ ChunkVersion version) override;
+ SemiFuture<DatabaseType> getDatabase(StringData dbName) override;
private:
// Thread pool to be used to perform metadata load
- ThreadPool _threadPool;
-
- // Protects the class state below
- Mutex _mutex = MONGO_MAKE_LATCH("ConfigServerCatalogCacheLoader::_mutex");
-
- // True if shutDown was called.
- bool _inShutdown{false};
+ std::shared_ptr<ThreadPool> _executor;
};
} // namespace mongo
diff --git a/src/mongo/s/mongos_main.cpp b/src/mongo/s/mongos_main.cpp
index 92ea2adc996..06c6e3e4dd1 100644
--- a/src/mongo/s/mongos_main.cpp
+++ b/src/mongo/s/mongos_main.cpp
@@ -418,10 +418,12 @@ Status initializeSharding(OperationContext* opCtx) {
auto shardFactory =
std::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory));
+ auto catalogCacheExecutor = CatalogCache::makeDefaultThreadPool();
CatalogCacheLoader::set(opCtx->getServiceContext(),
- std::make_unique<ConfigServerCatalogCacheLoader>());
+ std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor));
- auto catalogCache = std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx));
+ auto catalogCache =
+ std::make_unique<CatalogCache>(CatalogCacheLoader::get(opCtx), catalogCacheExecutor);
// List of hooks which will be called by the ShardRegistry when it discovers a shard has been
// removed.
diff --git a/src/mongo/s/sharding_router_test_fixture.cpp b/src/mongo/s/sharding_router_test_fixture.cpp
index e7768f659ad..7142c90ae11 100644
--- a/src/mongo/s/sharding_router_test_fixture.cpp
+++ b/src/mongo/s/sharding_router_test_fixture.cpp
@@ -167,18 +167,21 @@ ShardingTestFixture::ShardingTestFixture()
auto shardRegistry(std::make_unique<ShardRegistry>(std::move(shardFactory), configCS));
executorPool->startup();
- CatalogCacheLoader::set(service, std::make_unique<ConfigServerCatalogCacheLoader>());
+ _catalogCacheExecutor = CatalogCache::makeDefaultThreadPool();
+ CatalogCacheLoader::set(
+ service, std::make_unique<ConfigServerCatalogCacheLoader>(catalogCacheExecutor()));
// For now initialize the global grid object. All sharding objects will be accessible from there
// until we get rid of it.
auto const grid = Grid::get(operationContext());
- grid->init(makeShardingCatalogClient(std::move(uniqueDistLockManager)),
- std::make_unique<CatalogCache>(CatalogCacheLoader::get(service)),
- std::move(shardRegistry),
- std::make_unique<ClusterCursorManager>(service->getPreciseClockSource()),
- std::make_unique<BalancerConfiguration>(),
- std::move(executorPool),
- _mockNetwork);
+ grid->init(
+ makeShardingCatalogClient(std::move(uniqueDistLockManager)),
+ std::make_unique<CatalogCache>(CatalogCacheLoader::get(service), catalogCacheExecutor()),
+ std::move(shardRegistry),
+ std::make_unique<ClusterCursorManager>(service->getPreciseClockSource()),
+ std::make_unique<BalancerConfiguration>(),
+ std::move(executorPool),
+ _mockNetwork);
if (grid->catalogClient()) {
grid->catalogClient()->startup();
diff --git a/src/mongo/s/sharding_test_fixture_common.h b/src/mongo/s/sharding_test_fixture_common.h
index 317e7f03b50..0f7d3730d6d 100644
--- a/src/mongo/s/sharding_test_fixture_common.h
+++ b/src/mongo/s/sharding_test_fixture_common.h
@@ -83,6 +83,10 @@ protected:
return _distLockManager;
}
+ std::shared_ptr<ThreadPool> catalogCacheExecutor() const {
+ invariant(_catalogCacheExecutor);
+ return _catalogCacheExecutor;
+ }
/**
* Blocking methods, which receive one message from the network and respond using the responses
* returned from the input function. This is a syntactic sugar for simple, single request +
@@ -148,6 +152,8 @@ protected:
// store a raw pointer to it here.
DistLockManager* _distLockManager = nullptr;
+ std::shared_ptr<ThreadPool> _catalogCacheExecutor = nullptr;
+
private:
// Keeps the lifetime of the operation context
ServiceContext::UniqueOperationContext _opCtxHolder;