/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/client/shard_registry.h" #include "mongo/client/connection_string.h" #include "mongo/client/query_fetcher.h" #include "mongo/client/remote_command_runner_impl.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory.h" #include "mongo/executor/task_executor.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { using std::shared_ptr; using std::string; using std::unique_ptr; using std::vector; using executor::TaskExecutor; using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs; namespace { const Seconds kConfigCommandTimeout{30}; } // unnamed namespace ShardRegistry::ShardRegistry(std::unique_ptr targeterFactory, std::unique_ptr commandRunner, std::unique_ptr executor, CatalogManager* catalogManager) : _targeterFactory(std::move(targeterFactory)), _commandRunner(std::move(commandRunner)), _executor(std::move(executor)), _catalogManager(catalogManager) { // add config shard registry entry so know it's always there std::lock_guard lk(_mutex); _addConfigShard_inlock(); } ShardRegistry::~ShardRegistry() = default; void ShardRegistry::reload() { vector shards; Status status = _catalogManager->getAllShards(&shards); massert(13632, "couldn't get updated shard list from config server", status.isOK()); int numShards = shards.size(); LOG(1) << "found " << numShards << " shards listed on config server(s)"; std::lock_guard lk(_mutex); _lookup.clear(); _rsLookup.clear(); _addConfigShard_inlock(); for (const ShardType& shardType : shards) { uassertStatusOK(shardType.validate()); // Skip the config host even if there is one left over from legacy installations. The // config host is installed manually from the catalog manager data. if (shardType.getName() == "config") { continue; } _addShard_inlock(shardType); } } shared_ptr ShardRegistry::getShard(const ShardId& shardId) { shared_ptr shard = _findUsingLookUp(shardId); if (shard) { return shard; } // If we can't find the shard, we might just need to reload the cache reload(); return _findUsingLookUp(shardId); } shared_ptr ShardRegistry::lookupRSName(const string& name) const { std::lock_guard lk(_mutex); ShardMap::const_iterator i = _rsLookup.find(name); return (i == _rsLookup.end()) ? nullptr : i->second; } void ShardRegistry::remove(const ShardId& id) { std::lock_guard lk(_mutex); for (ShardMap::iterator i = _lookup.begin(); i != _lookup.end();) { shared_ptr s = i->second; if (s->getId() == id) { _lookup.erase(i++); } else { ++i; } } for (ShardMap::iterator i = _rsLookup.begin(); i != _rsLookup.end();) { shared_ptr s = i->second; if (s->getId() == id) { _rsLookup.erase(i++); } else { ++i; } } } void ShardRegistry::getAllShardIds(vector* all) const { std::set seen; { std::lock_guard lk(_mutex); for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) { const shared_ptr& s = i->second; if (s->getId() == "config") { continue; } if (seen.count(s->getId())) { continue; } seen.insert(s->getId()); } } all->assign(seen.begin(), seen.end()); } void ShardRegistry::toBSON(BSONObjBuilder* result) { BSONObjBuilder b(_lookup.size() + 50); std::lock_guard lk(_mutex); for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) { b.append(i->first, i->second->getConnString().toString()); } result->append("map", b.obj()); } void ShardRegistry::_addConfigShard_inlock() { ShardType configServerShard; configServerShard.setName("config"); configServerShard.setHost(_catalogManager->connectionString().toString()); _addShard_inlock(configServerShard); } void ShardRegistry::_addShard_inlock(const ShardType& shardType) { // This validation should ideally go inside the ShardType::validate call. However, doing // it there would prevent us from loading previously faulty shard hosts, which might have // been stored (i.e., the entire getAllShards call would fail). auto shardHostStatus = ConnectionString::parse(shardType.getHost()); if (!shardHostStatus.isOK()) { warning() << "Unable to parse shard host " << shardHostStatus.getStatus().toString(); } const ConnectionString& shardHost(shardHostStatus.getValue()); // Sync cluster connections (legacy config server) do not go through the normal targeting // mechanism and must only be reachable through CatalogManagerLegacy or legacy-style // queries and inserts. Do not create targeter for these connections. This code should go // away after 3.2 is released. if (shardHost.type() == ConnectionString::SYNC) { _lookup[shardType.getName()] = std::make_shared(shardType.getName(), shardHost, nullptr); return; } // Non-SYNC shards shared_ptr shard = std::make_shared( shardType.getName(), shardHost, std::move(_targeterFactory->create(shardHost))); _lookup[shardType.getName()] = shard; // TODO: The only reason to have the shard host names in the lookup table is for the // setShardVersion call, which resolves the shard id from the shard address. This is // error-prone and will go away eventually when we switch all communications to go through // the remote command runner. _lookup[shardType.getHost()] = shard; for (const HostAndPort& hostAndPort : shardHost.getServers()) { _lookup[hostAndPort.toString()] = shard; // Maintain a mapping from host to shard it belongs to for the case where we need to // update the shard connection string on reconfigurations. if (shardHost.type() == ConnectionString::SET) { _rsLookup[hostAndPort.toString()] = shard; } } if (shardHost.type() == ConnectionString::SET) { _rsLookup[shardHost.getSetName()] = shard; } } shared_ptr ShardRegistry::_findUsingLookUp(const ShardId& shardId) { std::lock_guard lk(_mutex); ShardMap::iterator it = _lookup.find(shardId); if (it != _lookup.end()) { return it->second; } return nullptr; } StatusWith> ShardRegistry::exhaustiveFind(const HostAndPort& host, const NamespaceString& nss, const BSONObj& query, boost::optional limit) { // If for some reason the callback never gets invoked, we will return this status Status status = Status(ErrorCodes::InternalError, "Internal error running find command"); vector results; auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus, Fetcher::NextAction* nextAction) { // Throw out any accumulated results on error if (!dataStatus.isOK()) { status = dataStatus.getStatus(); results.clear(); return; } auto& data = dataStatus.getValue(); for (const BSONObj& doc : data.documents) { results.push_back(std::move(doc.getOwned())); } status = Status::OK(); }; unique_ptr findCmd( fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit)))); QueryFetcher fetcher(_executor.get(), host, nss, findCmd->asFindCommand(), fetcherCallback); Status scheduleStatus = fetcher.schedule(); if (!scheduleStatus.isOK()) { return scheduleStatus; } fetcher.wait(); if (!status.isOK()) { return status; } return results; } StatusWith ShardRegistry::runCommand(const HostAndPort& host, const std::string& dbName, const BSONObj& cmdObj) { StatusWith responseStatus = Status(ErrorCodes::InternalError, "Internal error running command"); RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout); auto callStatus = _executor->scheduleRemoteCommand(request, [&responseStatus](const RemoteCommandCallbackArgs& args) { responseStatus = args.response; }); if (!callStatus.isOK()) { return callStatus.getStatus(); } // Block until the command is carried out _executor->wait(callStatus.getValue()); if (!responseStatus.isOK()) { return responseStatus.getStatus(); } return responseStatus.getValue().data; } } // namespace mongo