summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands/cluster_commands_common.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/commands/cluster_commands_common.cpp')
-rw-r--r--src/mongo/s/commands/cluster_commands_common.cpp165
1 files changed, 165 insertions, 0 deletions
diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp
index 1e0b13f8fcd..c154029131a 100644
--- a/src/mongo/s/commands/cluster_commands_common.cpp
+++ b/src/mongo/s/commands/cluster_commands_common.cpp
@@ -34,17 +34,182 @@
#include "mongo/db/commands.h"
#include "mongo/db/query/cursor_response.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/parallel.h"
#include "mongo/s/client/shard_connection.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/version_manager.h"
+#include "mongo/s/commands/sharded_command_processing.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/log.h"
namespace mongo {
+namespace {
+BSONObj appendShardVersion(const BSONObj& cmdObj, ChunkVersion version) {
+ BSONObjBuilder cmdWithVersionBob;
+ cmdWithVersionBob.appendElements(cmdObj);
+ version.appendForCommands(&cmdWithVersionBob);
+ return cmdWithVersionBob.obj();
+}
+}
+
+std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationContext* opCtx,
+ const BSONObj& cmdObj) {
+ std::vector<AsyncRequestsSender::Request> requests;
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+ return requests;
+}
+
+std::vector<AsyncRequestsSender::Request> buildRequestsForTargetedShards(
+ OperationContext* opCtx,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const BSONObj& cmdObj) {
+ std::vector<AsyncRequestsSender::Request> requests;
+ if (routingInfo.cm()) {
+ // The collection is sharded. Target all shards that own data for the collection.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ for (const ShardId& shardId : shardIds) {
+ requests.emplace_back(
+ shardId, appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId)));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ if (routingInfo.primary()->isConfig()) {
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo.primaryId(), cmdObj);
+ } else {
+ requests.emplace_back(routingInfo.primaryId(),
+ appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()));
+ }
+ }
+ return requests;
+}
+
+using ShardAndReply = std::tuple<ShardId, BSONObj>;
+
+StatusWith<std::vector<ShardAndReply>> gatherResults(
+ OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ int options,
+ const std::vector<AsyncRequestsSender::Request>& requests,
+ BSONObjBuilder* output) {
+ // Extract the readPreference from the command.
+ rpc::ServerSelectionMetadata ssm;
+ BSONObjBuilder unusedCmdBob;
+ BSONObjBuilder upconvertedMetadataBob;
+ uassertStatusOK(rpc::ServerSelectionMetadata::upconvert(
+ cmdObj, options, &unusedCmdBob, &upconvertedMetadataBob));
+ auto upconvertedMetadata = upconvertedMetadataBob.obj();
+ auto ssmElem = upconvertedMetadata.getField(rpc::ServerSelectionMetadata::fieldName());
+ if (!ssmElem.eoo()) {
+ ssm = uassertStatusOK(rpc::ServerSelectionMetadata::readFromMetadata(ssmElem));
+ }
+ auto readPref = ssm.getReadPreference();
+
+ // Send the requests.
+
+ AsyncRequestsSender ars(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ dbName,
+ requests,
+ readPref ? *readPref : ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet()));
+
+ // Get the responses.
+
+ std::vector<ShardAndReply> results; // Stores results by ShardId
+ BSONObjBuilder subobj(output->subobjStart("raw")); // Stores results by ConnectionString
+ BSONObjBuilder errors; // Stores errors by ConnectionString
+ int commonErrCode = -1; // Stores the overall error code
+
+ BSONElement wcErrorElem;
+ ShardId wcErrorShardId;
+ bool hasWCError = false;
+
+ while (!ars.done()) {
+ auto response = ars.next();
+ const auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, response.shardId);
+ if (!swShard.isOK()) {
+ output->resetToEmpty();
+ return swShard.getStatus();
+ }
+ const auto shard = std::move(swShard.getValue());
+
+ auto status = response.swResponse.getStatus();
+ if (status.isOK()) {
+ // We successfully received a response.
+
+ status = getStatusFromCommandResult(response.swResponse.getValue().data);
+ if (ErrorCodes::isStaleShardingError(status.code())) {
+ // Do not report any raw results if we fail to establish a shardVersion.
+ output->resetToEmpty();
+ return status;
+ }
+
+ auto result = std::move(response.swResponse.getValue().data);
+ if (!hasWCError) {
+ if ((wcErrorElem = result["writeConcernError"])) {
+ wcErrorShardId = response.shardId;
+ hasWCError = true;
+ }
+ }
+
+ if (status.isOK()) {
+ // The command status was OK.
+ subobj.append(shard->getConnString().toString(), result);
+ results.emplace_back(std::move(response.shardId), std::move(result));
+ continue;
+ }
+ }
+
+ // Either we failed to get a response, or the command had a non-OK status.
+
+ // Convert the error status back into the format of a command result.
+ BSONObjBuilder resultBob;
+ Command::appendCommandStatus(resultBob, status);
+ auto result = resultBob.obj();
+
+ // Update the data structures that store the results.
+ errors.append(shard->getConnString().toString(), status.reason());
+ if (commonErrCode == -1) {
+ commonErrCode = status.code();
+ } else if (commonErrCode != status.code()) {
+ commonErrCode = 0;
+ }
+ subobj.append(shard->getConnString().toString(), result);
+ results.emplace_back(std::move(response.shardId), std::move(result));
+ }
+
+ subobj.done();
+
+ if (hasWCError) {
+ appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output);
+ }
+
+ BSONObj errobj = errors.done();
+ if (!errobj.isEmpty()) {
+ // If code for all errors is the same, then report the common error code.
+ if (commonErrCode > 0) {
+ return {ErrorCodes::fromInt(commonErrCode), errobj.toString()};
+ }
+ return {ErrorCodes::OperationFailed, errobj.toString()};
+ }
+
+ return results;
+}
+
int getUniqueCodeFromCommandResults(const std::vector<Strategy::CommandResult>& results) {
int commonErrCode = -1;
for (std::vector<Strategy::CommandResult>::const_iterator it = results.begin();