summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2015-06-17 14:54:48 -0400
committerRandolph Tan <randolph@10gen.com>2015-06-19 11:28:19 -0400
commit1effdbaec08661985bd987f4ef96d82e049e0563 (patch)
tree99c86d3fd7af9055dcfd4ba75c429f8cba1b32ec /src/mongo/s
parent599289451b3108d3254e1efb3eb784eb072dc535 (diff)
downloadmongo-1effdbaec08661985bd987f4ef96d82e049e0563.tar.gz
SERVER-18589 Refactor find and runCommand from CatalogManagerRS
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/catalog/replset/SConscript3
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp146
-rw-r--r--src/mongo/s/catalog/replset/catalog_manager_replica_set.h21
-rw-r--r--src/mongo/s/client/SConscript1
-rw-r--r--src/mongo/s/client/shard_registry.cpp84
-rw-r--r--src/mongo/s/client/shard_registry.h25
6 files changed, 143 insertions, 137 deletions
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index 15cadcc96d2..f8c7b7caeaf 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -33,9 +33,6 @@ env.Library(
'catalog_manager_replica_set.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/client/fetcher',
- '$BUILD_DIR/mongo/db/query/lite_parsed_query',
- '$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/s/catalog/catalog_manager',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager',
'$BUILD_DIR/mongo/s/client/sharding_client',
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
index 105d206f364..9cc35e05bcd 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.cpp
@@ -39,16 +39,14 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/dbclientinterface.h"
-#include "mongo/client/query_fetcher.h"
#include "mongo/client/read_preference.h"
-#include "mongo/client/remote_command_runner.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/catalog/type_collection.h"
+#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_settings.h"
#include "mongo/s/catalog/type_shard.h"
@@ -65,9 +63,6 @@
namespace mongo {
- using executor::TaskExecutor;
- using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs;
-
using std::set;
using std::string;
using std::unique_ptr;
@@ -83,7 +78,6 @@ namespace {
const ReadPreferenceSetting kConfigWriteSelector(ReadPreference::PrimaryOnly, TagSet{});
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryOnly, TagSet{});
- const Seconds kConfigCommandTimeout{30};
const int kNotMasterNumRetries = 3;
const Milliseconds kNotMasterRetryInterval{500};
@@ -184,10 +178,12 @@ namespace {
return readHost.getStatus();
}
- auto findStatus = _find(readHost.getValue(),
- NamespaceString(DatabaseType::ConfigNS),
- BSON(DatabaseType::name(dbName)),
- 1);
+ auto findStatus = grid.shardRegistry()->exhaustiveFind(
+ readHost.getValue(),
+ NamespaceString(DatabaseType::ConfigNS),
+ BSON(DatabaseType::name(dbName)),
+ 1);
+
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -231,10 +227,12 @@ namespace {
return readHostStatus.getStatus();
}
- auto statusFind = _find(readHostStatus.getValue(),
- NamespaceString(CollectionType::ConfigNS),
- BSON(CollectionType::fullNs(collNs)),
- 1);
+ auto statusFind = grid.shardRegistry()->exhaustiveFind(
+ readHostStatus.getValue(),
+ NamespaceString(CollectionType::ConfigNS),
+ BSON(CollectionType::fullNs(collNs)),
+ 1);
+
if (!statusFind.isOK()) {
return statusFind.getStatus();
}
@@ -276,10 +274,12 @@ namespace {
return readHost.getStatus();
}
- auto findStatus = _find(readHost.getValue(),
- NamespaceString(SettingsType::ConfigNS),
- BSON(SettingsType::key(key)),
- 1);
+ auto findStatus = grid.shardRegistry()->exhaustiveFind(
+ readHost.getValue(),
+ NamespaceString(SettingsType::ConfigNS),
+ BSON(SettingsType::key(key)),
+ 1);
+
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -323,10 +323,10 @@ namespace {
return readHostStatus.getStatus();
}
- auto findStatus = _find(readHostStatus.getValue(),
- NamespaceString(ChunkType::ConfigNS),
- query.obj,
- boost::none); // no limit
+ auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(),
+ NamespaceString(ChunkType::ConfigNS),
+ query.obj,
+ boost::none); // no limit
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -364,10 +364,10 @@ namespace {
return readHost.getStatus();
}
- auto findStatus = _find(readHost.getValue(),
- NamespaceString(ShardType::ConfigNS),
- BSONObj(), // no query filter
- boost::none); // no limit
+ auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(),
+ NamespaceString(ShardType::ConfigNS),
+ BSONObj(), // no query filter
+ boost::none); // no limit
if (!findStatus.isOK()) {
return findStatus.getStatus();
}
@@ -418,7 +418,7 @@ namespace {
return Command::appendCommandStatus(*result, target.getStatus());
}
- auto response = _runCommand(target.getValue(), dbname, cmdObj);
+ auto response = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj);
if (!response.isOK()) {
return Command::appendCommandStatus(*result, response.getStatus());
}
@@ -448,7 +448,7 @@ namespace {
return Command::appendCommandStatus(*result, target.getStatus());
}
- auto resultStatus = _runCommand(target.getValue(), dbname, cmdObj);
+ auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj);
if (!resultStatus.isOK()) {
return Command::appendCommandStatus(*result, resultStatus.getStatus());
}
@@ -491,9 +491,10 @@ namespace {
return;
}
- auto resultStatus = _runCommand(target.getValue(),
- batchRequest.getNSS().db().toString(),
- batchRequest.toBSON());
+ auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(),
+ batchRequest.getNSS().db().toString(),
+ batchRequest.toBSON());
+
if (!resultStatus.isOK()) {
_toBatchError(resultStatus.getStatus(), batchResponse);
return;
@@ -523,85 +524,4 @@ namespace {
_toBatchError(notMasterStatus, batchResponse);
}
- StatusWith<vector<BSONObj>> CatalogManagerReplicaSet::_find(const HostAndPort& host,
- const NamespaceString& nss,
- const BSONObj& query,
- boost::optional<int> limit) {
-
- // If for some reason the callback never gets invoked, we will return this status
- Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
- vector<BSONObj> results;
-
- auto fetcherCallback = [&status, &results](const QueryFetcher::BatchDataStatus& dataStatus,
- Fetcher::NextAction* nextAction) {
-
- // Throw out any accumulated results on error
- if (!dataStatus.isOK()) {
- status = dataStatus.getStatus();
- results.clear();
- return;
- }
-
- auto& data = dataStatus.getValue();
- for (const BSONObj& doc : data.documents) {
- results.push_back(std::move(doc.getOwned()));
- }
-
- status = Status::OK();
- };
-
- unique_ptr<LiteParsedQuery> findCmd(
- fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit))));
-
- QueryFetcher fetcher(grid.shardRegistry()->getExecutor(),
- host,
- nss,
- findCmd->asFindCommand(),
- fetcherCallback);
-
- Status scheduleStatus = fetcher.schedule();
- if (!scheduleStatus.isOK()) {
- return scheduleStatus;
- }
-
- fetcher.wait();
-
- if (!status.isOK()) {
- return status;
- }
-
- return results;
- }
-
- StatusWith<BSONObj> CatalogManagerReplicaSet::_runCommand(const HostAndPort& host,
- const std::string& dbName,
- const BSONObj& cmdObj) {
-
- TaskExecutor* exec = grid.shardRegistry()->getExecutor();
-
- StatusWith<RemoteCommandResponse> responseStatus =
- Status(ErrorCodes::InternalError, "Internal error running command");
-
- RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout);
- auto callStatus =
- exec->scheduleRemoteCommand(request,
- [&responseStatus](const RemoteCommandCallbackArgs& args) {
-
- responseStatus = args.response;
- });
-
- if (!callStatus.isOK()) {
- return callStatus.getStatus();
- }
-
- // Block until the command is carried out
- exec->wait(callStatus.getValue());
-
- if (!responseStatus.isOK()) {
- return responseStatus.getStatus();
- }
-
- return responseStatus.getValue().data;
- }
-
} // namespace mongo
diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
index b78b0a17943..9b85640d1fb 100644
--- a/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
+++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set.h
@@ -28,7 +28,6 @@
#pragma once
-#include <boost/optional.hpp>
#include <memory>
#include <mutex>
#include <string>
@@ -40,9 +39,6 @@
namespace mongo {
- struct HostAndPort;
- class NamespaceString;
-
/**
* Implements the catalog manager for talking to replica set config servers.
*/
@@ -140,23 +136,6 @@ namespace mongo {
DistLockManager* getDistLockManager() override;
private:
- /**
- * Executes 'find' command against the specified host and fetches *all* the results that
- * the host will return until there are no more or until an error is returned.
- *
- * Returns either the complete set of results or an error, never partial results.
- */
- StatusWith<std::vector<BSONObj>> _find(const HostAndPort& host,
- const NamespaceString& nss,
- const BSONObj& query,
- boost::optional<int> limit);
-
- /**
- * Runs a command against the specified host and returns the result.
- */
- StatusWith<BSONObj> _runCommand(const HostAndPort& host,
- const std::string& dbName,
- const BSONObj& cmdObj);
// Config server connection string
ConnectionString _configServerConnectionString;
diff --git a/src/mongo/s/client/SConscript b/src/mongo/s/client/SConscript
index f57d885b01e..8b414c1eecb 100644
--- a/src/mongo/s/client/SConscript
+++ b/src/mongo/s/client/SConscript
@@ -12,6 +12,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/clientdriver',
+ '$BUILD_DIR/mongo/client/fetcher',
'$BUILD_DIR/mongo/client/remote_command_runner_impl',
'$BUILD_DIR/mongo/client/remote_command_targeter',
'$BUILD_DIR/mongo/s/catalog/catalog_manager',
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 66a61633f79..ae0edd39146 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -33,6 +33,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/client/connection_string.h"
+#include "mongo/client/query_fetcher.h"
#include "mongo/client/remote_command_runner_impl.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/remote_command_targeter_factory.h"
@@ -49,8 +50,16 @@ namespace mongo {
using std::shared_ptr;
using std::string;
+ using std::unique_ptr;
using std::vector;
+ using executor::TaskExecutor;
+ using RemoteCommandCallbackArgs = TaskExecutor::RemoteCommandCallbackArgs;
+
+namespace {
+ const Seconds kConfigCommandTimeout{30};
+} // unnamed namespace
+
ShardRegistry::ShardRegistry(std::unique_ptr<RemoteCommandTargeterFactory> targeterFactory,
std::unique_ptr<RemoteCommandRunner> commandRunner,
std::unique_ptr<executor::TaskExecutor> executor,
@@ -241,4 +250,79 @@ namespace mongo {
return nullptr;
}
+ StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ boost::optional<int> limit) {
+ // If for some reason the callback never gets invoked, we will return this status
+ Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
+ vector<BSONObj> results;
+
+ auto fetcherCallback = [&status, &results](const QueryFetcher::BatchDataStatus& dataStatus,
+ Fetcher::NextAction* nextAction) {
+
+ // Throw out any accumulated results on error
+ if (!dataStatus.isOK()) {
+ status = dataStatus.getStatus();
+ results.clear();
+ return;
+ }
+
+ auto& data = dataStatus.getValue();
+ for (const BSONObj& doc : data.documents) {
+ results.push_back(std::move(doc.getOwned()));
+ }
+
+ status = Status::OK();
+ };
+
+ unique_ptr<LiteParsedQuery> findCmd(
+ fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, std::move(limit))));
+
+ QueryFetcher fetcher(_executor.get(),
+ host,
+ nss,
+ findCmd->asFindCommand(),
+ fetcherCallback);
+
+ Status scheduleStatus = fetcher.schedule();
+ if (!scheduleStatus.isOK()) {
+ return scheduleStatus;
+ }
+
+ fetcher.wait();
+
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return results;
+ }
+
+ StatusWith<BSONObj> ShardRegistry::runCommand(const HostAndPort& host,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
+ StatusWith<RemoteCommandResponse> responseStatus =
+ Status(ErrorCodes::InternalError, "Internal error running command");
+
+ RemoteCommandRequest request(host, dbName, cmdObj, kConfigCommandTimeout);
+ auto callStatus = _executor->scheduleRemoteCommand(request,
+ [&responseStatus](const RemoteCommandCallbackArgs& args) {
+ responseStatus = args.response;
+ });
+
+ if (!callStatus.isOK()) {
+ return callStatus.getStatus();
+ }
+
+ // Block until the command is carried out
+ _executor->wait(callStatus.getValue());
+
+ if (!responseStatus.isOK()) {
+ return responseStatus.getStatus();
+ }
+
+ return responseStatus.getValue().data;
+ }
+
} // namespace mongo
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index d0daccd4d41..75b2af8bf72 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -28,6 +28,7 @@
#pragma once
+#include <boost/optional.hpp>
#include <memory>
#include <mutex>
#include <string>
@@ -39,11 +40,15 @@ namespace mongo {
class BSONObjBuilder;
class CatalogManager;
+ class HostAndPort;
+ class NamespaceString;
class RemoteCommandRunner;
class RemoteCommandTargeterFactory;
class Shard;
class ShardType;
+ template<typename T> class StatusWith;
+
namespace executor {
class TaskExecutor;
@@ -95,6 +100,26 @@ namespace executor {
void toBSON(BSONObjBuilder* result);
+ /**
+ * Executes 'find' command against the specified host and fetches *all* the results that
+ * the host will return until there are no more or until an error is returned.
+ *
+ * Returns either the complete set of results or an error, never partial results.
+ *
+ * Note: should never be used outside of CatalogManagerReplicaSet or DistLockCatalogImpl.
+ */
+ StatusWith<std::vector<BSONObj>> exhaustiveFind(const HostAndPort& host,
+ const NamespaceString& nss,
+ const BSONObj& query,
+ boost::optional<int> limit);
+
+ /**
+ * Runs a command against the specified host and returns the result.
+ */
+ StatusWith<BSONObj> runCommand(const HostAndPort& host,
+ const std::string& dbName,
+ const BSONObj& cmdObj);
+
private:
typedef std::map<ShardId, std::shared_ptr<Shard>> ShardMap;