diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-05-28 16:41:10 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-06-09 22:36:29 -0400 |
commit | 1106b8f8f203ec633d231152c2a077dce45e05f0 (patch) | |
tree | 8f3829187c32274276047ce6b24c8e7b1a57f98f /src | |
parent | 5902eb0903eb04a546c21fe8bbc1d9364e8fafa3 (diff) | |
download | mongo-1106b8f8f203ec633d231152c2a077dce45e05f0.tar.gz |
SERVER-18438/SERVER-18435 Add RemoteCommandTargeter and runner to grid
* Implement a RemoteCommandTargeterFactory to dispense targeters based on
the connection string.
* Implement replica set monitor-based remote command targeter factory.
* Hook these with the global grid object.
* Switch the Shard::runCommand call to build on those.
Diffstat (limited to 'src')
26 files changed, 434 insertions, 160 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 83dd6bae95f..3b41aaeb36b 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -126,11 +126,12 @@ env.Library( env.Library( target='remote_command_targeter', source=[ + 'remote_command_targeter_factory_impl.cpp', 'remote_command_targeter_rs.cpp', 'remote_command_targeter_standalone.cpp', ], LIBDEPS=[ - + 'clientdriver', ] ) diff --git a/src/mongo/client/read_preference.cpp b/src/mongo/client/read_preference.cpp index 1a876259126..e088504fe15 100644 --- a/src/mongo/client/read_preference.cpp +++ b/src/mongo/client/read_preference.cpp @@ -55,7 +55,7 @@ namespace { const char kSecondaryPreferred[] = "secondaryPreferred"; const char kNearest[] = "nearest"; - StringData toString(ReadPreference pref) { + StringData readPreferenceName(ReadPreference pref) { switch (pref) { case ReadPreference::PrimaryOnly: return StringData(kPrimaryOnly); @@ -177,11 +177,15 @@ namespace { BSONObj ReadPreferenceSetting::toBSON() const { BSONObjBuilder bob; - bob.append(kModeFieldName, toString(pref)); + bob.append(kModeFieldName, readPreferenceName(pref)); if (tags != defaultTagSetForMode(pref)) { bob.append(kTagsFieldName, tags.getTagBSON()); } return bob.obj(); } + std::string ReadPreferenceSetting::toString() const { + return toBSON().toString(); + } + } // namespace mongo diff --git a/src/mongo/client/read_preference.h b/src/mongo/client/read_preference.h index e10b6ac1524..5b23335bf31 100644 --- a/src/mongo/client/read_preference.h +++ b/src/mongo/client/read_preference.h @@ -127,6 +127,11 @@ namespace mongo { BSONObj toBSON() const; /** + * Describes this ReadPreferenceSetting as a string. + */ + std::string toString() const; + + /** * Parses a ReadPreferenceSetting from a BSON document of the form: * { mode: <mode>, tags: <array of tags> }. The 'mode' element must a string equal to either * "primary", "primaryPreferred", "secondary", "secondaryPreferred", or "nearest". Although diff --git a/src/mongo/client/remote_command_targeter.h b/src/mongo/client/remote_command_targeter.h index c30b3c989e4..6c559dbcf5e 100644 --- a/src/mongo/client/remote_command_targeter.h +++ b/src/mongo/client/remote_command_targeter.h @@ -36,7 +36,6 @@ namespace mongo { struct HostAndPort; template<typename T> class StatusWith; - /** * Interface encapsulating the targeting logic for a given replica set or a standalone host. */ @@ -48,8 +47,9 @@ namespace mongo { /** * Obtains a host, which matches the read preferences specified by readPref. * - * Returns OK and a host and port to use for the specified read preference. Otherwise may - * return any ErrorCode. + * Returns OK and a host and port to use for the specified read preference or any + * ErrorCode. Known error codes are: + * HostNotFound if no host matches the specified read preference critera */ virtual StatusWith<HostAndPort> findHost(const ReadPreferenceSetting& readPref) = 0; diff --git a/src/mongo/client/remote_command_targeter_factory.h b/src/mongo/client/remote_command_targeter_factory.h new file mode 100644 index 00000000000..b250489b39e --- /dev/null +++ b/src/mongo/client/remote_command_targeter_factory.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/base/disallow_copying.h" + +namespace mongo { + + class ConnectionString; + class RemoteCommandTargeter; + + /** + * Constructs RemoteCommandTargeters based on the specific type of the target (standalone, + * replica set, etc). + */ + class RemoteCommandTargeterFactory { + MONGO_DISALLOW_COPYING(RemoteCommandTargeterFactory); + public: + virtual ~RemoteCommandTargeterFactory() = default; + + /** + * Instantiates a RemoteCommandTargeter for the specified connection string. + */ + virtual std::unique_ptr<RemoteCommandTargeter> create(const ConnectionString& connStr) = 0; + + protected: + RemoteCommandTargeterFactory() = default; + }; + +} // namespace mongo diff --git a/src/mongo/client/remote_command_targeter_factory_impl.cpp b/src/mongo/client/remote_command_targeter_factory_impl.cpp new file mode 100644 index 00000000000..191b5a4dfa7 --- /dev/null +++ b/src/mongo/client/remote_command_targeter_factory_impl.cpp @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_targeter_factory_impl.h" + +#include "mongo/base/status_with.h" +#include "mongo/client/connection_string.h" +#include "mongo/client/remote_command_targeter_rs.h" +#include "mongo/client/remote_command_targeter_standalone.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + + RemoteCommandTargeterFactoryImpl::RemoteCommandTargeterFactoryImpl() = default; + + RemoteCommandTargeterFactoryImpl::~RemoteCommandTargeterFactoryImpl() = default; + + std::unique_ptr<RemoteCommandTargeter> + RemoteCommandTargeterFactoryImpl::create(const ConnectionString& connStr) { + switch (connStr.type()) { + case ConnectionString::MASTER: + case ConnectionString::CUSTOM: + invariant(connStr.getServers().size() == 1); + return stdx::make_unique<RemoteCommandTargeterStandalone>(connStr.getServers().front()); + case ConnectionString::SET: + return stdx::make_unique<RemoteCommandTargeterRS>(connStr.getSetName(), + connStr.getServers()); + case ConnectionString::INVALID: + case ConnectionString::SYNC: + // These connections should never be seen + break; + } + + MONGO_UNREACHABLE; + } + +} // namespace mongo diff --git a/src/mongo/client/remote_command_targeter_factory_impl.h b/src/mongo/client/remote_command_targeter_factory_impl.h new file mode 100644 index 00000000000..f0118a01549 --- /dev/null +++ b/src/mongo/client/remote_command_targeter_factory_impl.h @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/client/remote_command_targeter_factory.h" + +namespace mongo { + + /** + * Targeter factory that instantiates remote command targeters based on the type of the + * connection. It will return RemoteCommandTargeterStandalone for a single node (MASTER) or + * custom (CUSTOM) connection string and RemoteCommandTargeterRS for a SET connection string. + * All other connection strings are not supported and will cause a failed invariant error. + */ + class RemoteCommandTargeterFactoryImpl final : public RemoteCommandTargeterFactory { + public: + RemoteCommandTargeterFactoryImpl(); + ~RemoteCommandTargeterFactoryImpl(); + + std::unique_ptr<RemoteCommandTargeter> create(const ConnectionString& connStr) override; + }; + +} // namespace mongo diff --git a/src/mongo/client/remote_command_targeter_mock.h b/src/mongo/client/remote_command_targeter_mock.h index 5a9bb7532e7..7e398a9e7bf 100644 --- a/src/mongo/client/remote_command_targeter_mock.h +++ b/src/mongo/client/remote_command_targeter_mock.h @@ -33,7 +33,7 @@ namespace mongo { - class RemoteCommandTargeterMock : public RemoteCommandTargeter { + class RemoteCommandTargeterMock final : public RemoteCommandTargeter { public: RemoteCommandTargeterMock(); virtual ~RemoteCommandTargeterMock() = default; @@ -42,7 +42,7 @@ namespace mongo { * Returns the return value last set by setFindHostReturnValue. * Returns ErrorCodes::InternalError if setFindHostReturnValue was never called. */ - virtual StatusWith<HostAndPort> findHost(const ReadPreferenceSetting& readPref) override; + StatusWith<HostAndPort> findHost(const ReadPreferenceSetting& readPref) override; /** * Sets the return value for the next call to findHost. diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp index 0d3329a212f..9012ec21dbd 100644 --- a/src/mongo/client/remote_command_targeter_rs.cpp +++ b/src/mongo/client/remote_command_targeter_rs.cpp @@ -31,22 +31,47 @@ #include "mongo/client/remote_command_targeter_rs.h" #include "mongo/base/status_with.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/replica_set_monitor.h" #include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" namespace mongo { RemoteCommandTargeterRS::RemoteCommandTargeterRS(const std::string& rsName, const std::vector<HostAndPort>& seedHosts) - : _rsName(rsName), - _seedHosts(seedHosts) { + : _rsName(rsName) { + _rsMonitor = ReplicaSetMonitor::get(rsName); + if (!_rsMonitor) { + std::set<HostAndPort> seedServers(seedHosts.begin(), seedHosts.end()); + + // TODO: Replica set monitor should be entirely owned and maintained by the remote + // command targeter. Otherwise, there is a slight race condition here where the + // RS monitor might be created, but then before the get call it gets removed so + // we end up with a NULL _rsMonitor. + ReplicaSetMonitor::createIfNeeded(rsName, seedServers); + _rsMonitor = ReplicaSetMonitor::get(rsName); + } } StatusWith<HostAndPort> RemoteCommandTargeterRS::findHost( const ReadPreferenceSetting& readPref) { - invariant(false); - return Status(ErrorCodes::IllegalOperation, "Not yet implemented"); + + if (!_rsMonitor) { + return Status(ErrorCodes::ReplicaSetNotFound, + str::stream() << "unknown replica set " << _rsName); + } + + HostAndPort hostAndPort = _rsMonitor->getHostOrRefresh(readPref); + if (hostAndPort.empty()) { + return Status(ErrorCodes::HostNotFound, + str::stream() << "could not find host matching read preference " + << readPref.toString() << " for set " << _rsName); + } + + return hostAndPort; } } // namespace mongo diff --git a/src/mongo/client/remote_command_targeter_rs.h b/src/mongo/client/remote_command_targeter_rs.h index eb7a8b6e442..253b0da7d2e 100644 --- a/src/mongo/client/remote_command_targeter_rs.h +++ b/src/mongo/client/remote_command_targeter_rs.h @@ -28,6 +28,7 @@ #pragma once +#include <boost/shared_ptr.hpp> #include <string> #include <vector> @@ -35,11 +36,13 @@ namespace mongo { + class ReplicaSetMonitor; + /** * Implements a replica-set backed remote command targeter, which monitors the specified * replica set and responds to state changes. */ - class RemoteCommandTargeterRS : public RemoteCommandTargeter { + class RemoteCommandTargeterRS final : public RemoteCommandTargeter { public: /** * Instantiates a new targeter for the specified replica set and seed hosts. The RS name @@ -48,11 +51,14 @@ namespace mongo { RemoteCommandTargeterRS(const std::string& rsName, const std::vector<HostAndPort>& seedHosts); - StatusWith<HostAndPort> findHost(const ReadPreferenceSetting& readPref) final; + StatusWith<HostAndPort> findHost(const ReadPreferenceSetting& readPref) override; private: + // Name of the replica set which this targeter maintains const std::string _rsName; - const std::vector<HostAndPort> _seedHosts; + + // Monitor for this replica set + boost::shared_ptr<ReplicaSetMonitor> _rsMonitor; }; } // namespace mongo diff --git a/src/mongo/client/remote_command_targeter_standalone.h b/src/mongo/client/remote_command_targeter_standalone.h index fbfc0b47e62..06e5c6acc3c 100644 --- a/src/mongo/client/remote_command_targeter_standalone.h +++ b/src/mongo/client/remote_command_targeter_standalone.h @@ -37,11 +37,11 @@ namespace mongo { * Implements a standalone instance remote command targeter, which always returns the same * host regardless of the read preferences. */ - class RemoteCommandTargeterStandalone : public RemoteCommandTargeter { + class RemoteCommandTargeterStandalone final : public RemoteCommandTargeter { public: explicit RemoteCommandTargeterStandalone(const HostAndPort& hostAndPort); - StatusWith<HostAndPort> findHost(const ReadPreferenceSetting& readPref) final; + StatusWith<HostAndPort> findHost(const ReadPreferenceSetting& readPref) override; private: const HostAndPort _hostAndPort; diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp index 80438b00281..42b90d8cfe5 100644 --- a/src/mongo/s/balancer_policy.cpp +++ b/src/mongo/s/balancer_policy.cpp @@ -256,13 +256,14 @@ namespace mongo { for (const ShardType& shardData : shards) { boost::shared_ptr<Shard> shard = - grid.shardRegistry()->findIfExists(shardData.getHost()); + grid.shardRegistry()->findIfExists(shardData.getName()); // The shard must still exist in the registry. If it doesn't, which may happen in // the very low proability case that it gets dropped between the call to // getAllShards above and the call to findIfExists, just don't account for it since // it is missing anyways. if (!shard) { + warning() << "Shard [" << shardData.getName() << "] was not found. Skipping."; continue; } diff --git a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp index 77325aa8829..df09e27564d 100644 --- a/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp +++ b/src/mongo/s/catalog/legacy/catalog_manager_legacy.cpp @@ -673,6 +673,7 @@ namespace { } catch (const DBException& e) { if (shardConnectionString.type() == ConnectionString::SET) { + shardConnectionPool.removeHost(shardConnectionString.getSetName()); ReplicaSetMonitor::remove(shardConnectionString.getSetName()); } @@ -773,8 +774,7 @@ namespace { BSONObj searchDoc = BSON(ShardType::name() << name); // Case 1: start draining chunks - BSONObj drainingDoc = - BSON(ShardType::name() << name << ShardType::draining(true)); + BSONObj drainingDoc = BSON(ShardType::name() << name << ShardType::draining(true)); BSONObj shardDoc = conn->findOne(ShardType::ConfigNS, drainingDoc); if (shardDoc.isEmpty()) { log() << "going to start draining shard: " << name; @@ -830,6 +830,7 @@ namespace { } Shard::removeShard(name); + shardConnectionPool.removeHost(name); ReplicaSetMonitor::remove(name); @@ -981,7 +982,7 @@ namespace { // Delete data from all mongods for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) { - const auto& shard = grid.shardRegistry()->findIfExists(i->getHost()); + const auto& shard = grid.shardRegistry()->findIfExists(i->getName()); ScopedDbConnection conn(shard->getConnString()); BSONObj info; @@ -1031,7 +1032,7 @@ namespace { LOG(1) << "dropCollection " << collectionNs << " chunk data deleted"; for (vector<ShardType>::const_iterator i = allShards.begin(); i != allShards.end(); i++) { - const auto& shard = grid.shardRegistry()->findIfExists(i->getHost()); + const auto& shard = grid.shardRegistry()->findIfExists(i->getName()); ScopedDbConnection conn(shard->getConnString()); BSONObj res; diff --git a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp b/src/mongo/s/catalog/legacy/cluster_client_internal.cpp index 02dd10cc18c..ce698c1b6f1 100644 --- a/src/mongo/s/catalog/legacy/cluster_client_internal.cpp +++ b/src/mongo/s/catalog/legacy/cluster_client_internal.cpp @@ -142,15 +142,15 @@ namespace mongo { << " failed validation: " << causedBy(status)); } - string errMsg; - ConnectionString shardLoc = ConnectionString::parse(shard.getHost(), errMsg); - if (shardLoc.type() == ConnectionString::INVALID) { + const auto shardConnStatus = ConnectionString::parse(shard.getHost()); + if (!shardConnStatus.isOK()) { return Status(ErrorCodes::UnsupportedFormat, stream() << "invalid shard host " << shard.getHost() - << " read from the config server" << causedBy(errMsg)); + << " read from the config server" + << shardConnStatus.getStatus().toString()); } - vector<HostAndPort> shardServers = shardLoc.getServers(); + vector<HostAndPort> shardServers = shardConnStatus.getValue().getServers(); servers.insert(servers.end(), shardServers.begin(), shardServers.end()); } } diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript index 725cee5d910..8e76af2ffd3 100644 --- a/src/mongo/s/client/SConscript +++ b/src/mongo/s/client/SConscript @@ -12,6 +12,8 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/client/clientdriver', + '$BUILD_DIR/mongo/client/remote_command_runner_impl', + '$BUILD_DIR/mongo/client/remote_command_targeter', '$BUILD_DIR/mongo/s/catalog/catalog_manager', ] ) diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp index 172636f11bd..ec9dbfa52c1 100644 --- a/src/mongo/s/client/shard.cpp +++ b/src/mongo/s/client/shard.cpp @@ -35,13 +35,16 @@ #include <string> #include <vector> -#include "mongo/client/connpool.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/remote_command_runner.h" +#include "mongo/client/remote_command_targeter.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/privilege.h" #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -102,27 +105,12 @@ namespace { invariant(_cs.isValid()); } - bool Shard::containsNode( const string& node ) const { - if (_cs.toString() == node) { - return true; - } - - if ( _cs.type() == ConnectionString::SET ) { - ReplicaSetMonitorPtr rs = ReplicaSetMonitor::get(_cs.getSetName()); - if (!rs) { - // Possibly still yet to be initialized. See SERVER-8194. - warning() << "Monitor not found for a known shard: " << _cs.getSetName(); - return false; - } - - return rs->contains(HostAndPort(node)); - } - - return false; + RemoteCommandTargeter* Shard::getTargeter() const { + return grid.shardRegistry()->getTargeterForShard(getId()).get(); } - bool Shard::isAShardNode( const string& ident ) { - return grid.shardRegistry()->isAShardNode( ident ); + RemoteCommandRunner* Shard::getCommandRunner() const { + return grid.shardRegistry()->getCommandRunner(); } ShardPtr Shard::lookupRSName(const string& name) { @@ -150,20 +138,30 @@ namespace { } bool Shard::runCommand(const string& db, const BSONObj& cmd, BSONObj& res) const { - ScopedDbConnection conn(getConnString()); - bool ok = conn->runCommand(db, cmd, res); - conn.done(); - return ok; + const ReadPreferenceSetting readPref(ReadPreference::PrimaryOnly, TagSet::primaryOnly()); + auto selectedHost = getTargeter()->findHost(readPref); + if (!selectedHost.isOK()) { + return false; + } + + const RemoteCommandRequest request(selectedHost.getValue(), db, cmd); + + auto statusCommand = getCommandRunner()->runCommand(request); + if (!statusCommand.isOK()) { + return false; + } + + res = statusCommand.getValue().data.getOwned(); + + return getStatusFromCommandResult(res).isOK(); } ShardStatus Shard::getStatus() const { - ScopedDbConnection conn(getConnString()); - BSONObj listDatabases; uassert(28589, str::stream() << "call to listDatabases on " << getConnString().toString() << " failed: " << listDatabases, - conn->runCommand("admin", BSON("listDatabases" << 1), listDatabases)); + runCommand("admin", BSON("listDatabases" << 1), listDatabases)); BSONElement totalSizeElem = listDatabases["totalSize"]; uassert(28590, "totalSize field not found in listDatabases", totalSizeElem.isNumber()); @@ -172,13 +170,11 @@ namespace { uassert(28591, str::stream() << "call to serverStatus on " << getConnString().toString() << " failed: " << serverStatus, - conn->runCommand("admin", BSON("serverStatus" << 1), serverStatus)); + runCommand("admin", BSON("serverStatus" << 1), serverStatus)); BSONElement versionElement = serverStatus["version"]; uassert(28599, "version field not found in serverStatus", versionElement.type() == String); - conn.done(); - return ShardStatus(totalSizeElem.numberLong(), versionElement.str()); } @@ -192,18 +188,22 @@ namespace { ShardPtr Shard::pick() { vector<ShardId> all; + grid.shardRegistry()->getAllShardIds(&all); - if ( all.size() == 0 ) { + if (all.size() == 0) { grid.shardRegistry()->reload(); grid.shardRegistry()->getAllShardIds(&all); - if ( all.size() == 0 ) + + if (all.empty()) { return nullptr; + } } auto bestShard = grid.shardRegistry()->findIfExists(all[0]); if (!bestShard) { return nullptr; } + ShardStatus bestStatus = bestShard->getStatus(); for (size_t i = 1; i < all.size(); i++) { @@ -211,6 +211,7 @@ namespace { if (!shard) { continue; } + const ShardStatus status = shard->getStatus(); if (status < bestStatus) { diff --git a/src/mongo/s/client/shard.h b/src/mongo/s/client/shard.h index 621a71c979d..6e44c00115e 100644 --- a/src/mongo/s/client/shard.h +++ b/src/mongo/s/client/shard.h @@ -37,6 +37,8 @@ namespace mongo { class BSONObj; + class RemoteCommandRunner; + class RemoteCommandTargeter; using ShardId = std::string; @@ -63,20 +65,29 @@ namespace mongo { using ShardPtr = boost::shared_ptr<Shard>; /* - * A "shard" one partition of the overall database (and a replica set typically). + * Maintains the targeting and command execution logic for a single shard. Performs polling of + * the shard (if replica set). */ class Shard { MONGO_DISALLOW_COPYING(Shard); public: - + /** + * Instantiates a new shard connection management object for the specified shard and + * connection string. + */ Shard(const ShardId& id, const ConnectionString& connStr, long long maxSizeMB, bool isDraining); const ShardId& getId() const { return _id; } + const ConnectionString& getConnString() const { return _cs; } + RemoteCommandTargeter* getTargeter() const; + + RemoteCommandRunner* getCommandRunner() const; + long long getMaxSizeMB() const { return _maxSizeMB; } @@ -107,8 +118,6 @@ namespace mongo { return _id < o._id; } - bool ok() const { return _cs.isValid(); } - BSONObj runCommand(const std::string& db, const std::string& simple) const; BSONObj runCommand(const std::string& db, const BSONObj& cmd) const; @@ -120,13 +129,6 @@ namespace mongo { */ ShardStatus getStatus() const; - /** - * mostly for replica set - * retursn true if node is the shard - * of if the replica set contains node - */ - bool containsNode( const std::string& node ) const; - static ShardPtr lookupRSName(const std::string& name); /** @@ -138,14 +140,13 @@ namespace mongo { static void reloadShardInfo(); static void removeShard(const ShardId& id); - - static bool isAShardNode( const std::string& ident ); static void installShard(const ShardId& id, const Shard& shard); private: - ShardId _id; - ConnectionString _cs; + const ShardId _id; + const ConnectionString _cs; + long long _maxSizeMB; // in MBytes, 0 is unlimited bool _isDraining; // shard is currently being removed }; diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 1234b30e274..6a5ed305f29 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -36,8 +36,13 @@ #include <boost/thread/lock_guard.hpp> #include "mongo/client/connection_string.h" +#include "mongo/client/remote_command_runner_impl.h" +#include "mongo/client/remote_command_targeter.h" +#include "mongo/client/remote_command_targeter_factory.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_shard.h" +#include "mongo/s/client/shard.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -47,14 +52,29 @@ namespace mongo { using std::string; using std::vector; - - ShardRegistry::ShardRegistry(CatalogManager* catalogManager) - : _catalogManager(catalogManager) { + ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory, + std::unique_ptr<RemoteCommandRunner> commandRunner, + CatalogManager* catalogManager) + : _targeterFactory(std::move(targeterFactory)), + _commandRunner(std::move(commandRunner)), + _catalogManager(catalogManager) { } ShardRegistry::~ShardRegistry() = default; + shared_ptr<RemoteCommandTargeter> ShardRegistry::getTargeterForShard(const string& shardId) { + auto targeter = _findTargeter(shardId); + if (targeter) { + return targeter; + } + + // If we can't find the shard, we might just need to reload the cache + reload(); + + return _findTargeter(shardId); + } + void ShardRegistry::reload() { vector<ShardType> shards; Status status = _catalogManager->getAllShards(&shards); @@ -67,6 +87,7 @@ namespace mongo { boost::lock_guard<boost::mutex> lk(_mutex); _lookup.clear(); + _targeters.clear(); _rsLookup.clear(); ShardType configServerShard; @@ -162,29 +183,6 @@ namespace mongo { all->assign(seen.begin(), seen.end()); } - bool ShardRegistry::isAShardNode(const string& addr) const { - boost::lock_guard<boost::mutex> lk(_mutex); - - // Check direct nods or set names - ShardMap::const_iterator i = _lookup.find(addr); - if (i != _lookup.end()) { - return true; - } - - // Check for set nodes - for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) { - if (i->first == "config") { - continue; - } - - if (i->second->containsNode(addr)) { - return true; - } - } - - return false; - } - void ShardRegistry::toBSON(BSONObjBuilder* result) const { BSONObjBuilder b(_lookup.size() + 50); @@ -213,25 +211,43 @@ namespace mongo { shardHost, shardType.getMaxSize(), shardType.getDraining()); + _lookup[shardType.getName()] = shard; + + // Sync cluster connections (legacy config server) do not go through the normal targeting + // mechanism and must only be reachable through CatalogManagerLegacy or legacy-style + // queries and inserts. Do not create targeter for these connections. This code should go + // away after 3.2 is released. + if (shardHost.type() == ConnectionString::SYNC) { + return; + } + + // TODO: The only reason to have the shard host names in the lookup table is for the + // setShardVersion call, which resolves the shard id from the shard address. This is + // error-prone and will go away eventually when we switch all communications to go through + // the remote command runner. _lookup[shardType.getHost()] = shard; - if (shardHost.type() == ConnectionString::SET) { - if (shardHost.getSetName().size()) { - _rsLookup[shardHost.getSetName()] = shard; - } + for (const HostAndPort& hostAndPort : shardHost.getServers()) { + _lookup[hostAndPort.toString()] = shard; - vector<HostAndPort> servers = shardHost.getServers(); - for (unsigned i = 0; i < servers.size(); i++) { - _lookup[servers[i].toString()] = shard; + // Maintain a mapping from host to shard it belongs to for the case where we need to + // update the shard connection string on reconfigurations. + if (shardHost.type() == ConnectionString::SET) { + _rsLookup[hostAndPort.toString()] = shard; } } + + if (shardHost.type() == ConnectionString::SET) { + _rsLookup[shardHost.getSetName()] = shard; + } + + _targeters[shardType.getName()] = std::move(_targeterFactory->create(shardHost)); } - shared_ptr<Shard> ShardRegistry::_findUsingLookUp(const ShardId& id) { + shared_ptr<Shard> ShardRegistry::_findUsingLookUp(const ShardId& shardId) { boost::lock_guard<boost::mutex> lk(_mutex); - ShardMap::iterator it = _lookup.find(id); - + ShardMap::iterator it = _lookup.find(shardId); if (it != _lookup.end()) { return it->second; } @@ -239,4 +255,15 @@ namespace mongo { return nullptr; } + boost::shared_ptr<RemoteCommandTargeter> ShardRegistry::_findTargeter(const string& shardId) { + boost::lock_guard<boost::mutex> lk(_mutex); + + TargeterMap::iterator it = _targeters.find(shardId); + if (it != _targeters.end()) { + return it->second; + } + + return nullptr; + } + } // namespace mongo diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index a47b8160113..4e9d21d074c 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -39,6 +39,9 @@ namespace mongo { class BSONObjBuilder; class CatalogManager; + class RemoteCommandRunner; + class RemoteCommandTargeter; + class RemoteCommandTargeterFactory; class Shard; class ShardType; @@ -47,9 +50,16 @@ namespace mongo { */ class ShardRegistry { public: - ShardRegistry(CatalogManager* catalogManager); + ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory, + std::unique_ptr<RemoteCommandRunner> commandRunner, + CatalogManager* catalogManager); + ~ShardRegistry(); + boost::shared_ptr<RemoteCommandTargeter> getTargeterForShard(const std::string& shardId); + + RemoteCommandRunner* getCommandRunner() const { return _commandRunner.get(); } + void reload(); boost::shared_ptr<Shard> findIfExists(const ShardId& id); @@ -67,20 +77,26 @@ namespace mongo { void getAllShardIds(std::vector<ShardId>* all) const; - bool isAShardNode(const std::string& addr) const; - void toBSON(BSONObjBuilder* result) const; private: typedef std::map<ShardId, boost::shared_ptr<Shard>> ShardMap; - + typedef std::map<ShardId, boost::shared_ptr<RemoteCommandTargeter>> TargeterMap; /** * Creates a shard based on the specified information and puts it into the lookup maps. */ void _addShard_inlock(const ShardType& shardType); - boost::shared_ptr<Shard> _findUsingLookUp(const ShardId& id); + boost::shared_ptr<Shard> _findUsingLookUp(const ShardId& shardId); + + boost::shared_ptr<RemoteCommandTargeter> _findTargeter(const std::string& shardId); + + // Factory to obtain remote command targeters for shards + const std::unique_ptr<RemoteCommandTargeterFactory> _targeterFactory; + + // API to run remote commands to shards + const std::unique_ptr<RemoteCommandRunner> _commandRunner; // Catalog manager from which to load the shard information. Not owned and must outlive // the shard registry object. @@ -92,7 +108,12 @@ namespace mongo { // Map of both shardName -> Shard and hostName -> Shard ShardMap _lookup; - // Map from ReplSet name to shard + // TODO: These should eventually disappear and become parts of Shard + + // Map of shard name to targeter for this shard + TargeterMap _targeters; + + // Map from all hosts within a replica set to the shard representing this replica set ShardMap _rsLookup; }; diff --git a/src/mongo/s/commands/cluster_list_databases_cmd.cpp b/src/mongo/s/commands/cluster_list_databases_cmd.cpp index fd10878f4d3..cc0fc602fd4 100644 --- a/src/mongo/s/commands/cluster_list_databases_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_databases_cmd.cpp @@ -157,14 +157,8 @@ namespace { bb.append(temp.obj()); } - // obtain cached config shard + // Obtain the cached config shard const auto& configShard = grid.shardRegistry()->findIfExists("config"); - if (!configShard->ok()) { - return appendCommandStatus(result, - Status(ErrorCodes::ShardNotFound, - "Couldn't find shard " - "representing config server")); - } { // get config db from the config servers (first one) diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index 595d6e90d36..a39c6622e0a 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -33,13 +33,10 @@ #include "mongo/s/config.h" #include <boost/scoped_ptr.hpp> -#include <pcrecpp.h> #include "mongo/client/connpool.h" -#include "mongo/client/dbclientcursor.h" #include "mongo/db/client.h" #include "mongo/db/lasterror.h" -#include "mongo/db/server_options.h" #include "mongo/db/write_concern.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" @@ -51,26 +48,20 @@ #include "mongo/s/catalog/type_tags.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" -#include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_write.h" #include "mongo/s/grid.h" -#include "mongo/s/server.h" #include "mongo/s/type_locks.h" #include "mongo/s/type_lockpings.h" -#include "mongo/util/exit.h" #include "mongo/util/log.h" -#include "mongo/util/net/message.h" -#include "mongo/util/stringutils.h" namespace mongo { using boost::scoped_ptr; using std::auto_ptr; using std::endl; - using std::pair; using std::set; - using std::stringstream; + using std::string; using std::vector; CollectionInfo::CollectionInfo(const CollectionType& coll) { @@ -321,7 +312,7 @@ namespace mongo { // TODO: We need to keep this first one-chunk check in until we have a more efficient way of // creating/reusing a chunk manager, as doing so requires copying the full set of chunks currently - std::vector<ChunkType> newestChunk; + vector<ChunkType> newestChunk; if ( oldVersion.isSet() && ! forceReload ) { uassertStatusOK(grid.catalogManager()->getChunks( Query(BSON(ChunkType::ns(ns))) diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h index 33a901baa2b..e5f42e2eb38 100644 --- a/src/mongo/s/config.h +++ b/src/mongo/s/config.h @@ -29,26 +29,22 @@ #pragma once #include <boost/shared_ptr.hpp> +#include <set> +#include "mongo/db/jsobj.h" #include "mongo/s/client/shard.h" -#include "mongo/s/shard_key_pattern.h" #include "mongo/util/concurrency/mutex.h" namespace mongo { class ChunkManager; class CollectionType; - class ConfigServer; class DatabaseType; class DBConfig; typedef boost::shared_ptr<DBConfig> DBConfigPtr; - extern ConfigServer& configServer; - - struct CollectionInfo { - CollectionInfo() { _dirty = false; _dropped = false; diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index 805380acaa4..a4b35c6e6c1 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -39,6 +39,8 @@ #include <vector> #include "mongo/client/connpool.h" +#include "mongo/client/remote_command_runner_impl.h" +#include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" @@ -54,6 +56,7 @@ #include "mongo/db/wire_version.h" #include "mongo/s/catalog/legacy/catalog_manager_legacy.h" #include "mongo/s/client/shard_connection.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_connection_hook.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" @@ -492,7 +495,12 @@ namespace mongo { auto catalogManager = stdx::make_unique<CatalogManagerLegacy>(); uassertStatusOK(catalogManager->init(configServerCS)); - grid.setCatalogManager(std::move(catalogManager)); + auto shardRegistry = stdx::make_unique<ShardRegistry>( + stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), + stdx::make_unique<RemoteCommandRunnerImpl>(0), + catalogManager.get()); + + grid.init(std::move(catalogManager), std::move(shardRegistry)); _enabled = true; } diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index d56c0b45f08..b24f195d356 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -33,40 +33,31 @@ #include "mongo/s/grid.h" #include "mongo/base/status_with.h" -#include "mongo/client/connpool.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_settings.h" -#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/config.h" -#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { - using boost::shared_ptr; - using std::endl; - using std::map; - using std::set; - using std::string; - using std::vector; - Grid::Grid() : _allowLocalShard(true) { } - void Grid::setCatalogManager(std::unique_ptr<CatalogManager> catalogManager) { + void Grid::init(std::unique_ptr<CatalogManager> catalogManager, + std::unique_ptr<ShardRegistry> shardRegistry) { + invariant(!_catalogManager); invariant(!_catalogCache); invariant(!_shardRegistry); _catalogManager = std::move(catalogManager); _catalogCache = stdx::make_unique<CatalogCache>(_catalogManager.get()); - _shardRegistry = stdx::make_unique<ShardRegistry>(_catalogManager.get()); + _shardRegistry = std::move(shardRegistry); } - StatusWith<shared_ptr<DBConfig>> Grid::implicitCreateDb(const std::string& dbName) { + StatusWith<boost::shared_ptr<DBConfig>> Grid::implicitCreateDb(const std::string& dbName) { auto status = catalogCache()->getDatabase(dbName); if (status.isOK()) { return status; @@ -126,5 +117,11 @@ namespace mongo { return shouldBalance(balSettings); } + void Grid::clearForUnitTests() { + _catalogManager.reset(); + _catalogCache.reset(); + _shardRegistry.reset(); + } + Grid grid; } diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index cbaf37a783c..e8dc9c2e7c3 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -54,10 +54,15 @@ namespace mongo { Grid(); /** - * Called at startup time so the catalog manager can be set. Should be called only once - * for the lifetime of the grid object. Takes ownership of the passed in pointer. + * Called at startup time so the global sharding services (catalog manager, shard registry) + * can be set. This method must be called once and once only for the lifetime of the + * service. + * + * NOTE: Unit-tests are allowed to call it more than once, provided they reset the object's + * state using clearForUnitTests. */ - void setCatalogManager(std::unique_ptr<CatalogManager> catalogManager); + void init(std::unique_ptr<CatalogManager> catalogManager, + std::unique_ptr<ShardRegistry> shardRegistry); /** * Implicitly creates the specified database as non-sharded. @@ -89,6 +94,15 @@ namespace mongo { CatalogCache* catalogCache() const { return _catalogCache.get(); } ShardRegistry* shardRegistry() const { return _shardRegistry.get(); } + /** + * Clears the grid object so that it can be reused between test executions. This will not + * be necessary if grid is hanging off the global ServiceContext and each test gets its + * own service context. + * + * NOTE: Do not use this outside of unit-tests. + */ + void clearForUnitTests(); + private: std::unique_ptr<CatalogManager> _catalogManager; std::unique_ptr<CatalogCache> _catalogCache; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index dfc2684f7d3..e7ee715aa95 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -34,13 +34,14 @@ #include <boost/shared_ptr.hpp> #include <boost/thread/thread.hpp> -#include <iostream> #include "mongo/base/init.h" #include "mongo/base/initializer.h" #include "mongo/base/status.h" #include "mongo/client/connpool.h" #include "mongo/client/dbclient_rs.h" +#include "mongo/client/remote_command_runner_impl.h" +#include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/config.h" #include "mongo/db/audit.h" @@ -61,6 +62,7 @@ #include "mongo/s/balance.h" #include "mongo/s/catalog/legacy/catalog_manager_legacy.h" #include "mongo/s/client/sharding_connection_hook.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" #include "mongo/s/cursors.h" #include "mongo/s/grid.h" @@ -250,7 +252,12 @@ static ExitCode runMongosServer( bool doUpgrade ) { } } - grid.setCatalogManager(std::move(catalogManager)); + auto shardRegistry = stdx::make_unique<ShardRegistry>( + stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), + stdx::make_unique<RemoteCommandRunnerImpl>(0), + catalogManager.get()); + + grid.init(std::move(catalogManager), std::move(shardRegistry)); ConfigServer::reloadSettings(); |