summaryrefslogtreecommitdiff
path: root/src/mongo/s/sharding_initialization.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/sharding_initialization.cpp')
-rw-r--r--src/mongo/s/sharding_initialization.cpp60
1 files changed, 19 insertions, 41 deletions
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 79287e37a5c..49931ed353a 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -59,7 +59,6 @@
#include "mongo/s/catalog/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/replset_dist_lock_manager.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
-#include "mongo/s/catalog/sharding_catalog_manager_impl.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
@@ -100,23 +99,14 @@ using executor::NetworkInterface;
using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutorPool;
using executor::ThreadPoolTaskExecutor;
-using executor::ShardingTaskExecutor;
static constexpr auto kRetryInterval = Seconds{2};
const std::string kKeyManagerPurposeString = "HMAC";
const Seconds kKeyValidInterval(3 * 30 * 24 * 60 * 60); // ~3 months
-auto makeTaskExecutor(std::unique_ptr<NetworkInterface> net) {
- auto netPtr = net.get();
- auto executor = stdx::make_unique<ThreadPoolTaskExecutor>(
- stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
- return stdx::make_unique<ShardingTaskExecutor>(std::move(executor));
-}
-
std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service,
- ShardRegistry* shardRegistry,
StringData distLockProcessId) {
- auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
+ auto distLockCatalog = stdx::make_unique<DistLockCatalogImpl>();
auto distLockManager =
stdx::make_unique<ReplSetDistLockManager>(service,
distLockProcessId,
@@ -127,14 +117,14 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service
return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager));
}
-std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
+std::unique_ptr<TaskExecutorPool> makeShardingTaskExecutorPool(
std::unique_ptr<NetworkInterface> fixedNet,
rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder,
ConnectionPool::Options connPoolOptions) {
std::vector<std::unique_ptr<executor::TaskExecutor>> executors;
for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) {
- auto exec = makeTaskExecutor(executor::makeNetworkInterface(
+ auto exec = makeShardingTaskExecutor(executor::makeNetworkInterface(
"NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i),
stdx::make_unique<ShardingNetworkConnectionHook>(),
metadataHookBuilder(),
@@ -144,7 +134,7 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
}
// Add executor used to perform non-performance critical work.
- auto fixedExec = makeTaskExecutor(std::move(fixedNet));
+ auto fixedExec = makeShardingTaskExecutor(std::move(fixedNet));
auto executorPool = stdx::make_unique<TaskExecutorPool>();
executorPool->addExecutors(std::move(executors), std::move(fixedExec));
@@ -153,7 +143,14 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
} // namespace
-const StringData kDistLockProcessIdForConfigServer("ConfigServer");
+std::unique_ptr<executor::TaskExecutor> makeShardingTaskExecutor(
+ std::unique_ptr<NetworkInterface> net) {
+ auto netPtr = net.get();
+ auto executor = stdx::make_unique<ThreadPoolTaskExecutor>(
+ stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
+
+ return stdx::make_unique<executor::ShardingTaskExecutor>(std::move(executor));
+}
std::string generateDistLockProcessId(OperationContext* opCtx) {
std::unique_ptr<SecureRandom> rng(SecureRandom::create());
@@ -170,8 +167,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
StringData distLockProcessId,
std::unique_ptr<ShardFactory> shardFactory,
std::unique_ptr<CatalogCache> catalogCache,
- rpc::ShardingEgressMetadataHookBuilder hookBuilder,
- ShardingCatalogManagerBuilder catalogManagerBuilder) {
+ rpc::ShardingEgressMetadataHookBuilder hookBuilder) {
if (configCS.type() == ConnectionString::INVALID) {
return {ErrorCodes::BadValue, "Unrecognized connection string."};
}
@@ -197,27 +193,15 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
hookBuilder(),
connPoolOptions);
auto networkPtr = network.get();
- auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder, connPoolOptions);
+ auto executorPool =
+ makeShardingTaskExecutorPool(std::move(network), hookBuilder, connPoolOptions);
executorPool->startup();
- auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS));
-
- auto catalogClient =
- makeCatalogClient(opCtx->getServiceContext(), shardRegistry.get(), distLockProcessId);
-
- auto rawCatalogClient = catalogClient.get();
-
- std::unique_ptr<ShardingCatalogManager> catalogManager = catalogManagerBuilder(
- rawCatalogClient,
- makeTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor")));
- auto rawCatalogManager = catalogManager.get();
-
auto const grid = Grid::get(opCtx);
grid->init(
- std::move(catalogClient),
- std::move(catalogManager),
+ makeCatalogClient(opCtx->getServiceContext(), distLockProcessId),
std::move(catalogCache),
- std::move(shardRegistry),
+ stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS),
stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()),
stdx::make_unique<BalancerConfiguration>(),
std::move(executorPool),
@@ -226,16 +210,9 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
// The shard registry must be started once the grid is initialized
grid->shardRegistry()->startup(opCtx);
+ // The catalog client must be started after the shard registry has been started up
grid->catalogClient()->startup();
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- // Only config servers get a ShardingCatalogManager.
- Status status = rawCatalogManager->startup();
- if (!status.isOK()) {
- return status;
- }
- }
-
auto keyManager = std::make_shared<KeysCollectionManagerSharding>(
kKeyManagerPurposeString, grid->catalogClient(), Seconds(KeysRotationIntervalSec));
keyManager->startMonitoring(opCtx->getServiceContext());
@@ -249,6 +226,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
replCoord->getMemberState().primary()) {
LogicalTimeValidator::get(opCtx)->enableKeyGenerator(opCtx, true);
}
+
return Status::OK();
}