/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/sharding_initialization.h"
#include
#include "mongo/base/status.h"
#include "mongo/client/remote_command_targeter_factory_impl.h"
#include "mongo/db/audit.h"
#include "mongo/db/keys_collection_client_sharded.h"
#include "mongo/db/keys_collection_manager.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/time_proof_service.h"
#include "mongo/executor/connection_pool.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/dist_lock_catalog_impl.h"
#include "mongo/s/catalog/replset_dist_lock_manager.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
#include "mongo/s/cluster_identity_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/sharding_task_executor.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/sock.h"
namespace mongo {
using executor::ConnectionPool;
// Startup server parameters for the connection pools used by the sharding task executors.
// initializeGlobalShardingState() copies their values into a ConnectionPool::Options instance.
//
// Host timeout in milliseconds; becomes ConnectionPool::Options::hostTimeout.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolHostTimeoutMS,
int,
ConnectionPool::kDefaultHostTimeout.count());
// Maximum number of connections. The default of -1 is a sentinel that
// initializeGlobalShardingState() replaces with ConnectionPool::kDefaultMaxConns.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxSize, int, -1);
// By default, limit us to two concurrent pending connection attempts
// in any one pool. Since pools are currently per-cpu, we still may
// have something like 64 concurrent total connection attempts on a
// modestly sized system. We could set it to one, but that seems too
// restrictive.
// A value of -1 makes initializeGlobalShardingState() use
// ConnectionPool::kDefaultMaxConnecting instead.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxConnecting, int, 2);
// Minimum number of connections; becomes ConnectionPool::Options::minConnections.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMinSize,
int,
static_cast(ConnectionPool::kDefaultMinConns));
// Refresh requirement in milliseconds; becomes ConnectionPool::Options::refreshRequirement.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolRefreshRequirementMS,
int,
ConnectionPool::kDefaultRefreshRequirement.count());
// Refresh timeout in milliseconds; becomes ConnectionPool::Options::refreshTimeout.
// initializeGlobalShardingState() lowers it when it is not below the refresh requirement.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolRefreshTimeoutMS,
int,
ConnectionPool::kDefaultRefreshTimeout.count());
namespace {
using executor::NetworkInterface;
using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutorPool;
using executor::ThreadPoolTaskExecutor;
// Pause between successive attempts in waitForShardRegistryReload().
static constexpr auto kRetryInterval = Seconds{2};
/**
 * Builds the catalog client handed to the grid.
 *
 * A dist lock catalog is created first and moved into a dist lock manager, which is constructed
 * with 'service', 'distLockProcessId' and the ReplSetDistLockManager ping interval and
 * expiration time constants. The manager is then moved into the returned catalog client.
 */
std::unique_ptr makeCatalogClient(ServiceContext* service,
StringData distLockProcessId) {
auto distLockCatalog = stdx::make_unique();
auto distLockManager =
stdx::make_unique(service,
distLockProcessId,
std::move(distLockCatalog),
ReplSetDistLockManager::kDistLockPingInterval,
ReplSetDistLockManager::kDistLockExpirationTime);
// Ownership of the lock manager is transferred to the catalog client.
return stdx::make_unique(std::move(distLockManager));
}
/**
 * Builds the pool of sharding task executors.
 *
 * 'fixedNet' is the network interface wrapped by the pool's fixed executor.
 * 'metadataHookBuilder' is invoked once per pooled executor to produce its metadata hook.
 * 'connPoolOptions' is passed to every network interface created here.
 * 'taskExecutorPoolSize' is the number of pooled executors; when unset,
 * TaskExecutorPool::getSuggestedPoolSize() is used.
 *
 * The pool is returned without being started; this function never calls startup() on it.
 */
std::unique_ptr makeShardingTaskExecutorPool(
std::unique_ptr fixedNet,
rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder,
ConnectionPool::Options connPoolOptions,
boost::optional taskExecutorPoolSize) {
std::vector> executors;
const auto poolSize = taskExecutorPoolSize.value_or(TaskExecutorPool::getSuggestedPoolSize());
// Each pooled executor gets its own network interface, named after its index in the pool.
for (size_t i = 0; i < poolSize; ++i) {
auto exec = makeShardingTaskExecutor(executor::makeNetworkInterface(
"NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i),
stdx::make_unique(),
metadataHookBuilder(),
connPoolOptions));
executors.emplace_back(std::move(exec));
}
// Add executor used to perform non-performance critical work.
auto fixedExec = makeShardingTaskExecutor(std::move(fixedNet));
auto executorPool = stdx::make_unique();
executorPool->addExecutors(std::move(executors), std::move(fixedExec));
return executorPool;
}
} // namespace
/**
 * Wraps the network interface 'net' in a task executor and returns it.
 *
 * Ownership of 'net' is moved into the inner executor. The thread pool passed alongside it is
 * constructed from a raw pointer to the same interface. The inner executor is then moved into
 * the wrapper object that is returned.
 */
