diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_commands_common.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_commands_common.cpp | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/src/mongo/s/commands/cluster_commands_common.cpp b/src/mongo/s/commands/cluster_commands_common.cpp index 1e0b13f8fcd..c154029131a 100644 --- a/src/mongo/s/commands/cluster_commands_common.cpp +++ b/src/mongo/s/commands/cluster_commands_common.cpp @@ -34,17 +34,182 @@ #include "mongo/db/commands.h" #include "mongo/db/query/cursor_response.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/parallel.h" #include "mongo/s/client/shard_connection.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/client/version_manager.h" +#include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" namespace mongo { +namespace { +BSONObj appendShardVersion(const BSONObj& cmdObj, ChunkVersion version) { + BSONObjBuilder cmdWithVersionBob; + cmdWithVersionBob.appendElements(cmdObj); + version.appendForCommands(&cmdWithVersionBob); + return cmdWithVersionBob.obj(); +} +} + +std::vector<AsyncRequestsSender::Request> buildRequestsForAllShards(OperationContext* opCtx, + const BSONObj& cmdObj) { + std::vector<AsyncRequestsSender::Request> requests; + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + return requests; +} + +std::vector<AsyncRequestsSender::Request> buildRequestsForTargetedShards( + OperationContext* opCtx, + const CachedCollectionRoutingInfo& routingInfo, + const BSONObj& cmdObj) { + std::vector<AsyncRequestsSender::Request> requests; + if (routingInfo.cm()) { + // The collection is sharded. Target all shards that own data for the collection. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + for (const ShardId& shardId : shardIds) { + requests.emplace_back( + shardId, appendShardVersion(cmdObj, routingInfo.cm()->getVersion(shardId))); + } + } else { + // The collection is unsharded. Target only the primary shard for the database. + if (routingInfo.primary()->isConfig()) { + // Don't append shard version info when contacting the config servers. + requests.emplace_back(routingInfo.primaryId(), cmdObj); + } else { + requests.emplace_back(routingInfo.primaryId(), + appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())); + } + } + return requests; +} + +using ShardAndReply = std::tuple<ShardId, BSONObj>; + +StatusWith<std::vector<ShardAndReply>> gatherResults( + OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + int options, + const std::vector<AsyncRequestsSender::Request>& requests, + BSONObjBuilder* output) { + // Extract the readPreference from the command. + rpc::ServerSelectionMetadata ssm; + BSONObjBuilder unusedCmdBob; + BSONObjBuilder upconvertedMetadataBob; + uassertStatusOK(rpc::ServerSelectionMetadata::upconvert( + cmdObj, options, &unusedCmdBob, &upconvertedMetadataBob)); + auto upconvertedMetadata = upconvertedMetadataBob.obj(); + auto ssmElem = upconvertedMetadata.getField(rpc::ServerSelectionMetadata::fieldName()); + if (!ssmElem.eoo()) { + ssm = uassertStatusOK(rpc::ServerSelectionMetadata::readFromMetadata(ssmElem)); + } + auto readPref = ssm.getReadPreference(); + + // Send the requests. + + AsyncRequestsSender ars( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + dbName, + requests, + readPref ? *readPref : ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet())); + + // Get the responses. + + std::vector<ShardAndReply> results; // Stores results by ShardId + BSONObjBuilder subobj(output->subobjStart("raw")); // Stores results by ConnectionString + BSONObjBuilder errors; // Stores errors by ConnectionString + int commonErrCode = -1; // Stores the overall error code + + BSONElement wcErrorElem; + ShardId wcErrorShardId; + bool hasWCError = false; + + while (!ars.done()) { + auto response = ars.next(); + const auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, response.shardId); + if (!swShard.isOK()) { + output->resetToEmpty(); + return swShard.getStatus(); + } + const auto shard = std::move(swShard.getValue()); + + auto status = response.swResponse.getStatus(); + if (status.isOK()) { + // We successfully received a response. + + status = getStatusFromCommandResult(response.swResponse.getValue().data); + if (ErrorCodes::isStaleShardingError(status.code())) { + // Do not report any raw results if we fail to establish a shardVersion. + output->resetToEmpty(); + return status; + } + + auto result = std::move(response.swResponse.getValue().data); + if (!hasWCError) { + if ((wcErrorElem = result["writeConcernError"])) { + wcErrorShardId = response.shardId; + hasWCError = true; + } + } + + if (status.isOK()) { + // The command status was OK. + subobj.append(shard->getConnString().toString(), result); + results.emplace_back(std::move(response.shardId), std::move(result)); + continue; + } + } + + // Either we failed to get a response, or the command had a non-OK status. + + // Convert the error status back into the format of a command result. + BSONObjBuilder resultBob; + Command::appendCommandStatus(resultBob, status); + auto result = resultBob.obj(); + + // Update the data structures that store the results. + errors.append(shard->getConnString().toString(), status.reason()); + if (commonErrCode == -1) { + commonErrCode = status.code(); + } else if (commonErrCode != status.code()) { + commonErrCode = 0; + } + subobj.append(shard->getConnString().toString(), result); + results.emplace_back(std::move(response.shardId), std::move(result)); + } + + subobj.done(); + + if (hasWCError) { + appendWriteConcernErrorToCmdResponse(wcErrorShardId, wcErrorElem, *output); + } + + BSONObj errobj = errors.done(); + if (!errobj.isEmpty()) { + // If code for all errors is the same, then report the common error code. + if (commonErrCode > 0) { + return {ErrorCodes::fromInt(commonErrCode), errobj.toString()}; + } + return {ErrorCodes::OperationFailed, errobj.toString()}; + } + + return results; +} + int getUniqueCodeFromCommandResults(const std::vector<Strategy::CommandResult>& results) { int commonErrCode = -1; for (std::vector<Strategy::CommandResult>::const_iterator it = results.begin(); |