/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/sharding_initialization.h" #include #include "mongo/base/status.h" #include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/db/audit.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/executor/connection_pool.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/metadata_hook.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_catalog_impl.h" #include "mongo/s/catalog/replset_dist_lock_manager.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" #include "mongo/s/catalog/sharding_catalog_manager_impl.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_network_connection_hook.h" #include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/sharding_egress_metadata_hook.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { using executor::ConnectionPool; MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolHostTimeoutMS, int, ConnectionPool::kDefaultHostTimeout.count()); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxSize, int, -1); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMinSize, int, static_cast(ConnectionPool::kDefaultMinConns)); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolRefreshRequirementMS, int, ConnectionPool::kDefaultRefreshRequirement.count()); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolRefreshTimeoutMS, int, ConnectionPool::kDefaultRefreshTimeout.count()); namespace { using executor::NetworkInterface; using executor::NetworkInterfaceThreadPool; using executor::TaskExecutorPool; using executor::ThreadPoolTaskExecutor; static constexpr auto kRetryInterval = Seconds{2}; std::unique_ptr makeTaskExecutor(std::unique_ptr net) { auto netPtr = net.get(); return stdx::make_unique( stdx::make_unique(netPtr), std::move(net)); } std::unique_ptr makeCatalogClient(ServiceContext* service, ShardRegistry* shardRegistry, StringData distLockProcessId) { auto distLockCatalog = stdx::make_unique(shardRegistry); auto distLockManager = stdx::make_unique(service, distLockProcessId, std::move(distLockCatalog), ReplSetDistLockManager::kDistLockPingInterval, ReplSetDistLockManager::kDistLockExpirationTime); return stdx::make_unique(std::move(distLockManager)); } std::unique_ptr makeTaskExecutorPool( std::unique_ptr fixedNet, rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder, ConnectionPool::Options connPoolOptions) { std::vector> executors; for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) { auto net = executor::makeNetworkInterface( "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i), stdx::make_unique(), metadataHookBuilder(), connPoolOptions); auto netPtr = net.get(); auto exec = stdx::make_unique( stdx::make_unique(netPtr), std::move(net)); executors.emplace_back(std::move(exec)); } // Add executor used to perform non-performance critical work. auto fixedNetPtr = fixedNet.get(); auto fixedExec = stdx::make_unique( stdx::make_unique(fixedNetPtr), std::move(fixedNet)); auto executorPool = stdx::make_unique(); executorPool->addExecutors(std::move(executors), std::move(fixedExec)); return executorPool; } } // namespace const StringData kDistLockProcessIdForConfigServer("ConfigServer"); std::string generateDistLockProcessId(OperationContext* txn) { std::unique_ptr rng(SecureRandom::create()); return str::stream() << HostAndPort(getHostName(), serverGlobalParams.port).toString() << ':' << durationCount( txn->getServiceContext()->getPreciseClockSource()->now().toDurationSinceEpoch()) << ':' << rng->nextInt64(); } Status initializeGlobalShardingState(OperationContext* txn, const ConnectionString& configCS, StringData distLockProcessId, std::unique_ptr shardFactory, rpc::ShardingEgressMetadataHookBuilder hookBuilder, ShardingCatalogManagerBuilder catalogManagerBuilder) { if (configCS.type() == ConnectionString::INVALID) { return {ErrorCodes::BadValue, "Unrecognized connection string."}; } // We don't set the ConnectionPool's static const variables to be the default value in // MONGO_EXPORT_STARTUP_SERVER_PARAMETER because it's not guaranteed to be initialized. // The following code is a workaround. ConnectionPool::Options connPoolOptions; connPoolOptions.hostTimeout = Milliseconds(ShardingTaskExecutorPoolHostTimeoutMS); connPoolOptions.maxConnections = (ShardingTaskExecutorPoolMaxSize != -1) ? ShardingTaskExecutorPoolMaxSize : ConnectionPool::kDefaultMaxConns; connPoolOptions.minConnections = ShardingTaskExecutorPoolMinSize; connPoolOptions.refreshRequirement = Milliseconds(ShardingTaskExecutorPoolRefreshRequirementMS); connPoolOptions.refreshTimeout = Milliseconds(ShardingTaskExecutorPoolRefreshTimeoutMS); auto network = executor::makeNetworkInterface("NetworkInterfaceASIO-ShardRegistry", stdx::make_unique(), hookBuilder(), connPoolOptions); auto networkPtr = network.get(); auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder, connPoolOptions); executorPool->startup(); auto shardRegistry(stdx::make_unique(std::move(shardFactory), configCS)); auto catalogClient = makeCatalogClient(txn->getServiceContext(), shardRegistry.get(), distLockProcessId); auto rawCatalogClient = catalogClient.get(); std::unique_ptr catalogManager = catalogManagerBuilder( rawCatalogClient, makeTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor"))); auto rawCatalogManager = catalogManager.get(); grid.init( std::move(catalogClient), std::move(catalogManager), stdx::make_unique(), std::move(shardRegistry), stdx::make_unique(getGlobalServiceContext()->getPreciseClockSource()), stdx::make_unique(), std::move(executorPool), networkPtr); // must be started once the grid is initialized grid.shardRegistry()->startup(); auto status = rawCatalogClient->startup(); if (!status.isOK()) { return status; } if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { // Only config servers get a ShardingCatalogManager. status = rawCatalogManager->startup(); if (!status.isOK()) { return status; } } return Status::OK(); } Status reloadShardRegistryUntilSuccess(OperationContext* txn) { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { return Status::OK(); } while (!globalInShutdownDeprecated()) { auto stopStatus = txn->checkForInterruptNoAssert(); if (!stopStatus.isOK()) { return stopStatus; } try { uassertStatusOK(ClusterIdentityLoader::get(txn)->loadClusterId( txn, repl::ReadConcernLevel::kMajorityReadConcern)); if (grid.shardRegistry()->isUp()) { return Status::OK(); } sleepFor(kRetryInterval); continue; } catch (const DBException& ex) { Status status = ex.toStatus(); warning() << "Error initializing sharding state, sleeping for 2 seconds and trying again" << causedBy(status); sleepFor(kRetryInterval); continue; } } return {ErrorCodes::ShutdownInProgress, "aborting shard loading attempt"}; } } // namespace mongo