diff options
Diffstat (limited to 'src/mongo/s/commands/cluster_map_reduce_agg.cpp')
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_agg.cpp | 197 |
1 files changed, 163 insertions, 34 deletions
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index 9375d1c2176..2c2012c3af5 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -31,56 +31,185 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/connpool.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/map_reduce_agg.h" #include "mongo/db/commands/map_reduce_gen.h" +#include "mongo/db/commands/mr_common.h" +#include "mongo/db/pipeline/mongos_process_interface.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" +#include "mongo/db/query/map_reduce_output_format.h" +#include "mongo/s/catalog_cache.h" #include "mongo/s/commands/cluster_map_reduce_agg.h" namespace mongo { +namespace { -// Exhaust the cursor from the aggregation response and extract results and statistics. -std::vector<BSONObj> getAllAggregationResults(OperationContext* opCtx, - const std::string& dbname, - CursorResponse& response) { - CursorId cursorId = response.getCursorId(); - auto fullBatch = response.releaseBatch(); - while (cursorId != 0) { - GetMoreRequest request( - response.getNSS(), cursorId, boost::none, boost::none, boost::none, boost::none); - BSONObj getMoreResponse = CommandHelpers::runCommandDirectly( - opCtx, OpMsgRequest::fromDBAndBody(dbname, request.toBSON())); - auto getMoreCursorResponse = CursorResponse::parseFromBSONThrowing(getMoreResponse); - auto nextBatch = getMoreCursorResponse.releaseBatch(); - fullBatch.insert(fullBatch.end(), nextBatch.begin(), nextBatch.end()); - cursorId = getMoreCursorResponse.getCursorId(); +auto makeExpressionContext(OperationContext* opCtx, + const MapReduce& parsedMr, + boost::optional<CachedCollectionRoutingInfo> routingInfo) { + // Populate the collection UUID and the appropriate collation to use. + auto nss = parsedMr.getNamespace(); + auto [collationObj, uuid] = sharded_agg_helpers::getCollationAndUUID( + routingInfo, nss, parsedMr.getCollation().get_value_or(BSONObj())); + + std::unique_ptr<CollatorInterface> resolvedCollator; + if (!collationObj.isEmpty()) { + // This will be null if attempting to build an interface for the simple collator. + resolvedCollator = uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collationObj)); + } + + // Resolve involved namespaces. + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); + if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) { + auto outNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName() + ? *parsedMr.getOutOptions().getDatabaseName() + : parsedMr.getNamespace().db(), + parsedMr.getOutOptions().getCollectionName()}; + resolvedNamespaces.try_emplace(outNss.coll(), outNss, std::vector<BSONObj>{}); } - return fullBatch; + + auto expCtx = make_intrusive<ExpressionContext>( + opCtx, + boost::none, // explain + false, // fromMongos + false, // needsmerge + true, // allowDiskUse + parsedMr.getBypassDocumentValidation().get_value_or(false), + nss, + collationObj, + boost::none, // runtimeConstants + std::move(resolvedCollator), + std::make_shared<MongoSInterface>(), + std::move(resolvedNamespaces), + boost::none); // uuid + expCtx->inMongos = true; + return expCtx; } +Document serializeToCommand(BSONObj originalCmd, const MapReduce& parsedMr, Pipeline* pipeline) { + MutableDocument translatedCmd; + + translatedCmd["aggregate"] = Value(parsedMr.getNamespace().coll()); + translatedCmd["pipeline"] = Value(pipeline->serialize()); + translatedCmd["cursor"] = Value(Document{{"batchSize", std::numeric_limits<long long>::max()}}); + translatedCmd["allowDiskUse"] = Value(true); + translatedCmd["fromMongos"] = Value(true); + + // Append generic command options. + for (const auto& elem : CommandHelpers::appendPassthroughFields(originalCmd, BSONObj())) { + translatedCmd[elem.fieldNameStringData()] = Value(elem); + } + return translatedCmd.freeze(); +} + +} // namespace + bool runAggregationMapReduce(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmd, std::string& errmsg, BSONObjBuilder& result) { - // Pretend we have built the appropriate pipeline and aggregation request. - auto mrRequest = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); - const BSONObj aggRequest = - fromjson(str::stream() << "{aggregate: '" << mrRequest.getNamespace().coll() - << "', pipeline: [ { $group: { _id: { user: \"$user\" }," - << "count: { $sum: 1 } } } ], cursor: {}}"); - BSONObj aggResult = CommandHelpers::runCommandDirectly( - opCtx, OpMsgRequest::fromDBAndBody(dbname, std::move(aggRequest))); - - bool inMemory = mrRequest.getOutOptions().getOutputType() == OutputType::InMemory; - std::string outColl = mrRequest.getOutOptions().getCollectionName(); - // Either inline response specified or we have an output collection. - invariant(inMemory ^ !outColl.empty()); - - auto cursorResponse = CursorResponse::parseFromBSONThrowing(aggResult); - auto completeBatch = getAllAggregationResults(opCtx, dbname, cursorResponse); - [[maybe_unused]] CursorResponse completeCursor( - cursorResponse.getNSS(), cursorResponse.getCursorId(), std::move(completeBatch)); + auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); + stdx::unordered_set<NamespaceString> involvedNamespaces{parsedMr.getNamespace()}; + auto resolvedOutNss = NamespaceString{parsedMr.getOutOptions().getDatabaseName() + ? *parsedMr.getOutOptions().getDatabaseName() + : parsedMr.getNamespace().db(), + parsedMr.getOutOptions().getCollectionName()}; + + if (parsedMr.getOutOptions().getOutputType() != OutputType::InMemory) { + involvedNamespaces.insert(resolvedOutNss); + } + + const auto pipelineBuilder = [&](boost::optional<CachedCollectionRoutingInfo> routingInfo) { + return map_reduce_common::translateFromMR( + parsedMr, makeExpressionContext(opCtx, parsedMr, routingInfo)); + }; + + auto namespaces = + ClusterAggregate::Namespaces{parsedMr.getNamespace(), parsedMr.getNamespace()}; + + // Auth has already been checked for the original mapReduce command, no need to recheck here. + PrivilegeVector privileges; + + // This holds the raw results from the aggregation, which will be reformatted to match the + // expected mapReduce output. + BSONObjBuilder tempResults; + + auto targeter = uassertStatusOK( + sharded_agg_helpers::AggregationTargeter::make(opCtx, + parsedMr.getNamespace(), + pipelineBuilder, + involvedNamespaces, + false, // hasChangeStream + true)); // allowedToPassthrough + switch (targeter.policy) { + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kPassthrough: { + // For the passthrough case, the targeter will not build a pipeline since its not needed + // in the normal aggregation path. For this translation, though, we need to build the + // pipeline to serialize and send to the primary shard. + auto serialized = + serializeToCommand(cmd, parsedMr, pipelineBuilder(targeter.routingInfo).get()); + uassertStatusOK( + sharded_agg_helpers::runPipelineOnPrimaryShard(opCtx, + namespaces, + targeter.routingInfo->db(), + boost::none, // explain + std::move(serialized), + privileges, + &tempResults)); + break; + } + + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kMongosRequired: { + // Pipelines generated from mapReduce should never be required to run on mongos. + uasserted(31291, "Internal error during mapReduce translation"); + break; + } + + case sharded_agg_helpers::AggregationTargeter::TargetingPolicy::kAnyShard: { + auto serialized = serializeToCommand(cmd, parsedMr, targeter.pipeline.get()); + uassertStatusOK( + sharded_agg_helpers::dispatchPipelineAndMerge(opCtx, + std::move(targeter), + std::move(serialized), + std::numeric_limits<long long>::max(), + namespaces, + privileges, + &tempResults, + false)); // hasChangeStream + break; + } + } + + auto aggResults = tempResults.done(); + if (parsedMr.getOutOptions().getOutputType() == OutputType::InMemory) { + auto exhaustedResults = [&]() { + BSONArrayBuilder bab; + for (auto&& elem : aggResults["cursor"]["firstBatch"].Obj()) + bab.append(elem.embeddedObject()); + return bab.arr(); + }(); + map_reduce_output_format::appendInlineResponse(std::move(exhaustedResults), + parsedMr.getVerbose().get_value_or(false), + true, // inMongos + &result); + } else { + map_reduce_output_format::appendOutResponse( + parsedMr.getOutOptions().getDatabaseName(), + parsedMr.getOutOptions().getCollectionName(), + boost::get_optional_value_or(parsedMr.getVerbose(), false), + true, // inMongos + &result); + } return true; } |