diff options
Diffstat (limited to 'src/mongo/db/commands/map_reduce_agg.cpp')
-rw-r--r-- | src/mongo/db/commands/map_reduce_agg.cpp | 169 |
1 files changed, 2 insertions, 167 deletions
diff --git a/src/mongo/db/commands/map_reduce_agg.cpp b/src/mongo/db/commands/map_reduce_agg.cpp index aaffa84dc19..f1c94a9c339 100644 --- a/src/mongo/db/commands/map_reduce_agg.cpp +++ b/src/mongo/db/commands/map_reduce_agg.cpp @@ -41,155 +41,18 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/map_reduce_agg.h" #include "mongo/db/commands/map_reduce_javascript_code.h" +#include "mongo/db/commands/mr_common.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/pipeline/document_source.h" -#include "mongo/db/pipeline/document_source_group.h" -#include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_match.h" -#include "mongo/db/pipeline/document_source_merge.h" -#include "mongo/db/pipeline/document_source_out.h" -#include "mongo/db/pipeline/document_source_project.h" -#include "mongo/db/pipeline/document_source_single_document_transformation.h" -#include "mongo/db/pipeline/document_source_sort.h" -#include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/pipeline/expression.h" -#include "mongo/db/pipeline/expression_javascript.h" -#include "mongo/db/pipeline/parsed_aggregation_projection_node.h" -#include "mongo/db/pipeline/parsed_inclusion_projection.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/query/map_reduce_output_format.h" -#include "mongo/db/query/util/make_data_structure.h" -#include "mongo/util/intrusive_counter.h" namespace mongo::map_reduce_agg { namespace { -using namespace std::string_literals; - -auto translateSort(boost::intrusive_ptr<ExpressionContext> expCtx, - const BSONObj& sort, - const boost::optional<std::int64_t>& limit) { - return DocumentSourceSort::create(expCtx, sort, limit.get_value_or(-1)); -} - -auto translateMap(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) { - auto emitExpression = ExpressionInternalJsEmit::create( - expCtx, ExpressionFieldPath::parse(expCtx, "$$ROOT", expCtx->variablesParseState), code); - auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>( - ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}); - node->addExpressionForPath(FieldPath{"emits"s}, std::move(emitExpression)); - auto inclusion = std::unique_ptr<TransformerInterface>{ - std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>( - expCtx, - ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}, - std::move(node))}; - return make_intrusive<DocumentSourceSingleDocumentTransformation>( - expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false); -} - -auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) { - auto accumulatorArguments = ExpressionObject::create( - expCtx, - make_vector<std::pair<std::string, boost::intrusive_ptr<Expression>>>( - std::pair{"data"s, - ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)}, - std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})})); - auto jsReduce = AccumulationStatement{ - "value", - std::move(accumulatorArguments), - AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)}; - auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState); - return DocumentSourceGroup::create(expCtx, - std::move(groupExpr), - make_vector<AccumulationStatement>(std::move(jsReduce)), - boost::none); -} - -auto translateFinalize(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) { - auto jsExpression = ExpressionInternalJs::create( - expCtx, - ExpressionArray::create( - expCtx, - make_vector<boost::intrusive_ptr<Expression>>( - ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState), - ExpressionFieldPath::parse(expCtx, "$value", expCtx->variablesParseState))), - code); - auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>( - ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}); - node->addExpressionForPath(FieldPath{"value"s}, std::move(jsExpression)); - auto inclusion = std::unique_ptr<TransformerInterface>{ - std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>( - expCtx, - ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId}, - std::move(node))}; - return make_intrusive<DocumentSourceSingleDocumentTransformation>( - expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false); -} - -auto translateOutReplace(boost::intrusive_ptr<ExpressionContext> expCtx, - const StringData inputDatabase, - NamespaceString targetNss) { - uassert(31278, - "MapReduce must output to the database belonging to its input collection - Input: "s + - inputDatabase + "Output: " + targetNss.db(), - inputDatabase == targetNss.db()); - return DocumentSourceOut::create(std::move(targetNss), expCtx); -} - -auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss) { - return DocumentSourceMerge::create(targetNss, - expCtx, - MergeWhenMatchedModeEnum::kReplace, - MergeWhenNotMatchedModeEnum::kInsert, - boost::none, // Let variables - boost::none, // pipeline - std::set<FieldPath>{FieldPath("_id"s)}, - boost::none); // targetCollectionVersion -} - -auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx, - NamespaceString targetNss, - std::string code) { - // Because of communication for sharding, $merge must hold on to a serializable BSON object - // at the moment so we reparse here. - auto reduceObj = BSON("args" << BSON_ARRAY("$value" - << "$$new.value") - << "eval" << code); - - auto finalProjectSpec = - BSON(DocumentSourceProject::kStageName - << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj))); - auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{finalProjectSpec}); - return DocumentSourceMerge::create(targetNss, - expCtx, - MergeWhenMatchedModeEnum::kPipeline, - MergeWhenNotMatchedModeEnum::kInsert, - boost::none, // Let variables - pipelineSpec, - std::set<FieldPath>{FieldPath("_id"s)}, - boost::none); // targetCollectionVersion -} - -auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx, - const OutputType outputType, - const StringData inputDatabase, - NamespaceString targetNss, - std::string reduceCode) { - switch (outputType) { - case OutputType::Replace: - return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss)); - case OutputType::Merge: - return boost::make_optional(translateOutMerge(expCtx, targetNss)); - case OutputType::Reduce: - return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode)); - case OutputType::InMemory:; - } - return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{}; -} - auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) { // AutoGetCollectionForReadCommand will throw if the sharding version for this connection is // out of date. @@ -247,7 +110,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd); auto expCtx = makeExpressionContext(opCtx, parsedMr); auto runnablePipeline = [&]() { - auto pipeline = translateFromMR(parsedMr, expCtx); + auto pipeline = map_reduce_common::translateFromMR(parsedMr, expCtx); return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( expCtx, pipeline.release()); }(); @@ -274,32 +137,4 @@ bool runAggregationMapReduce(OperationContext* opCtx, return true; } -std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR( - MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx) { - - // TODO: It would be good to figure out what kind of errors this would produce in the Status. - // It would be better not to produce something incomprehensible out of an internal translation. - return uassertStatusOK(Pipeline::create( - makeFlattenedList<boost::intrusive_ptr<DocumentSource>>( - parsedMr.getQuery().map( - [&](auto&& query) { return DocumentSourceMatch::create(query, expCtx); }), - parsedMr.getSort().map( - [&](auto&& sort) { return translateSort(expCtx, sort, parsedMr.getLimit()); }), - translateMap(expCtx, parsedMr.getMap().getCode()), - DocumentSourceUnwind::create(expCtx, "emits", false, boost::none), - translateReduce(expCtx, parsedMr.getReduce().getCode()), - parsedMr.getFinalize().map([&](auto&& finalize) { - return translateFinalize(expCtx, parsedMr.getFinalize()->getCode()); - }), - translateOut(expCtx, - parsedMr.getOutOptions().getOutputType(), - parsedMr.getNamespace().db(), - NamespaceString{parsedMr.getOutOptions().getDatabaseName() - ? *parsedMr.getOutOptions().getDatabaseName() - : parsedMr.getNamespace().db(), - parsedMr.getOutOptions().getCollectionName()}, - parsedMr.getReduce().getCode())), - expCtx)); -} - } // namespace mongo::map_reduce_agg |