/**
 *    Copyright (C) 2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/sharding_initialization.h"

#include <string>

#include "mongo/base/status.h"
#include "mongo/client/remote_command_targeter_factory_impl.h"
#include "mongo/client/syncclusterconnection.h"
#include "mongo/db/audit.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/rpc/metadata/config_server_metadata.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/s/catalog/forwarding_catalog_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/sharding_network_connection_hook.h"
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/sock.h"

namespace mongo {

namespace {

using executor::NetworkInterface;
using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutorPool;
using executor::ThreadPoolTaskExecutor;

// Same logic as sharding_connection_hook.cpp.
class ShardingEgressMetadataHook final : public rpc::EgressMetadataHook {
public:
    Status writeRequestMetadata(const HostAndPort& target, BSONObjBuilder* metadataBob) override {
        try {
            audit::writeImpersonatedUsersToMetadata(metadataBob);

            // Add config server optime to metadata sent to shards.
            auto shard = grid.shardRegistry()->getShardForHostNoReload(target);
            if (!shard) {
                return Status(ErrorCodes::ShardNotFound,
                              str::stream() << "Shard not found for server: "
                                            << target.toString());
            }
            if (shard->isConfig()) {
                return Status::OK();
            }
            rpc::ConfigServerMetadata(grid.shardRegistry()->getConfigOpTime())
                .writeToMetadata(metadataBob);

            return Status::OK();
        } catch (...) {
            return exceptionToStatus();
        }
    }

    Status readReplyMetadata(const HostAndPort& replySource, const BSONObj& metadataObj) override {
        try {
            saveGLEStats(metadataObj, replySource.toString());

            auto shard = grid.shardRegistry()->getShardForHostNoReload(replySource);
            if (!shard) {
                return Status::OK();
            }

            // If this host is a known shard of ours, look for a config server optime in the
            // response metadata to use to update our notion of the current config server optime.
            auto responseStatus = rpc::ConfigServerMetadata::readFromMetadata(metadataObj);
            if (!responseStatus.isOK()) {
                return responseStatus.getStatus();
            }

            auto opTime = responseStatus.getValue().getOpTime();
            if (opTime.is_initialized()) {
                grid.shardRegistry()->advanceConfigOpTime(opTime.get());
            }

            return Status::OK();
        } catch (...) {
            return exceptionToStatus();
        }
    }
};

std::unique_ptr<executor::TaskExecutor> makeTaskExecutor(std::unique_ptr<NetworkInterface> net) {
    auto netPtr = net.get();
    return stdx::make_unique<ThreadPoolTaskExecutor>(
        stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
}

std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkInterface> fixedNet) {
    std::vector<std::unique_ptr<executor::TaskExecutor>> executors;
    for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) {
        auto net = executor::makeNetworkInterface(
            "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i),
            stdx::make_unique<ShardingNetworkConnectionHook>(),
            stdx::make_unique<ShardingEgressMetadataHook>());
        auto netPtr = net.get();
        auto exec = stdx::make_unique<ThreadPoolTaskExecutor>(
            stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));

        executors.emplace_back(std::move(exec));
    }

    // Add executor used to perform non-performance critical work.
    auto fixedNetPtr = fixedNet.get();
    auto fixedExec = stdx::make_unique<ThreadPoolTaskExecutor>(
        stdx::make_unique<NetworkInterfaceThreadPool>(fixedNetPtr), std::move(fixedNet));

    auto executorPool = stdx::make_unique<TaskExecutorPool>();
    executorPool->addExecutors(std::move(executors), std::move(fixedExec));
    return executorPool;
}

}  // namespace

Status initializeGlobalShardingState(OperationContext* txn,
                                     const ConnectionString& configCS,
                                     bool allowNetworking) {
    SyncClusterConnection::setConnectionValidationHook(
        [](const HostAndPort& target, const executor::RemoteCommandResponse& isMasterReply) {
            return ShardingNetworkConnectionHook::validateHostImpl(target, isMasterReply);
        });

    auto network =
        executor::makeNetworkInterface("NetworkInterfaceASIO-ShardRegistry",
                                       stdx::make_unique<ShardingNetworkConnectionHook>(),
                                       stdx::make_unique<ShardingEgressMetadataHook>());
    auto networkPtr = network.get();
    auto shardRegistry(
        stdx::make_unique<ShardRegistry>(stdx::make_unique<RemoteCommandTargeterFactoryImpl>(),
                                         makeTaskExecutorPool(std::move(network)),
                                         networkPtr,
                                         makeTaskExecutor(executor::makeNetworkInterface(
                                             "NetworkInterfaceASIO-ShardRegistry-TaskExecutor")),
                                         configCS));

    std::unique_ptr<ForwardingCatalogManager> catalogManager =
        stdx::make_unique<ForwardingCatalogManager>(
            getGlobalServiceContext(),
            configCS,
            shardRegistry.get(),
            HostAndPort(getHostName(), serverGlobalParams.port));

    shardRegistry->startup();
    grid.init(
        std::move(catalogManager),
        std::move(shardRegistry),
        stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getClockSource()));

    while (!inShutdown()) {
        try {
            Status status = grid.catalogManager(txn)->startup(txn, allowNetworking);
            uassertStatusOK(status);

            if (serverGlobalParams.configsvrMode == CatalogManager::ConfigServerMode::NONE) {
                grid.shardRegistry()->reload(txn);
            }
            return Status::OK();
        } catch (const DBException& ex) {
            Status status = ex.toStatus();
            if (status == ErrorCodes::ConfigServersInconsistent) {
                // Legacy catalog manager can return ConfigServersInconsistent. When that happens
                // we should immediately fail initialization. For all other failures we should
                // retry.
                return status;
            }

            if (status == ErrorCodes::ReplicaSetNotFound) {
                // ReplicaSetNotFound most likely means we've been waiting for the config replica
                // set to come up for so long that the ReplicaSetMonitor stopped monitoring the
                // set. Rebuild the config shard to force the monitor to resume monitoring the
                // config servers.
                grid.shardRegistry()->rebuildConfigShard();
            }
            log() << "Error initializing sharding state, sleeping for 2 seconds and trying again"
                  << causedBy(status);
            sleepmillis(2000);
            continue;
        }
    }

    return Status::OK();
}

}  // namespace mongo