diff options
Diffstat (limited to 'src/mongo/s/sharding_initialization.cpp')
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 60 |
1 files changed, 19 insertions, 41 deletions
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 79287e37a5c..49931ed353a 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -59,7 +59,6 @@ #include "mongo/s/catalog/dist_lock_catalog_impl.h" #include "mongo/s/catalog/replset_dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" -#include "mongo/s/catalog/sharding_catalog_manager_impl.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" @@ -100,23 +99,14 @@ using executor::NetworkInterface; using executor::NetworkInterfaceThreadPool; using executor::TaskExecutorPool; using executor::ThreadPoolTaskExecutor; -using executor::ShardingTaskExecutor; static constexpr auto kRetryInterval = Seconds{2}; const std::string kKeyManagerPurposeString = "HMAC"; const Seconds kKeyValidInterval(3 * 30 * 24 * 60 * 60); // ~3 months -auto makeTaskExecutor(std::unique_ptr<NetworkInterface> net) { - auto netPtr = net.get(); - auto executor = stdx::make_unique<ThreadPoolTaskExecutor>( - stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); - return stdx::make_unique<ShardingTaskExecutor>(std::move(executor)); -} - std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service, - ShardRegistry* shardRegistry, StringData distLockProcessId) { - auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry); + auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(); auto distLockManager = stdx::make_unique<ReplSetDistLockManager>(service, distLockProcessId, @@ -127,14 +117,14 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); } -std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( +std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool( std::unique_ptr<NetworkInterface> fixedNet, rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder, ConnectionPool::Options connPoolOptions) { std::vector<std::unique_ptr<executor::TaskExecutor>> executors; for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) { - auto exec = makeTaskExecutor(executor::makeNetworkInterface( + auto exec = makeShardingTaskExecutor(executor::makeNetworkInterface( "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i), stdx::make_unique<ShardingNetworkConnectionHook>(), metadataHookBuilder(), @@ -144,7 +134,7 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( } // Add executor used to perform non-performance critical work. - auto fixedExec = makeTaskExecutor(std::move(fixedNet)); + auto fixedExec = makeShardingTaskExecutor(std::move(fixedNet)); auto executorPool = stdx::make_unique<TaskExecutorPool>(); executorPool->addExecutors(std::move(executors), std::move(fixedExec)); @@ -153,7 +143,14 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( } // namespace -const StringData kDistLockProcessIdForConfigServer("ConfigServer"); +std::unique_ptr<executor::TaskExecutor> makeShardingTaskExecutor( + std::unique_ptr<NetworkInterface> net) { + auto netPtr = net.get(); + auto executor = stdx::make_unique<ThreadPoolTaskExecutor>( + stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); + + return stdx::make_unique<executor::ShardingTaskExecutor>(std::move(executor)); +} std::string generateDistLockProcessId(OperationContext* opCtx) { std::unique_ptr<SecureRandom> rng(SecureRandom::create()); @@ -170,8 +167,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, StringData distLockProcessId, std::unique_ptr<ShardFactory> shardFactory, std::unique_ptr<CatalogCache> catalogCache, - rpc::ShardingEgressMetadataHookBuilder hookBuilder, - ShardingCatalogManagerBuilder catalogManagerBuilder) { + rpc::ShardingEgressMetadataHookBuilder hookBuilder) { if (configCS.type() == ConnectionString::INVALID) { return {ErrorCodes::BadValue, "Unrecognized connection string."}; } @@ -197,27 +193,15 @@ Status initializeGlobalShardingState(OperationContext* opCtx, hookBuilder(), connPoolOptions); auto networkPtr = network.get(); - auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder, connPoolOptions); + auto executorPool = + makeShardingTaskExecutorPool(std::move(network), hookBuilder, connPoolOptions); executorPool->startup(); - auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS)); - - auto catalogClient = - makeCatalogClient(opCtx->getServiceContext(), shardRegistry.get(), distLockProcessId); - - auto rawCatalogClient = catalogClient.get(); - - std::unique_ptr<ShardingCatalogManager> catalogManager = catalogManagerBuilder( - rawCatalogClient, - makeTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor"))); - auto rawCatalogManager = catalogManager.get(); - auto const grid = Grid::get(opCtx); grid->init( - std::move(catalogClient), - std::move(catalogManager), + makeCatalogClient(opCtx->getServiceContext(), distLockProcessId), std::move(catalogCache), - std::move(shardRegistry), + stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS), stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()), stdx::make_unique<BalancerConfiguration>(), std::move(executorPool), @@ -226,16 +210,9 @@ Status initializeGlobalShardingState(OperationContext* opCtx, // The shard registry must be started once the grid is initialized grid->shardRegistry()->startup(opCtx); + // The catalog client must be started after the shard registry has been started up grid->catalogClient()->startup(); - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - // Only config servers get a ShardingCatalogManager. - Status status = rawCatalogManager->startup(); - if (!status.isOK()) { - return status; - } - } - auto keyManager = std::make_shared<KeysCollectionManagerSharding>( kKeyManagerPurposeString, grid->catalogClient(), Seconds(KeysRotationIntervalSec)); keyManager->startMonitoring(opCtx->getServiceContext()); @@ -249,6 +226,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, replCoord->getMemberState().primary()) { LogicalTimeValidator::get(opCtx)->enableKeyGenerator(opCtx, true); } + return Status::OK(); } |