Diffstat (limited to 'src/mongo/s/client/shard_registry.cpp')
-rw-r--r--  src/mongo/s/client/shard_registry.cpp  410
1 file changed, 200 insertions, 210 deletions
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 01efc45aaa4..ad33f35c661 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -48,281 +48,271 @@
namespace mongo {
- using std::shared_ptr;
- using std::string;
- using std::unique_ptr;
- using std::vector;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
- using executor::TaskExecutor;
- using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs;
+using executor::TaskExecutor;
+using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs;
namespace {
- const Seconds kConfigCommandTimeout{30};
-} // unnamed namespace
-
- ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory,
- std::unique_ptr<RemoteCommandRunner> commandRunner,
- std::unique_ptr<executor::TaskExecutor> executor,
- CatalogManager* catalogManager)
- : _targeterFactory(std::move(targeterFactory)),
- _commandRunner(std::move(commandRunner)),
- _executor(std::move(executor)),
- _catalogManager(catalogManager) {
-
- // add config shard registry entry so we know it's always there
- std::lock_guard<std::mutex> lk(_mutex);
- _addConfigShard_inlock();
+const Seconds kConfigCommandTimeout{30};
+} // unnamed namespace
- }
+ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory,
+ std::unique_ptr<RemoteCommandRunner> commandRunner,
+ std::unique_ptr<executor::TaskExecutor> executor,
+ CatalogManager* catalogManager)
+ : _targeterFactory(std::move(targeterFactory)),
+ _commandRunner(std::move(commandRunner)),
+ _executor(std::move(executor)),
+ _catalogManager(catalogManager) {
+ // add config shard registry entry so we know it's always there
+ std::lock_guard<std::mutex> lk(_mutex);
+ _addConfigShard_inlock();
+}
- ShardRegistry::~ShardRegistry() = default;
+ShardRegistry::~ShardRegistry() = default;
- void ShardRegistry::reload() {
- vector<ShardType> shards;
- Status status = _catalogManager->getAllShards(&shards);
- massert(13632, "couldn't get updated shard list from config server", status.isOK());
+void ShardRegistry::reload() {
+ vector<ShardType> shards;
+ Status status = _catalogManager->getAllShards(&shards);
+ massert(13632, "couldn't get updated shard list from config server", status.isOK());
- int numShards = shards.size();
+ int numShards = shards.size();
- LOG(1) << "found " << numShards << " shards listed on config server(s)";
+ LOG(1) << "found " << numShards << " shards listed on config server(s)";
- std::lock_guard<std::mutex> lk(_mutex);
+ std::lock_guard<std::mutex> lk(_mutex);
- _lookup.clear();
- _rsLookup.clear();
+ _lookup.clear();
+ _rsLookup.clear();
- _addConfigShard_inlock();
+ _addConfigShard_inlock();
- for (const ShardType& shardType : shards) {
- uassertStatusOK(shardType.validate());
+ for (const ShardType& shardType : shards) {
+ uassertStatusOK(shardType.validate());
- // Skip the config host even if there is one left over from legacy installations. The
- // config host is installed manually from the catalog manager data.
- if (shardType.getName() == "config") {
- continue;
- }
-
- _addShard_inlock(shardType);
+ // Skip the config host even if there is one left over from legacy installations. The
+ // config host is installed manually from the catalog manager data.
+ if (shardType.getName() == "config") {
+ continue;
}
+
+ _addShard_inlock(shardType);
}
+}
- shared_ptr<Shard> ShardRegistry::getShard(const ShardId& shardId) {
- shared_ptr<Shard> shard = _findUsingLookUp(shardId);
- if (shard) {
- return shard;
- }
+shared_ptr<Shard> ShardRegistry::getShard(const ShardId& shardId) {
+ shared_ptr<Shard> shard = _findUsingLookUp(shardId);
+ if (shard) {
+ return shard;
+ }
- // If we can't find the shard, we might just need to reload the cache
- reload();
+ // If we can't find the shard, we might just need to reload the cache
+ reload();
- return _findUsingLookUp(shardId);
- }
+ return _findUsingLookUp(shardId);
+}
- shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const {
- std::lock_guard<std::mutex> lk(_mutex);
- ShardMap::const_iterator i = _rsLookup.find(name);
+shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const {
+ std::lock_guard<std::mutex> lk(_mutex);
+ ShardMap::const_iterator i = _rsLookup.find(name);
- return (i == _rsLookup.end()) ? nullptr : i->second;
- }
+ return (i == _rsLookup.end()) ? nullptr : i->second;
+}
- void ShardRegistry::remove(const ShardId& id) {
- std::lock_guard<std::mutex> lk(_mutex);
+void ShardRegistry::remove(const ShardId& id) {
+ std::lock_guard<std::mutex> lk(_mutex);
- for (ShardMap::iterator i = _lookup.begin(); i != _lookup.end();) {
- shared_ptr<Shard> s = i->second;
- if (s->getId() == id) {
- _lookup.erase(i++);
- }
- else {
- ++i;
- }
+ for (ShardMap::iterator i = _lookup.begin(); i != _lookup.end();) {
+ shared_ptr<Shard> s = i->second;
+ if (s->getId() == id) {
+ _lookup.erase(i++);
+ } else {
+ ++i;
}
+ }
- for (ShardMap::iterator i = _rsLookup.begin(); i != _rsLookup.end();) {
- shared_ptr<Shard> s = i->second;
- if (s->getId() == id) {
- _rsLookup.erase(i++);
- }
- else {
- ++i;
- }
+ for (ShardMap::iterator i = _rsLookup.begin(); i != _rsLookup.end();) {
+ shared_ptr<Shard> s = i->second;
+ if (s->getId() == id) {
+ _rsLookup.erase(i++);
+ } else {
+ ++i;
}
}
+}
- void ShardRegistry::getAllShardIds(vector<ShardId>* all) const {
- std::set<string> seen;
-
- {
- std::lock_guard<std::mutex> lk(_mutex);
- for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) {
- const shared_ptr<Shard>& s = i->second;
- if (s->getId() == "config") {
- continue;
- }
-
- if (seen.count(s->getId())) {
- continue;
- }
- seen.insert(s->getId());
+void ShardRegistry::getAllShardIds(vector<ShardId>* all) const {
+ std::set<string> seen;
+
+ {
+ std::lock_guard<std::mutex> lk(_mutex);
+ for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) {
+ const shared_ptr<Shard>& s = i->second;
+ if (s->getId() == "config") {
+ continue;
}
- }
- all->assign(seen.begin(), seen.end());
+ if (seen.count(s->getId())) {
+ continue;
+ }
+ seen.insert(s->getId());
+ }
}
- void ShardRegistry::toBSON(BSONObjBuilder* result) {
- BSONObjBuilder b(_lookup.size() + 50);
+ all->assign(seen.begin(), seen.end());
+}
- std::lock_guard<std::mutex> lk(_mutex);
+void ShardRegistry::toBSON(BSONObjBuilder* result) {
+ BSONObjBuilder b(_lookup.size() + 50);
- for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) {
- b.append(i->first, i->second->getConnString().toString());
- }
+ std::lock_guard<std::mutex> lk(_mutex);
- result->append("map", b.obj());
+ for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) {
+ b.append(i->first, i->second->getConnString().toString());
}
- void ShardRegistry::_addConfigShard_inlock() {
- ShardType configServerShard;
- configServerShard.setName("config");
- configServerShard.setHost(_catalogManager->connectionString().toString());
- _addShard_inlock(configServerShard);
+ result->append("map", b.obj());
+}
+
+void ShardRegistry::_addConfigShard_inlock() {
+ ShardType configServerShard;
+ configServerShard.setName("config");
+ configServerShard.setHost(_catalogManager->connectionString().toString());
+ _addShard_inlock(configServerShard);
+}
+
+void ShardRegistry::_addShard_inlock(const ShardType& shardType) {
+ // This validation should ideally go inside the ShardType::validate call. However, doing
+ // it there would prevent us from loading previously faulty shard hosts, which might have
+ // been stored (i.e., the entire getAllShards call would fail).
+ auto shardHostStatus = ConnectionString::parse(shardType.getHost());
+ if (!shardHostStatus.isOK()) {
+ warning() << "Unable to parse shard host " << shardHostStatus.getStatus().toString();
}
- void ShardRegistry::_addShard_inlock(const ShardType& shardType) {
- // This validation should ideally go inside the ShardType::validate call. However, doing
- // it there would prevent us from loading previously faulty shard hosts, which might have
- // been stored (i.e., the entire getAllShards call would fail).
- auto shardHostStatus = ConnectionString::parse(shardType.getHost());
- if (!shardHostStatus.isOK()) {
- warning() << "Unable to parse shard host "
- << shardHostStatus.getStatus().toString();
- }
-
- const ConnectionString& shardHost(shardHostStatus.getValue());
-
- // Sync cluster connections (legacy config server) do not go through the normal targeting
- // mechanism and must only be reachable through CatalogManagerLegacy or legacy-style
- // queries and inserts. Do not create targeter for these connections. This code should go
- // away after 3.2 is released.
- if (shardHost.type() == ConnectionString::SYNC) {
- _lookup[shardType.getName()] =
- std::make_shared<Shard>(shardType.getName(), shardHost, nullptr);
- return;
- }
+ const ConnectionString& shardHost(shardHostStatus.getValue());
- // Non-SYNC shards
- shared_ptr<Shard> shard =
- std::make_shared<Shard>(shardType.getName(),
- shardHost,
- std::move(_targeterFactory->create(shardHost)));
+ // Sync cluster connections (legacy config server) do not go through the normal targeting
+ // mechanism and must only be reachable through CatalogManagerLegacy or legacy-style
+ // queries and inserts. Do not create targeter for these connections. This code should go
+ // away after 3.2 is released.
+ if (shardHost.type() == ConnectionString::SYNC) {
+ _lookup[shardType.getName()] =
+ std::make_shared<Shard>(shardType.getName(), shardHost, nullptr);
+ return;
+ }
- _lookup[shardType.getName()] = shard;
+ // Non-SYNC shards
+ shared_ptr<Shard> shard = std::make_shared<Shard>(
+ shardType.getName(), shardHost, std::move(_targeterFactory->create(shardHost)));
- // TODO: The only reason to have the shard host names in the lookup table is for the
- // setShardVersion call, which resolves the shard id from the shard address. This is
- // error-prone and will go away eventually when we switch all communications to go through
- // the remote command runner.
- _lookup[shardType.getHost()] = shard;
+ _lookup[shardType.getName()] = shard;
- for (const HostAndPort& hostAndPort : shardHost.getServers()) {
- _lookup[hostAndPort.toString()] = shard;
+ // TODO: The only reason to have the shard host names in the lookup table is for the
+ // setShardVersion call, which resolves the shard id from the shard address. This is
+ // error-prone and will go away eventually when we switch all communications to go through
+ // the remote command runner.
+ _lookup[shardType.getHost()] = shard;
- // Maintain a mapping from host to shard it belongs to for the case where we need to
- // update the shard connection string on reconfigurations.
- if (shardHost.type() == ConnectionString::SET) {
- _rsLookup[hostAndPort.toString()] = shard;
- }
- }
+ for (const HostAndPort& hostAndPort : shardHost.getServers()) {
+ _lookup[hostAndPort.toString()] = shard;
+ // Maintain a mapping from host to shard it belongs to for the case where we need to
+ // update the shard connection string on reconfigurations.
if (shardHost.type() == ConnectionString::SET) {
- _rsLookup[shardHost.getSetName()] = shard;
+ _rsLookup[hostAndPort.toString()] = shard;
}
}
- shared_ptr<Shard> ShardRegistry::_findUsingLookUp(const ShardId& shardId) {
- std::lock_guard<std::mutex> lk(_mutex);
- ShardMap::iterator it = _lookup.find(shardId);
- if (it != _lookup.end()) {
- return it->second;
- }
-
- return nullptr;
+ if (shardHost.type() == ConnectionString::SET) {
+ _rsLookup[shardHost.getSetName()] = shard;
}
+}
- StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort& host,
- const NamespaceString& nss,
- const BSONObj& query,
- boost::optional<int> limit) {
- // If for some reason the callback never gets invoked, we will return this status
- Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
- vector<BSONObj> results;
-
- auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus,
- Fetcher::NextAction* nextAction) {
-
- // Throw out any accumulated results on error
- if (!dataStatus.isOK()) {
- status = dataStatus.getStatus();
- results.clear();
- return;
- }
+shared_ptr<Shard> ShardRegistry::_findUsingLookUp(const ShardId& shardId) {
+ std::lock_guard<std::mutex> lk(_mutex);
+ ShardMap::iterator it = _lookup.find(shardId);
+ if (it != _lookup.end()) {
+ return it->second;
+ }
- auto& data = dataStatus.getValue();
- for (const BSONObj& doc : data.documents) {
- results.push_back(std::move(doc.getOwned()));
- }
+ return nullptr;
+}
- status = Status::OK();
- };
+StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ boost::optional<int> limit) {
+ // If for some reason the callback never gets invoked, we will return this status
+ Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
+ vector<BSONObj> results;
- unique_ptr<LiteParsedQuery> findCmd(
- fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit))));
+ auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction) {
- QueryFetcher fetcher(_executor.get(),
- host,
- nss,
- findCmd->asFindCommand(),
- fetcherCallback);
+ // Throw out any accumulated results on error
+ if (!dataStatus.isOK()) {
+ status = dataStatus.getStatus();
+ results.clear();
+ return;
+ }
- Status scheduleStatus = fetcher.schedule();
- if (!scheduleStatus.isOK()) {
- return scheduleStatus;
+ auto& data = dataStatus.getValue();
+ for (const BSONObj& doc : data.documents) {
+ results.push_back(std::move(doc.getOwned()));
}
- fetcher.wait();
+ status = Status::OK();
+ };
- if (!status.isOK()) {
- return status;
- }
+ unique_ptr<LiteParsedQuery> findCmd(
+ fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit))));
+
+ QueryFetcher fetcher(_executor.get(), host, nss, findCmd->asFindCommand(), fetcherCallback);
- return results;
+ Status scheduleStatus = fetcher.schedule();
+ if (!scheduleStatus.isOK()) {
+ return scheduleStatus;
}
- StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host,
- const std::string& dbName,
- const BSONObj& cmdObj) {
- StatusWith<RemoteCommandResponse> responseStatus =
- Status(ErrorCodes::InternalError, "Internal error running command");
+ fetcher.wait();
- RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout);
- auto callStatus = _executor->scheduleRemoteCommand(request,
- [&responseStatus](const RemoteCommandCallbackArgs& args) {
- responseStatus = args.response;
- });
+ if (!status.isOK()) {
+ return status;
+ }
- if (!callStatus.isOK()) {
- return callStatus.getStatus();
- }
+ return results;
+}
- // Block until the command is carried out
- _executor->wait(callStatus.getValue());
+StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
+ StatusWith<RemoteCommandResponse> responseStatus =
+ Status(ErrorCodes::InternalError, "Internal error running command");
- if (!responseStatus.isOK()) {
- return responseStatus.getStatus();
- }
+ RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout);
+ auto callStatus =
+ _executor->scheduleRemoteCommand(request,
+ [&responseStatus](const RemoteCommandCallbackArgs& args) {
+ responseStatus = args.response;
+ });
+
+ if (!callStatus.isOK()) {
+ return callStatus.getStatus();
+ }
+
+ // Block until the command is carried out
+ _executor->wait(callStatus.getValue());
- return responseStatus.getValue().data;
+ if (!responseStatus.isOK()) {
+ return responseStatus.getStatus();
}
-} // namespace mongo
+ return responseStatus.getValue().data;
+}
+
+} // namespace mongo
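
Two standalone sketches of idioms this file leans on, kept deliberately minimal; the names and types in them are toy stand-ins, not MongoDB's actual classes.

First, ShardRegistry::remove() erases map entries while iterating using the pre-C++11 erase(i++) idiom: the post-increment advances the iterator before erase() invalidates the node the old iterator pointed at. A self-contained illustration:

#include <iostream>
#include <map>
#include <memory>
#include <string>

int main() {
    // Toy stand-in for _lookup: several keys (shard name, connection
    // string, individual hosts) can map to the same shard entry.
    std::map<std::string, std::shared_ptr<std::string>> lookup;
    auto shardA = std::make_shared<std::string>("shardA");
    lookup["shardA"] = shardA;
    lookup["a1.example.net:27017"] = shardA;
    lookup["shardB"] = std::make_shared<std::string>("shardB");

    // Erase-while-iterating, as in ShardRegistry::remove(): i++ hands
    // erase() the old position after the iterator has already moved on.
    for (auto i = lookup.begin(); i != lookup.end();) {
        if (*i->second == "shardA") {
            lookup.erase(i++);
        } else {
            ++i;
        }
    }

    for (const auto& entry : lookup) {
        std::cout << entry.first << " -> " << *entry.second << '\n';
    }
    return 0;
}

Second, runCommand() bridges the asynchronous TaskExecutor back to a synchronous caller: it schedules the command, lets the callback write the response into a local variable, and blocks on _executor->wait() until the callback has run. A rough standard-library analogue of that shape, with std::promise/std::future standing in for the executor's scheduling and wait machinery:

#include <future>
#include <iostream>
#include <string>
#include <thread>

int main() {
    std::promise<std::string> response;
    std::future<std::string> pending = response.get_future();

    // "scheduleRemoteCommand": run the work asynchronously and hand the
    // result to the waiting side, as the executor callback does above.
    std::thread worker([&response] {
        response.set_value("{ ok: 1 }");  // stand-in for args.response
    });

    // "_executor->wait(...)": block until the asynchronous side has finished.
    std::cout << "command response: " << pending.get() << '\n';
    worker.join();
    return 0;
}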