diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-04-06 14:39:35 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-04-06 18:14:13 -0400 |
commit | 1e28d93c599dcc963eb53986c2e8fb2bc4f8935c (patch) | |
tree | 6da5e1c962c0cb0091c132be08a689f25fb4ff75 /src/mongo/s | |
parent | b307c9eb7699b09c2cae2aa7235300c05abab5ed (diff) | |
download | mongo-1e28d93c599dcc963eb53986c2e8fb2bc4f8935c.tar.gz |
SERVER-28656 consolidate utility functions for forwarding commands to shards into cluster_commands_common.h
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/commands/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_commands_common.cpp | 165 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_commands_common.h | 34 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_current_op.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_db_stats_cmd.cpp | 15 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_repair_database_cmd.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/commands/commands_public.cpp | 22 | ||||
-rw-r--r-- | src/mongo/s/commands/scatter_gather_from_shards.cpp | 166 | ||||
-rw-r--r-- | src/mongo/s/commands/scatter_gather_from_shards.h | 60 |
9 files changed, 208 insertions, 281 deletions
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index b7af6271a4e..81bae864fae 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -76,7 +76,6 @@ env.Library( 'cluster_write.cpp', 'cluster_write_cmd.cpp', 'commands_public.cpp', - 'scatter_gather_from_shards.cpp', 'sharded_command_processing.cpp', 'strategy.cpp', ], diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp index 1e0b13f8fcd..c154029131a 100644 --- a/src/mongo/s/commands/cluster_commands_common.cpp +++ b/src/mongo/s/commands/cluster_commands_common.cpp @@ -34,17 +34,182 @@ #include "mongo/db/commands.h" #include "mongo/db/query/cursor_response.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/parallel.h" #include "mongo/s/client/shard_connection.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/client/version_manager.h" +#include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" namespace mongo { +namespace { +BSONObj appendShardVersion(const BSONObj& cmdObj, ChunkVersion version) { + BSONObjBuilder cmdWithVersionBob; + cmdWithVersionBob.appendElements(cmdObj); + version.appendForCommands(&cmdWithVersionBob); + return cmdWithVersionBob.obj(); +} +} + +std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationContext* opCtx, + const BSONObj& cmdObj) { + std::vector<AsyncRequestsSender::Request> requests; + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + return requests; +} + +std::vector<AsyncRequestsSender::Request> buildRequestsForTargetedShards( + OperationContext* opCtx, + const CachedCollectionRoutingInfo& routingInfo, + const BSONObj& cmdObj) { + std::vector<AsyncRequestsSender::Request> requests; + if (routingInfo.cm()) { + // The collection is sharded. Target all shards that own data for the collection. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (const ShardId& shardId : shardIds) { + requests.emplace_back( + shardId, appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId))); + } + } else { + // The collection is unsharded. Target only the primary shard for the database. + if (routingInfo.primary()->isConfig()) { + // Don't append shard version info when contacting the config servers. + requests.emplace_back(routingInfo.primaryId(), cmdObj); + } else { + requests.emplace_back(routingInfo.primaryId(), + appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())); + } + } + return requests; +} + +using ShardAndReply = std::tuple<ShardId, BSONObj>; + +StatusWith<std::vector<ShardAndReply>> gatherResults( + OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + int options, + const std::vector<AsyncRequestsSender::Request>& requests, + BSONObjBuilder* output) { + // Extract the readPreference from the command. + rpc::ServerSelectionMetadata ssm; + BSONObjBuilder unusedCmdBob; + BSONObjBuilder upconvertedMetadataBob; + uassertStatusOK(rpc::ServerSelectionMetadata::upconvert( + cmdObj, options, &unusedCmdBob, &upconvertedMetadataBob)); + auto upconvertedMetadata = upconvertedMetadataBob.obj(); + auto ssmElem = upconvertedMetadata.getField(rpc::ServerSelectionMetadata::fieldName()); + if (!ssmElem.eoo()) { + ssm = uassertStatusOK(rpc::ServerSelectionMetadata::readFromMetadata(ssmElem)); + } + auto readPref = ssm.getReadPreference(); + + // Send the requests. + + AsyncRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + dbName, + requests, + readPref ? *readPref : ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet())); + + // Get the responses. + + std::vector<ShardAndReply> results; // Stores results by ShardId + BSONObjBuilder subobj(output->subobjStart("raw")); // Stores results by ConnectionString + BSONObjBuilder errors; // Stores errors by ConnectionString + int commonErrCode = -1; // Stores the overall error code + + BSONElement wcErrorElem; + ShardId wcErrorShardId; + bool hasWCError = false; + + while (!ars.done()) { + auto response = ars.next(); + const auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, response.shardId); + if (!swShard.isOK()) { + output->resetToEmpty(); + return swShard.getStatus(); + } + const auto shard = std::move(swShard.getValue()); + + auto status = response.swResponse.getStatus(); + if (status.isOK()) { + // We successfully received a response. + + status = getStatusFromCommandResult(response.swResponse.getValue().data); + if (ErrorCodes::isStaleShardingError(status.code())) { + // Do not report any raw results if we fail to establish a shardVersion. + output->resetToEmpty(); + return status; + } + + auto result = std::move(response.swResponse.getValue().data); + if (!hasWCError) { + if ((wcErrorElem = result["writeConcernError"])) { + wcErrorShardId = response.shardId; + hasWCError = true; + } + } + + if (status.isOK()) { + // The command status was OK. + subobj.append(shard->getConnString().toString(), result); + results.emplace_back(std::move(response.shardId), std::move(result)); + continue; + } + } + + // Either we failed to get a response, or the command had a non-OK status. + + // Convert the error status back into the format of a command result. + BSONObjBuilder resultBob; + Command::appendCommandStatus(resultBob, status); + auto result = resultBob.obj(); + + // Update the data structures that store the results. + errors.append(shard->getConnString().toString(), status.reason()); + if (commonErrCode == -1) { + commonErrCode = status.code(); + } else if (commonErrCode != status.code()) { + commonErrCode = 0; + } + subobj.append(shard->getConnString().toString(), result); + results.emplace_back(std::move(response.shardId), std::move(result)); + } + + subobj.done(); + + if (hasWCError) { + appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output); + } + + BSONObj errobj = errors.done(); + if (!errobj.isEmpty()) { + // If code for all errors is the same, then report the common error code. + if (commonErrCode > 0) { + return {ErrorCodes::fromInt(commonErrCode), errobj.toString()}; + } + return {ErrorCodes::OperationFailed, errobj.toString()}; + } + + return results; +} + int getUniqueCodeFromCommandResults(const std::vector<Strategy::CommandResult>& results) { int commonErrCode = -1; for (std::vector<Strategy::CommandResult>::const_iterator it = results.begin(); diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h index 87fd9b7d50c..e9f9cac0b71 100644 --- a/src/mongo/s/commands/cluster_commands_common.h +++ b/src/mongo/s/commands/cluster_commands_common.h @@ -32,7 +32,9 @@ #include <vector> #include "mongo/base/status.h" +#include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" +#include "mongo/s/async_requests_sender.h" #include "mongo/s/commands/strategy.h" #include "mongo/stdx/memory.h" @@ -43,6 +45,38 @@ class CachedDatabaseInfo; class OperationContext; /** + * Utility function to target all shards for a request that does not have a specific namespace. + */ +std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationContext* opCtx, + const BSONObj& cmdObj); + +/** + * Utility function to get the set of shards to target for a request on a specific namespace. + * + * Selects shards to target based on 'routingInfo', and constructs a vector of requests, one per + * targeted shard, where the cmdObj to send to each shard has been modified to include the shard's + * shardVersion. + */ +std::vector<AsyncRequestsSender::Request> buildRequestsForTargetedShards( + OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, const BSONObj& cmdObj); + +using ShardAndReply = std::tuple<ShardId, BSONObj>; + +/** + * Logic for commands that simply map out to all shards then fold the results into + * a single response. + * + * All shards are contacted in parallel. + */ +StatusWith<std::vector<ShardAndReply>> gatherResults( + OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + int options, + const std::vector<AsyncRequestsSender::Request>& requests, + BSONObjBuilder* output); + +/** * Utility function to compute a single error code from a vector of command results. * * @return If there is an error code common to all of the error results, returns that error diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp index ecf1c1fb30b..b704a17a921 100644 --- a/src/mongo/s/commands/cluster_current_op.cpp +++ b/src/mongo/s/commands/cluster_current_op.cpp @@ -39,7 +39,7 @@ #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/scatter_gather_from_shards.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -88,20 +88,11 @@ public: int options, std::string& errmsg, BSONObjBuilder& output) override { - - // Target all shards. - std::vector<AsyncRequestsSender::Request> requests; - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); - } - + auto requests = buildRequestsForAllShards(opCtx, cmdObj); auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output); if (!swResults.isOK()) { return appendCommandStatus(output, swResults.getStatus()); } - aggregateResults(std::move(swResults.getValue()), output); return true; } diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp index 16151cf0f1c..ea83968a486 100644 --- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp @@ -30,9 +30,9 @@ #include <vector> -#include "mongo/s/commands/scatter_gather_from_shards.h" - +#include "mongo/db/commands.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" namespace mongo { @@ -71,20 +71,11 @@ public: int options, std::string& errmsg, BSONObjBuilder& output) override { - - // Target all shards. - std::vector<AsyncRequestsSender::Request> requests; - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); - } - + auto requests = buildRequestsForAllShards(opCtx, cmdObj); auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output); if (!swResults.isOK()) { return appendCommandStatus(output, swResults.getStatus()); } - aggregateResults(std::move(swResults.getValue()), output); return true; } diff --git a/src/mongo/s/commands/cluster_repair_database_cmd.cpp b/src/mongo/s/commands/cluster_repair_database_cmd.cpp index 707d6149846..bd13a094059 100644 --- a/src/mongo/s/commands/cluster_repair_database_cmd.cpp +++ b/src/mongo/s/commands/cluster_repair_database_cmd.cpp @@ -28,9 +28,9 @@ #include "mongo/platform/basic.h" -#include "mongo/s/commands/scatter_gather_from_shards.h" - +#include "mongo/db/commands.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/grid.h" namespace mongo { @@ -65,14 +65,7 @@ public: int options, std::string& errmsg, BSONObjBuilder& output) override { - // Target all shards. - std::vector<AsyncRequestsSender::Request> requests; - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); - } - + auto requests = buildRequestsForAllShards(opCtx, cmdObj); auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output); return appendCommandStatus(output, swResults.getStatus()); } diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 371782d2e4d..032c7a2512c 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -61,7 +61,6 @@ #include "mongo/s/commands/cluster_aggregate.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/cluster_explain.h" -#include "mongo/s/commands/scatter_gather_from_shards.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/grid.h" #include "mongo/s/query/store_possible_cursor.h" @@ -253,28 +252,9 @@ protected: status = Status::OK(); output.resetToEmpty(); - // Target only shards that own data for the collection, and append the shardVersion for - // the collection to the command. - std::vector<AsyncRequestsSender::Request> requests; auto routingInfo = uassertStatusOK( Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - if (routingInfo.cm()) { - vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - for (const ShardId& shardId : shardIds) { - requests.emplace_back( - std::move(shardId), - appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId))); - } - } else { - if (routingInfo.primary()->isConfig()) { - // Don't append shard version info when contacting the config servers. - requests.emplace_back(routingInfo.primaryId(), cmdObj); - } else { - requests.emplace_back(routingInfo.primaryId(), - appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())); - } - } + auto requests = buildRequestsForTargetedShards(opCtx, routingInfo, cmdObj); auto swResults = gatherResults(opCtx, dbName, cmdObj, options, requests, &output); if (ErrorCodes::isStaleShardingError(swResults.getStatus().code())) { diff --git a/src/mongo/s/commands/scatter_gather_from_shards.cpp b/src/mongo/s/commands/scatter_gather_from_shards.cpp deleted file mode 100644 index ac331ceaf5d..00000000000 --- a/src/mongo/s/commands/scatter_gather_from_shards.cpp +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand - -#include "mongo/platform/basic.h" - -#include "mongo/s/commands/scatter_gather_from_shards.h" - -#include <list> -#include <set> - -#include "mongo/db/jsobj.h" -#include "mongo/executor/task_executor_pool.h" -#include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/rpc/metadata/server_selection_metadata.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/client/shard.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/commands/sharded_command_processing.h" -#include "mongo/s/grid.h" -#include "mongo/util/log.h" - -namespace mongo { - -using ShardAndReply = std::tuple<ShardId, BSONObj>; - -StatusWith<std::vector<ShardAndReply>> gatherResults( - OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj, - int options, - const std::vector<AsyncRequestsSender::Request>& requests, - BSONObjBuilder* output) { - // Extract the readPreference from the command. - - rpc::ServerSelectionMetadata ssm; - BSONObjBuilder unusedCmdBob; - BSONObjBuilder upconvertedMetadataBob; - uassertStatusOK(rpc::ServerSelectionMetadata::upconvert( - cmdObj, options, &unusedCmdBob, &upconvertedMetadataBob)); - auto upconvertedMetadata = upconvertedMetadataBob.obj(); - auto ssmElem = upconvertedMetadata.getField(rpc::ServerSelectionMetadata::fieldName()); - if (!ssmElem.eoo()) { - ssm = uassertStatusOK(rpc::ServerSelectionMetadata::readFromMetadata(ssmElem)); - } - auto readPref = ssm.getReadPreference(); - - // Send the requests. - - AsyncRequestsSender ars( - opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - dbName, - requests, - readPref ? *readPref : ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet())); - - // Get the responses. - - std::vector<ShardAndReply> results; // Stores results by ShardId - BSONObjBuilder subobj(output->subobjStart("raw")); // Stores results by ConnectionString - BSONObjBuilder errors; // Stores errors by ConnectionString - int commonErrCode = -1; // Stores the overall error code - - BSONElement wcErrorElem; - ShardId wcErrorShardId; - bool hasWCError = false; - - while (!ars.done()) { - auto response = ars.next(); - const auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, response.shardId); - if (!swShard.isOK()) { - output->resetToEmpty(); - return swShard.getStatus(); - } - const auto shard = std::move(swShard.getValue()); - - auto status = response.swResponse.getStatus(); - if (status.isOK()) { - // We successfully received a response. - - status = getStatusFromCommandResult(response.swResponse.getValue().data); - if (ErrorCodes::isStaleShardingError(status.code())) { - // Do not report any raw results if we fail to establish a shardVersion. - output->resetToEmpty(); - return status; - } - - auto result = std::move(response.swResponse.getValue().data); - if (!hasWCError) { - if ((wcErrorElem = result["writeConcernError"])) { - wcErrorShardId = response.shardId; - hasWCError = true; - } - } - - if (status.isOK()) { - // The command status was OK. - subobj.append(shard->getConnString().toString(), result); - results.emplace_back(std::move(response.shardId), std::move(result)); - continue; - } - } - - // Either we failed to get a response, or the command had a non-OK status. - - // Convert the error status back into the format of a command result. - BSONObjBuilder resultBob; - Command::appendCommandStatus(resultBob, status); - auto result = resultBob.obj(); - - // Update the data structures that store the results. - errors.append(shard->getConnString().toString(), status.reason()); - if (commonErrCode == -1) { - commonErrCode = status.code(); - } else if (commonErrCode != status.code()) { - commonErrCode = 0; - } - subobj.append(shard->getConnString().toString(), result); - results.emplace_back(std::move(response.shardId), std::move(result)); - } - - subobj.done(); - - if (hasWCError) { - appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output); - } - - BSONObj errobj = errors.done(); - if (!errobj.isEmpty()) { - // If code for all errors is the same, then report the common error code. - if (commonErrCode > 0) { - return {ErrorCodes::fromInt(commonErrCode), errobj.toString()}; - } - return {ErrorCodes::OperationFailed, errobj.toString()}; - } - - return results; -} - -} // namespace mongo diff --git a/src/mongo/s/commands/scatter_gather_from_shards.h b/src/mongo/s/commands/scatter_gather_from_shards.h deleted file mode 100644 index ebca85f01a3..00000000000 --- a/src/mongo/s/commands/scatter_gather_from_shards.h +++ /dev/null @@ -1,60 +0,0 @@ -/** -* Copyright (C) 2015 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects -* for all of the code used other than as permitted herein. If you modify -* file(s) with this exception, you may extend this exception to your -* version of the file(s), but you are not obligated to do so. If you do not -* wish to do so, delete this exception statement from your version. If you -* delete this exception statement from all source files in the program, -* then also delete it in the license file. -*/ - -#include <string> -#include <tuple> -#include <vector> - -#include "mongo/base/string_data.h" -#include "mongo/db/commands.h" -#include "mongo/s/async_requests_sender.h" -#include "mongo/s/client/shard.h" - -namespace mongo { - -class BSONObj; -class BSONObjBuilder; -class OperationContext; - -using ShardAndReply = std::tuple<ShardId, BSONObj>; - -/** - * Logic for commands that simply map out to all shards then fold the results into - * a single response. - * - * All shards are contacted in parallel. - */ -StatusWith<std::vector<ShardAndReply>> gatherResults( - OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj, - int options, - const std::vector<AsyncRequestsSender::Request>& requests, - BSONObjBuilder* output); - -} // namespace mongo |