std::unique_ptr makeShardingTaskExecutor(
std::unique_ptr net) {
// Capture the raw pointer first: 'net' is moved from in the same call expression below.
auto netPtr = net.get();
auto executor = stdx::make_unique(
stdx::make_unique(netPtr), std::move(net));
return stdx::make_unique(std::move(executor));
}
/**
 * Generates the process id string used for distributed locks.
 *
 * The result is three fields joined by ':':
 *  - HostAndPort(getHostName(), serverGlobalParams.port).toString(),
 *  - a count of the time since the epoch, read from the service context's precise clock
 *    source, and
 *  - a 64-bit value drawn from a SecureRandom generator.
 */
std::string generateDistLockProcessId(OperationContext* opCtx) {
std::unique_ptr rng(SecureRandom::create());
return str::stream()
<< HostAndPort(getHostName(), serverGlobalParams.port).toString() << ':'
<< durationCount(
opCtx->getServiceContext()->getPreciseClockSource()->now().toDurationSinceEpoch())
<< ':' << rng->nextInt64();
}
/**
 * Initializes the sharding state held by the Grid obtained from 'opCtx'.
 *
 * Steps, in order:
 *  1. Reject an invalid 'configCS'.
 *  2. Build ConnectionPool::Options from the ShardingTaskExecutorPool* server parameters,
 *     adjusting the refresh timeout and host timeout when they are inconsistent.
 *  3. Create the "NetworkInterfaceASIO-ShardRegistry" network interface and the task executor
 *     pool, then start the pool.
 *  4. Initialize the grid, then start the shard registry and afterwards the catalog client.
 *  5. Create the key manager, start its monitoring and install a LogicalTimeValidator built
 *     from it on the service context.
 *  6. On a config server whose replication member state is primary, enable the key generator.
 *
 * 'distLockProcessId' is forwarded to makeCatalogClient().
 * 'shardFactory' and 'catalogCache' are moved into the objects passed to grid->init().
 * 'hookBuilder' is invoked here for the shard registry network interface and is also passed to
 * makeShardingTaskExecutorPool().
 * 'taskExecutorPoolSize' is forwarded to makeShardingTaskExecutorPool().
 *
 * Returns ErrorCodes::BadValue when 'configCS' has type ConnectionString::INVALID, otherwise
 * Status::OK() once all steps have run.
 */
