diff options
author | Randolph Tan <randolph@10gen.com> | 2015-06-17 14:54:48 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2015-06-19 11:28:19 -0400 |
commit | 1effdbaec08661985bd987f4ef96d82e049e0563 (patch) | |
tree | 99c86d3fd7af9055dcfd4ba75c429f8cba1b32ec /src/mongo | |
parent | 599289451b3108d3254e1efb3eb784eb072dc535 (diff) | |
download | mongo-1effdbaec08661985bd987f4ef96d82e049e0563.tar.gz |
SERVER-18589 Refactor find and runCommand from CatalogManagerRS
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/s/catalog/replset/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp | 146 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/catalog_manager_replica_set.h | 21 | ||||
-rw-r--r-- | src/mongo/s/client/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 84 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.h | 25 |
6 files changed, 143 insertions, 137 deletions
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript index 15cadcc96d2..f8c7b7caeaf 100644 --- a/src/mongo/s/catalog/replset/SConscript +++ b/src/mongo/s/catalog/replset/SConscript @@ -33,9 +33,6 @@ env.Library( 'catalog_manager_replica_set.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/client/fetcher', - '$BUILD_DIR/mongo/db/query/lite_parsed_query', - '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/s/catalog/catalog_manager', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager', '$BUILD_DIR/mongo/s/client/sharding_client', diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp index 105d206f364..9cc35e05bcd 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp @@ -39,16 +39,14 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/dbclientinterface.h" -#include "mongo/client/query_fetcher.h" #include "mongo/client/read_preference.h" -#include "mongo/client/remote_command_runner.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" -#include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_settings.h" #include "mongo/s/catalog/type_shard.h" @@ -65,9 +63,6 @@ namespace mongo { - using executor::TaskExecutor; - using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs; - using std::set; using std::string; using std::unique_ptr; @@ -83,7 +78,6 @@ namespace { const ReadPreferenceSetting kConfigWriteSelector(ReadPreference::PrimaryOnly, TagSet{}); const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryOnly, TagSet{}); - const Seconds kConfigCommandTimeout{30}; const int kNotMasterNumRetries = 3; const Milliseconds kNotMasterRetryInterval{500}; @@ -184,10 +178,12 @@ namespace { return readHost.getStatus(); } - auto findStatus = _find(readHost.getValue(), - NamespaceString(DatabaseType::ConfigNS), - BSON(DatabaseType::name(dbName)), - 1); + auto findStatus = grid.shardRegistry()->exhaustiveFind( + readHost.getValue(), + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::name(dbName)), + 1); + if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -231,10 +227,12 @@ namespace { return readHostStatus.getStatus(); } - auto statusFind = _find(readHostStatus.getValue(), - NamespaceString(CollectionType::ConfigNS), - BSON(CollectionType::fullNs(collNs)), - 1); + auto statusFind = grid.shardRegistry()->exhaustiveFind( + readHostStatus.getValue(), + NamespaceString(CollectionType::ConfigNS), + BSON(CollectionType::fullNs(collNs)), + 1); + if (!statusFind.isOK()) { return statusFind.getStatus(); } @@ -276,10 +274,12 @@ namespace { return readHost.getStatus(); } - auto findStatus = _find(readHost.getValue(), - NamespaceString(SettingsType::ConfigNS), - BSON(SettingsType::key(key)), - 1); + auto findStatus = grid.shardRegistry()->exhaustiveFind( + readHost.getValue(), + NamespaceString(SettingsType::ConfigNS), + BSON(SettingsType::key(key)), + 1); + if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -323,10 +323,10 @@ namespace { return readHostStatus.getStatus(); } - auto findStatus = _find(readHostStatus.getValue(), - NamespaceString(ChunkType::ConfigNS), - query.obj, - boost::none); // no limit + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), + NamespaceString(ChunkType::ConfigNS), + query.obj, + boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -364,10 +364,10 @@ namespace { return readHost.getStatus(); } - auto findStatus = _find(readHost.getValue(), - NamespaceString(ShardType::ConfigNS), - BSONObj(), // no query filter - boost::none); // no limit + auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), + NamespaceString(ShardType::ConfigNS), + BSONObj(), // no query filter + boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -418,7 +418,7 @@ namespace { return Command::appendCommandStatus(*result, target.getStatus()); } - auto response = _runCommand(target.getValue(), dbname, cmdObj); + auto response = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); if (!response.isOK()) { return Command::appendCommandStatus(*result, response.getStatus()); } @@ -448,7 +448,7 @@ namespace { return Command::appendCommandStatus(*result, target.getStatus()); } - auto resultStatus = _runCommand(target.getValue(), dbname, cmdObj); + auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); if (!resultStatus.isOK()) { return Command::appendCommandStatus(*result, resultStatus.getStatus()); } @@ -491,9 +491,10 @@ namespace { return; } - auto resultStatus = _runCommand(target.getValue(), - batchRequest.getNSS().db().toString(), - batchRequest.toBSON()); + auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), + batchRequest.getNSS().db().toString(), + batchRequest.toBSON()); + if (!resultStatus.isOK()) { _toBatchError(resultStatus.getStatus(), batchResponse); return; @@ -523,85 +524,4 @@ namespace { _toBatchError(notMasterStatus, batchResponse); } - StatusWith<vector<BSONObj>> CatalogManagerReplicaSet::_find(const HostAndPort& host, - const NamespaceString& nss, - const BSONObj& query, - boost::optional<int> limit) { - - // If for some reason the callback never gets invoked, we will return this status - Status status = Status(ErrorCodes::InternalError, "Internal error running find command"); - vector<BSONObj> results; - - auto fetcherCallback = [&status, &results](const QueryFetcher::BatchDataStatus& dataStatus, - Fetcher::NextAction* nextAction) { - - // Throw out any accumulated results on error - if (!dataStatus.isOK()) { - status = dataStatus.getStatus(); - results.clear(); - return; - } - - auto& data = dataStatus.getValue(); - for (const BSONObj& doc : data.documents) { - results.push_back(std::move(doc.getOwned())); - } - - status = Status::OK(); - }; - - unique_ptr<LiteParsedQuery> findCmd( - fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit)))); - - QueryFetcher fetcher(grid.shardRegistry()->getExecutor(), - host, - nss, - findCmd->asFindCommand(), - fetcherCallback); - - Status scheduleStatus = fetcher.schedule(); - if (!scheduleStatus.isOK()) { - return scheduleStatus; - } - - fetcher.wait(); - - if (!status.isOK()) { - return status; - } - - return results; - } - - StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommand(const HostAndPort& host, - const std::string& dbName, - const BSONObj& cmdObj) { - - TaskExecutor* exec = grid.shardRegistry()->getExecutor(); - - StatusWith<RemoteCommandResponse> responseStatus = - Status(ErrorCodes::InternalError, "Internal error running command"); - - RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout); - auto callStatus = - exec->scheduleRemoteCommand(request, - [&responseStatus](const RemoteCommandCallbackArgs& args) { - - responseStatus = args.response; - }); - - if (!callStatus.isOK()) { - return callStatus.getStatus(); - } - - // Block until the command is carried out - exec->wait(callStatus.getValue()); - - if (!responseStatus.isOK()) { - return responseStatus.getStatus(); - } - - return responseStatus.getValue().data; - } - } // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h index b78b0a17943..9b85640d1fb 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h @@ -28,7 +28,6 @@ #pragma once -#include <boost/optional.hpp> #include <memory> #include <mutex> #include <string> @@ -40,9 +39,6 @@ namespace mongo { - struct HostAndPort; - class NamespaceString; - /** * Implements the catalog manager for talking to replica set config servers. */ @@ -140,23 +136,6 @@ namespace mongo { DistLockManager* getDistLockManager() override; private: - /** - * Executes 'find' command against the specified host and fetches *all* the results that - * the host will return until there are no more or until an error is returned. - * - * Returns either the complete set of results or an error, never partial results. - */ - StatusWith<std::vector<BSONObj>> _find(const HostAndPort& host, - const NamespaceString& nss, - const BSONObj& query, - boost::optional<int> limit); - - /** - * Runs a command against the specified host and returns the result. - */ - StatusWith<BSONObj> _runCommand(const HostAndPort& host, - const std::string& dbName, - const BSONObj& cmdObj); // Config server connection string ConnectionString _configServerConnectionString; diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript index f57d885b01e..8b414c1eecb 100644 --- a/src/mongo/s/client/SConscript +++ b/src/mongo/s/client/SConscript @@ -12,6 +12,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/client/clientdriver', + '$BUILD_DIR/mongo/client/fetcher', '$BUILD_DIR/mongo/client/remote_command_runner_impl', '$BUILD_DIR/mongo/client/remote_command_targeter', '$BUILD_DIR/mongo/s/catalog/catalog_manager', diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 66a61633f79..ae0edd39146 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -33,6 +33,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/client/connection_string.h" +#include "mongo/client/query_fetcher.h" #include "mongo/client/remote_command_runner_impl.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/client/remote_command_targeter_factory.h" @@ -49,8 +50,16 @@ namespace mongo { using std::shared_ptr; using std::string; + using std::unique_ptr; using std::vector; + using executor::TaskExecutor; + using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs; + +namespace { + const Seconds kConfigCommandTimeout{30}; +} // unnamed namespace + ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory, std::unique_ptr<RemoteCommandRunner> commandRunner, std::unique_ptr<executor::TaskExecutor> executor, @@ -241,4 +250,79 @@ namespace mongo { return nullptr; } + StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + boost::optional<int> limit) { + // If for some reason the callback never gets invoked, we will return this status + Status status = Status(ErrorCodes::InternalError, "Internal error running find command"); + vector<BSONObj> results; + + auto fetcherCallback = [&status, &results](const QueryFetcher::BatchDataStatus& dataStatus, + Fetcher::NextAction* nextAction) { + + // Throw out any accumulated results on error + if (!dataStatus.isOK()) { + status = dataStatus.getStatus(); + results.clear(); + return; + } + + auto& data = dataStatus.getValue(); + for (const BSONObj& doc : data.documents) { + results.push_back(std::move(doc.getOwned())); + } + + status = Status::OK(); + }; + + unique_ptr<LiteParsedQuery> findCmd( + fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit)))); + + QueryFetcher fetcher(_executor.get(), + host, + nss, + findCmd->asFindCommand(), + fetcherCallback); + + Status scheduleStatus = fetcher.schedule(); + if (!scheduleStatus.isOK()) { + return scheduleStatus; + } + + fetcher.wait(); + + if (!status.isOK()) { + return status; + } + + return results; + } + + StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host, + const std::string& dbName, + const BSONObj& cmdObj) { + StatusWith<RemoteCommandResponse> responseStatus = + Status(ErrorCodes::InternalError, "Internal error running command"); + + RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout); + auto callStatus = _executor->scheduleRemoteCommand(request, + [&responseStatus](const RemoteCommandCallbackArgs& args) { + responseStatus = args.response; + }); + + if (!callStatus.isOK()) { + return callStatus.getStatus(); + } + + // Block until the command is carried out + _executor->wait(callStatus.getValue()); + + if (!responseStatus.isOK()) { + return responseStatus.getStatus(); + } + + return responseStatus.getValue().data; + } + } // namespace mongo diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index d0daccd4d41..75b2af8bf72 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -28,6 +28,7 @@ #pragma once +#include <boost/optional.hpp> #include <memory> #include <mutex> #include <string> @@ -39,11 +40,15 @@ namespace mongo { class BSONObjBuilder; class CatalogManager; + class HostAndPort; + class NamespaceString; class RemoteCommandRunner; class RemoteCommandTargeterFactory; class Shard; class ShardType; + template<typename T> class StatusWith; + namespace executor { class TaskExecutor; @@ -95,6 +100,26 @@ namespace executor { void toBSON(BSONObjBuilder* result); + /** + * Executes 'find' command against the specified host and fetches *all* the results that + * the host will return until there are no more or until an error is returned. + * + * Returns either the complete set of results or an error, never partial results. + * + * Note: should never be used outside of CatalogManagerReplicaSet or DistLockCatalogImpl. + */ + StatusWith<std::vector<BSONObj>> exhaustiveFind(const HostAndPort& host, + const NamespaceString& nss, + const BSONObj& query, + boost::optional<int> limit); + + /** + * Runs a command against the specified host and returns the result. + */ + StatusWith<BSONObj> runCommand(const HostAndPort& host, + const std::string& dbName, + const BSONObj& cmdObj); + private: typedef std::map<ShardId, std::shared_ptr<Shard>> ShardMap; |