diff options
Diffstat (limited to 'src/mongo/s/client/shard_registry.cpp')
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 410 |
1 files changed, 200 insertions, 210 deletions
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 01efc45aaa4..ad33f35c661 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -48,281 +48,271 @@ namespace mongo { - using std::shared_ptr; - using std::string; - using std::unique_ptr; - using std::vector; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::vector; - using executor::TaskExecutor; - using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs; +using executor::TaskExecutor; +using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs; namespace { - const Seconds kConfigCommandTimeout{30}; -} // unnamed namespace - - ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory, - std::unique_ptr<RemoteCommandRunner> commandRunner, - std::unique_ptr<executor::TaskExecutor> executor, - CatalogManager* catalogManager) - : _targeterFactory(std::move(targeterFactory)), - _commandRunner(std::move(commandRunner)), - _executor(std::move(executor)), - _catalogManager(catalogManager) { - - // add config shard registry entry so know it's always there - std::lock_guard<std::mutex> lk(_mutex); - _addConfigShard_inlock(); +const Seconds kConfigCommandTimeout{30}; +} // unnamed namespace - } +ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory, + std::unique_ptr<RemoteCommandRunner> commandRunner, + std::unique_ptr<executor::TaskExecutor> executor, + CatalogManager* catalogManager) + : _targeterFactory(std::move(targeterFactory)), + _commandRunner(std::move(commandRunner)), + _executor(std::move(executor)), + _catalogManager(catalogManager) { + // add config shard registry entry so know it's always there + std::lock_guard<std::mutex> lk(_mutex); + _addConfigShard_inlock(); +} - ShardRegistry::~ShardRegistry() = default; +ShardRegistry::~ShardRegistry() = default; - void ShardRegistry::reload() { - vector<ShardType> shards; - Status status = _catalogManager->getAllShards(&shards); - massert(13632, "couldn't get updated shard list from config server", status.isOK()); +void ShardRegistry::reload() { + vector<ShardType> shards; + Status status = _catalogManager->getAllShards(&shards); + massert(13632, "couldn't get updated shard list from config server", status.isOK()); - int numShards = shards.size(); + int numShards = shards.size(); - LOG(1) << "found " << numShards << " shards listed on config server(s)"; + LOG(1) << "found " << numShards << " shards listed on config server(s)"; - std::lock_guard<std::mutex> lk(_mutex); + std::lock_guard<std::mutex> lk(_mutex); - _lookup.clear(); - _rsLookup.clear(); + _lookup.clear(); + _rsLookup.clear(); - _addConfigShard_inlock(); + _addConfigShard_inlock(); - for (const ShardType& shardType : shards) { - uassertStatusOK(shardType.validate()); + for (const ShardType& shardType : shards) { + uassertStatusOK(shardType.validate()); - // Skip the config host even if there is one left over from legacy installations. The - // config host is installed manually from the catalog manager data. - if (shardType.getName() == "config") { - continue; - } - - _addShard_inlock(shardType); + // Skip the config host even if there is one left over from legacy installations. The + // config host is installed manually from the catalog manager data. + if (shardType.getName() == "config") { + continue; } + + _addShard_inlock(shardType); } +} - shared_ptr<Shard> ShardRegistry::getShard(const ShardId& shardId) { - shared_ptr<Shard> shard = _findUsingLookUp(shardId); - if (shard) { - return shard; - } +shared_ptr<Shard> ShardRegistry::getShard(const ShardId& shardId) { + shared_ptr<Shard> shard = _findUsingLookUp(shardId); + if (shard) { + return shard; + } - // If we can't find the shard, we might just need to reload the cache - reload(); + // If we can't find the shard, we might just need to reload the cache + reload(); - return _findUsingLookUp(shardId); - } + return _findUsingLookUp(shardId); +} - shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const { - std::lock_guard<std::mutex> lk(_mutex); - ShardMap::const_iterator i = _rsLookup.find(name); +shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const { + std::lock_guard<std::mutex> lk(_mutex); + ShardMap::const_iterator i = _rsLookup.find(name); - return (i == _rsLookup.end()) ? nullptr : i->second; - } + return (i == _rsLookup.end()) ? nullptr : i->second; +} - void ShardRegistry::remove(const ShardId& id) { - std::lock_guard<std::mutex> lk(_mutex); +void ShardRegistry::remove(const ShardId& id) { + std::lock_guard<std::mutex> lk(_mutex); - for (ShardMap::iterator i = _lookup.begin(); i != _lookup.end();) { - shared_ptr<Shard> s = i->second; - if (s->getId() == id) { - _lookup.erase(i++); - } - else { - ++i; - } + for (ShardMap::iterator i = _lookup.begin(); i != _lookup.end();) { + shared_ptr<Shard> s = i->second; + if (s->getId() == id) { + _lookup.erase(i++); + } else { + ++i; } + } - for (ShardMap::iterator i = _rsLookup.begin(); i != _rsLookup.end();) { - shared_ptr<Shard> s = i->second; - if (s->getId() == id) { - _rsLookup.erase(i++); - } - else { - ++i; - } + for (ShardMap::iterator i = _rsLookup.begin(); i != _rsLookup.end();) { + shared_ptr<Shard> s = i->second; + if (s->getId() == id) { + _rsLookup.erase(i++); + } else { + ++i; } } +} - void ShardRegistry::getAllShardIds(vector<ShardId>* all) const { - std::set<string> seen; - - { - std::lock_guard<std::mutex> lk(_mutex); - for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) { - const shared_ptr<Shard>& s = i->second; - if (s->getId() == "config") { - continue; - } - - if (seen.count(s->getId())) { - continue; - } - seen.insert(s->getId()); +void ShardRegistry::getAllShardIds(vector<ShardId>* all) const { + std::set<string> seen; + + { + std::lock_guard<std::mutex> lk(_mutex); + for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) { + const shared_ptr<Shard>& s = i->second; + if (s->getId() == "config") { + continue; } - } - all->assign(seen.begin(), seen.end()); + if (seen.count(s->getId())) { + continue; + } + seen.insert(s->getId()); + } } - void ShardRegistry::toBSON(BSONObjBuilder* result) { - BSONObjBuilder b(_lookup.size() + 50); + all->assign(seen.begin(), seen.end()); +} - std::lock_guard<std::mutex> lk(_mutex); +void ShardRegistry::toBSON(BSONObjBuilder* result) { + BSONObjBuilder b(_lookup.size() + 50); - for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) { - b.append(i->first, i->second->getConnString().toString()); - } + std::lock_guard<std::mutex> lk(_mutex); - result->append("map", b.obj()); + for (ShardMap::const_iterator i = _lookup.begin(); i != _lookup.end(); ++i) { + b.append(i->first, i->second->getConnString().toString()); } - void ShardRegistry::_addConfigShard_inlock() { - ShardType configServerShard; - configServerShard.setName("config"); - configServerShard.setHost(_catalogManager->connectionString().toString()); - _addShard_inlock(configServerShard); + result->append("map", b.obj()); +} + +void ShardRegistry::_addConfigShard_inlock() { + ShardType configServerShard; + configServerShard.setName("config"); + configServerShard.setHost(_catalogManager->connectionString().toString()); + _addShard_inlock(configServerShard); +} + +void ShardRegistry::_addShard_inlock(const ShardType& shardType) { + // This validation should ideally go inside the ShardType::validate call. However, doing + // it there would prevent us from loading previously faulty shard hosts, which might have + // been stored (i.e., the entire getAllShards call would fail). + auto shardHostStatus = ConnectionString::parse(shardType.getHost()); + if (!shardHostStatus.isOK()) { + warning() << "Unable to parse shard host " << shardHostStatus.getStatus().toString(); } - void ShardRegistry::_addShard_inlock(const ShardType& shardType) { - // This validation should ideally go inside the ShardType::validate call. However, doing - // it there would prevent us from loading previously faulty shard hosts, which might have - // been stored (i.e., the entire getAllShards call would fail). - auto shardHostStatus = ConnectionString::parse(shardType.getHost()); - if (!shardHostStatus.isOK()) { - warning() << "Unable to parse shard host " - << shardHostStatus.getStatus().toString(); - } - - const ConnectionString& shardHost(shardHostStatus.getValue()); - - // Sync cluster connections (legacy config server) do not go through the normal targeting - // mechanism and must only be reachable through CatalogManagerLegacy or legacy-style - // queries and inserts. Do not create targeter for these connections. This code should go - // away after 3.2 is released. - if (shardHost.type() == ConnectionString::SYNC) { - _lookup[shardType.getName()] = - std::make_shared<Shard>(shardType.getName(), shardHost, nullptr); - return; - } + const ConnectionString& shardHost(shardHostStatus.getValue()); - // Non-SYNC shards - shared_ptr<Shard> shard = - std::make_shared<Shard>(shardType.getName(), - shardHost, - std::move(_targeterFactory->create(shardHost))); + // Sync cluster connections (legacy config server) do not go through the normal targeting + // mechanism and must only be reachable through CatalogManagerLegacy or legacy-style + // queries and inserts. Do not create targeter for these connections. This code should go + // away after 3.2 is released. + if (shardHost.type() == ConnectionString::SYNC) { + _lookup[shardType.getName()] = + std::make_shared<Shard>(shardType.getName(), shardHost, nullptr); + return; + } - _lookup[shardType.getName()] = shard; + // Non-SYNC shards + shared_ptr<Shard> shard = std::make_shared<Shard>( + shardType.getName(), shardHost, std::move(_targeterFactory->create(shardHost))); - // TODO: The only reason to have the shard host names in the lookup table is for the - // setShardVersion call, which resolves the shard id from the shard address. This is - // error-prone and will go away eventually when we switch all communications to go through - // the remote command runner. - _lookup[shardType.getHost()] = shard; + _lookup[shardType.getName()] = shard; - for (const HostAndPort& hostAndPort : shardHost.getServers()) { - _lookup[hostAndPort.toString()] = shard; + // TODO: The only reason to have the shard host names in the lookup table is for the + // setShardVersion call, which resolves the shard id from the shard address. This is + // error-prone and will go away eventually when we switch all communications to go through + // the remote command runner. + _lookup[shardType.getHost()] = shard; - // Maintain a mapping from host to shard it belongs to for the case where we need to - // update the shard connection string on reconfigurations. - if (shardHost.type() == ConnectionString::SET) { - _rsLookup[hostAndPort.toString()] = shard; - } - } + for (const HostAndPort& hostAndPort : shardHost.getServers()) { + _lookup[hostAndPort.toString()] = shard; + // Maintain a mapping from host to shard it belongs to for the case where we need to + // update the shard connection string on reconfigurations. if (shardHost.type() == ConnectionString::SET) { - _rsLookup[shardHost.getSetName()] = shard; + _rsLookup[hostAndPort.toString()] = shard; } } - shared_ptr<Shard> ShardRegistry::_findUsingLookUp(const ShardId& shardId) { - std::lock_guard<std::mutex> lk(_mutex); - ShardMap::iterator it = _lookup.find(shardId); - if (it != _lookup.end()) { - return it->second; - } - - return nullptr; + if (shardHost.type() == ConnectionString::SET) { + _rsLookup[shardHost.getSetName()] = shard; } +} - StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort& host, - const NamespaceString& nss, - const BSONObj& query, - boost::optional<int> limit) { - // If for some reason the callback never gets invoked, we will return this status - Status status = Status(ErrorCodes::InternalError, "Internal error running find command"); - vector<BSONObj> results; - - auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus, - Fetcher::NextAction* nextAction) { - - // Throw out any accumulated results on error - if (!dataStatus.isOK()) { - status = dataStatus.getStatus(); - results.clear(); - return; - } +shared_ptr<Shard> ShardRegistry::_findUsingLookUp(const ShardId& shardId) { + std::lock_guard<std::mutex> lk(_mutex); + ShardMap::iterator it = _lookup.find(shardId); + if (it != _lookup.end()) { + return it->second; + } - auto& data = dataStatus.getValue(); - for (const BSONObj& doc : data.documents) { - results.push_back(std::move(doc.getOwned())); - } + return nullptr; +} - status = Status::OK(); - }; +StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + boost::optional<int> limit) { + // If for some reason the callback never gets invoked, we will return this status + Status status = Status(ErrorCodes::InternalError, "Internal error running find command"); + vector<BSONObj> results; - unique_ptr<LiteParsedQuery> findCmd( - fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit)))); + auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction) { - QueryFetcher fetcher(_executor.get(), - host, - nss, - findCmd->asFindCommand(), - fetcherCallback); + // Throw out any accumulated results on error + if (!dataStatus.isOK()) { + status = dataStatus.getStatus(); + results.clear(); + return; + } - Status scheduleStatus = fetcher.schedule(); - if (!scheduleStatus.isOK()) { - return scheduleStatus; + auto& data = dataStatus.getValue(); + for (const BSONObj& doc : data.documents) { + results.push_back(std::move(doc.getOwned())); } - fetcher.wait(); + status = Status::OK(); + }; - if (!status.isOK()) { - return status; - } + unique_ptr<LiteParsedQuery> findCmd( + fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit)))); + + QueryFetcher fetcher(_executor.get(), host, nss, findCmd->asFindCommand(), fetcherCallback); - return results; + Status scheduleStatus = fetcher.schedule(); + if (!scheduleStatus.isOK()) { + return scheduleStatus; } - StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host, - const std::string& dbName, - const BSONObj& cmdObj) { - StatusWith<RemoteCommandResponse> responseStatus = - Status(ErrorCodes::InternalError, "Internal error running command"); + fetcher.wait(); - RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout); - auto callStatus = _executor->scheduleRemoteCommand(request, - [&responseStatus](const RemoteCommandCallbackArgs& args) { - responseStatus = args.response; - }); + if (!status.isOK()) { + return status; + } - if (!callStatus.isOK()) { - return callStatus.getStatus(); - } + return results; +} - // Block until the command is carried out - _executor->wait(callStatus.getValue()); +StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host, + const std::string& dbName, + const BSONObj& cmdObj) { + StatusWith<RemoteCommandResponse> responseStatus = + Status(ErrorCodes::InternalError, "Internal error running command"); - if (!responseStatus.isOK()) { - return responseStatus.getStatus(); - } + RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout); + auto callStatus = + _executor->scheduleRemoteCommand(request, + [&responseStatus](const RemoteCommandCallbackArgs& args) { + responseStatus = args.response; + }); + + if (!callStatus.isOK()) { + return callStatus.getStatus(); + } + + // Block until the command is carried out + _executor->wait(callStatus.getValue()); - return responseStatus.getValue().data; + if (!responseStatus.isOK()) { + return responseStatus.getStatus(); } -} // namespace mongo + return responseStatus.getValue().data; +} + +} // namespace mongo |