Status initializeGlobalShardingState(OperationContext* opCtx,
const ConnectionString& configCS,
StringData distLockProcessId,
std::unique_ptr shardFactory,
std::unique_ptr catalogCache,
rpc::ShardingEgressMetadataHookBuilder hookBuilder,
boost::optional taskExecutorPoolSize) {
if (configCS.type() == ConnectionString::INVALID) {
return {ErrorCodes::BadValue, "Unrecognized connection string."};
}
// We don't set the ConnectionPool's static const variables to be the default value in
// MONGO_EXPORT_STARTUP_SERVER_PARAMETER because it's not guaranteed to be initialized.
// The following code is a workaround.
ConnectionPool::Options connPoolOptions;
connPoolOptions.hostTimeout = Milliseconds(ShardingTaskExecutorPoolHostTimeoutMS);
// -1 is the "not set" sentinel for the next two parameters; fall back to the pool defaults.
connPoolOptions.maxConnections = (ShardingTaskExecutorPoolMaxSize != -1)
? ShardingTaskExecutorPoolMaxSize
: ConnectionPool::kDefaultMaxConns;
connPoolOptions.maxConnecting = (ShardingTaskExecutorPoolMaxConnecting != -1)
? ShardingTaskExecutorPoolMaxConnecting
: ConnectionPool::kDefaultMaxConnecting;
connPoolOptions.minConnections = ShardingTaskExecutorPoolMinSize;
connPoolOptions.refreshRequirement = Milliseconds(ShardingTaskExecutorPoolRefreshRequirementMS);
connPoolOptions.refreshTimeout = Milliseconds(ShardingTaskExecutorPoolRefreshTimeoutMS);
// Keep the refresh timeout strictly below the refresh requirement: when it is not, lower it
// to one millisecond less than the requirement and log a warning.
if (connPoolOptions.refreshRequirement <= connPoolOptions.refreshTimeout) {
auto newRefreshTimeout = connPoolOptions.refreshRequirement - Milliseconds(1);
warning() << "ShardingTaskExecutorPoolRefreshRequirementMS ("
<< connPoolOptions.refreshRequirement
<< ") set below ShardingTaskExecutorPoolRefreshTimeoutMS ("
<< connPoolOptions.refreshTimeout
<< "). Adjusting ShardingTaskExecutorPoolRefreshTimeoutMS to "
<< newRefreshTimeout;
connPoolOptions.refreshTimeout = newRefreshTimeout;
}
// Keep the host timeout strictly above refreshRequirement + refreshTimeout (using the
// possibly adjusted refresh timeout): when it is not, raise it to one millisecond more than
// that sum and log a warning.
if (connPoolOptions.hostTimeout <=
connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout) {
auto newHostTimeout =
connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout + Milliseconds(1);
warning() << "ShardingTaskExecutorPoolHostTimeoutMS (" << connPoolOptions.hostTimeout
<< ") set below ShardingTaskExecutorPoolRefreshRequirementMS ("
<< connPoolOptions.refreshRequirement
<< ") + ShardingTaskExecutorPoolRefreshTimeoutMS ("
<< connPoolOptions.refreshTimeout
<< "). Adjusting ShardingTaskExecutorPoolHostTimeoutMS to " << newHostTimeout;
connPoolOptions.hostTimeout = newHostTimeout;
}
auto network =
executor::makeNetworkInterface("NetworkInterfaceASIO-ShardRegistry",
stdx::make_unique(),
hookBuilder(),
connPoolOptions);
// Keep a raw pointer for grid->init(); ownership of 'network' moves into the executor pool.
auto networkPtr = network.get();
auto executorPool = makeShardingTaskExecutorPool(
std::move(network), hookBuilder, connPoolOptions, taskExecutorPoolSize);
executorPool->startup();
auto const grid = Grid::get(opCtx);
grid->init(
makeCatalogClient(opCtx->getServiceContext(), distLockProcessId),
std::move(catalogCache),
stdx::make_unique(std::move(shardFactory), configCS),
stdx::make_unique(getGlobalServiceContext()->getPreciseClockSource()),
stdx::make_unique(),
std::move(executorPool),
networkPtr);
// The shard registry must be started once the grid is initialized
grid->shardRegistry()->startup(opCtx);
// The catalog client must be started after the shard registry has been started up
grid->catalogClient()->startup();
// Build the key manager on top of the grid's catalog client and start its monitoring.
auto keysCollectionClient =
stdx::make_unique(grid->catalogClient());
auto keyManager =
std::make_shared(KeysCollectionManager::kKeyManagerPurposeString,
std::move(keysCollectionClient),
Seconds(KeysRotationIntervalSec));
keyManager->startMonitoring(opCtx->getServiceContext());
LogicalTimeValidator::set(opCtx->getServiceContext(),
stdx::make_unique(keyManager));
// Only a config server that is currently primary enables the key generator here.
auto replCoord = repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
replCoord->getMemberState().primary()) {
LogicalTimeValidator::get(opCtx)->enableKeyGenerator(opCtx, true);
}
return Status::OK();
}
/**
 * Blocks until the cluster id has been loaded and the shard registry reports that it is up.
 *
 * Returns Status::OK() immediately when this node is a config server. Otherwise it retries,
 * sleeping kRetryInterval between attempts, until one of the following happens:
 *  - the shard registry is up after a successful cluster id load: returns Status::OK();
 *  - 'opCtx' is interrupted: returns the status from checkForInterruptNoAssert();
 *  - the process is shutting down: returns ErrorCodes::ShutdownInProgress.
 *
 * A DBException thrown during an attempt is logged as a warning and the attempt is retried.
 */
Status waitForShardRegistryReload(OperationContext* opCtx) {
    if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
        return Status::OK();
    }

    while (!globalInShutdownDeprecated()) {
        auto interruptStatus = opCtx->checkForInterruptNoAssert();
        if (!interruptStatus.isOK()) {
            return interruptStatus;
        }

        try {
            const auto clusterIdentity = ClusterIdentityLoader::get(opCtx);
            uassertStatusOK(clusterIdentity->loadClusterId(
                opCtx, repl::ReadConcernLevel::kMajorityReadConcern));

            const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
            if (shardRegistry->isUp()) {
                return Status::OK();
            }
        } catch (const DBException& ex) {
            Status loadStatus = ex.toStatus();
            warning()
                << "Error initializing sharding state, sleeping for 2 seconds and trying again"
                << causedBy(loadStatus);
        }

        // Reached both when the registry is not up yet and after a logged failure.
        sleepFor(kRetryInterval);
    }

    return {ErrorCodes::ShutdownInProgress, "aborting shard loading attempt"};
}
} // namespace mongo