summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/mr_common.cpp
diff options
context:
space:
mode:
authorNicholas Zolnierz <nicholas.zolnierz@mongodb.com>2019-10-08 22:18:54 +0000
committerevergreen <evergreen@mongodb.com>2019-10-08 22:18:54 +0000
commit13c2e614e05cb58753ee3a89a0fa9b14d0837a6d (patch)
tree9f9ba57524e99c069a4a5e0db82f6490858e7cac /src/mongo/db/commands/mr_common.cpp
parent40fb24a73ce0c8c1092dda3f2631648990f4587a (diff)
downloadmongo-13c2e614e05cb58753ee3a89a0fa9b14d0837a6d.tar.gz
SERVER-42942 M/R Agg: Implement translation for cluster mapReduce command
Diffstat (limited to 'src/mongo/db/commands/mr_common.cpp')
-rw-r--r--src/mongo/db/commands/mr_common.cpp197
1 files changed, 181 insertions, 16 deletions
diff --git a/src/mongo/db/commands/mr_common.cpp b/src/mongo/db/commands/mr_common.cpp
index a13b323ea23..ea0dea5e218 100644
--- a/src/mongo/db/commands/mr_common.cpp
+++ b/src/mongo/db/commands/mr_common.cpp
@@ -40,15 +40,152 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_group.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_merge.h"
+#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_source_unwind.h"
+#include "mongo/db/pipeline/expression_javascript.h"
+#include "mongo/db/pipeline/parsed_aggregation_projection_node.h"
+#include "mongo/db/pipeline/parsed_inclusion_projection.h"
+#include "mongo/db/query/util/make_data_structure.h"
+#include "mongo/util/intrusive_counter.h"
#include "mongo/util/log.h"
#include "mongo/util/str.h"
-namespace mongo {
-
-namespace mr {
+namespace mongo::map_reduce_common {
namespace {
Rarely nonAtomicDeprecationSampler; // Used to occasionally log deprecation messages.
+
+using namespace std::string_literals;
+
+auto translateSort(boost::intrusive_ptr<ExpressionContext> expCtx,
+ const BSONObj& sort,
+ const boost::optional<std::int64_t>& limit) {
+ return DocumentSourceSort::create(expCtx, sort, limit.get_value_or(-1));
+}
+
+auto translateMap(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
+ auto emitExpression = ExpressionInternalJsEmit::create(
+ expCtx, ExpressionFieldPath::parse(expCtx, "$$ROOT", expCtx->variablesParseState), code);
+ auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>(
+ ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId});
+ node->addExpressionForPath(FieldPath{"emits"s}, std::move(emitExpression));
+ auto inclusion = std::unique_ptr<TransformerInterface>{
+ std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>(
+ expCtx,
+ ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId},
+ std::move(node))};
+ return make_intrusive<DocumentSourceSingleDocumentTransformation>(
+ expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false);
+}
+
+auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
+ auto accumulatorArguments = ExpressionObject::create(
+ expCtx,
+ make_vector<std::pair<std::string, boost::intrusive_ptr<Expression>>>(
+ std::pair{"data"s,
+ ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)},
+ std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})}));
+ auto jsReduce = AccumulationStatement{
+ "value",
+ std::move(accumulatorArguments),
+ AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)};
+ auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState);
+ return DocumentSourceGroup::create(expCtx,
+ std::move(groupExpr),
+ make_vector<AccumulationStatement>(std::move(jsReduce)),
+ boost::none);
+}
+
+auto translateFinalize(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
+ auto jsExpression = ExpressionInternalJs::create(
+ expCtx,
+ ExpressionArray::create(
+ expCtx,
+ make_vector<boost::intrusive_ptr<Expression>>(
+ ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState),
+ ExpressionFieldPath::parse(expCtx, "$value", expCtx->variablesParseState))),
+ code);
+ auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>(
+ ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId});
+ node->addExpressionForPath(FieldPath{"value"s}, std::move(jsExpression));
+ auto inclusion = std::unique_ptr<TransformerInterface>{
+ std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>(
+ expCtx,
+ ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId},
+ std::move(node))};
+ return make_intrusive<DocumentSourceSingleDocumentTransformation>(
+ expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false);
+}
+
+auto translateOutReplace(boost::intrusive_ptr<ExpressionContext> expCtx,
+ const StringData inputDatabase,
+ NamespaceString targetNss) {
+ uassert(31278,
+ "MapReduce must output to the database belonging to its input collection - Input: "s +
+ inputDatabase + " Output: " + targetNss.db(),
+ inputDatabase == targetNss.db());
+ return DocumentSourceOut::create(std::move(targetNss), expCtx);
+}
+
+auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss) {
+ return DocumentSourceMerge::create(targetNss,
+ expCtx,
+ MergeWhenMatchedModeEnum::kReplace,
+ MergeWhenNotMatchedModeEnum::kInsert,
+ boost::none, // Let variables
+ boost::none, // pipeline
+ std::set<FieldPath>{FieldPath("_id"s)},
+ boost::none); // targetCollectionVersion
+}
+
+auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
+ NamespaceString targetNss,
+ std::string code) {
+ // Because of communication for sharding, $merge must hold on to a serializable BSON object
+ // at the moment so we reparse here.
+ auto reduceObj = BSON("args" << BSON_ARRAY("$value"
+ << "$$new.value")
+ << "eval" << code);
+
+ auto finalProjectSpec =
+ BSON(DocumentSourceProject::kStageName
+ << BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj)));
+ auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{finalProjectSpec});
+ return DocumentSourceMerge::create(targetNss,
+ expCtx,
+ MergeWhenMatchedModeEnum::kPipeline,
+ MergeWhenNotMatchedModeEnum::kInsert,
+ boost::none, // Let variables
+ pipelineSpec,
+ std::set<FieldPath>{FieldPath("_id"s)},
+ boost::none); // targetCollectionVersion
+}
+
+auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx,
+ const OutputType outputType,
+ const StringData inputDatabase,
+ NamespaceString targetNss,
+ std::string reduceCode) {
+ switch (outputType) {
+ case OutputType::Replace:
+ return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss));
+ case OutputType::Merge:
+ return boost::make_optional(translateOutMerge(expCtx, targetNss));
+ case OutputType::Reduce:
+ return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode));
+ case OutputType::InMemory:;
+ }
+ return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{};
+}
+
} // namespace
OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdObj) {
@@ -57,24 +194,24 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb
outputOptions.outNonAtomic = false;
if (cmdObj["out"].type() == String) {
outputOptions.collectionName = cmdObj["out"].String();
- outputOptions.outType = OutputType::kReplace;
+ outputOptions.outType = OutputType::Replace;
} else if (cmdObj["out"].type() == Object) {
BSONObj o = cmdObj["out"].embeddedObject();
if (o.hasElement("normal")) {
- outputOptions.outType = OutputType::kReplace;
+ outputOptions.outType = OutputType::Replace;
outputOptions.collectionName = o["normal"].String();
} else if (o.hasElement("replace")) {
- outputOptions.outType = OutputType::kReplace;
+ outputOptions.outType = OutputType::Replace;
outputOptions.collectionName = o["replace"].String();
} else if (o.hasElement("merge")) {
- outputOptions.outType = OutputType::kMerge;
+ outputOptions.outType = OutputType::Merge;
outputOptions.collectionName = o["merge"].String();
} else if (o.hasElement("reduce")) {
- outputOptions.outType = OutputType::kReduce;
+ outputOptions.outType = OutputType::Reduce;
outputOptions.collectionName = o["reduce"].String();
} else if (o.hasElement("inline")) {
- outputOptions.outType = OutputType::kInMemory;
+ outputOptions.outType = OutputType::InMemory;
uassert(ErrorCodes::InvalidOptions,
"cannot specify 'sharded' in combination with 'inline'",
!o.hasElement("sharded"));
@@ -96,8 +233,8 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb
if (outputOptions.outNonAtomic) {
uassert(15895,
"nonAtomic option cannot be used with this output type",
- (outputOptions.outType == OutputType::kReduce ||
- outputOptions.outType == OutputType::kMerge));
+ (outputOptions.outType == OutputType::Reduce ||
+ outputOptions.outType == OutputType::Merge));
} else if (nonAtomicDeprecationSampler.tick()) {
warning() << "Setting out.nonAtomic to false in MapReduce is deprecated.";
}
@@ -106,7 +243,7 @@ OutputOptions parseOutputOptions(const std::string& dbname, const BSONObj& cmdOb
uasserted(13606, "'out' has to be a string or an object");
}
- if (outputOptions.outType != OutputType::kInMemory) {
+ if (outputOptions.outType != OutputType::InMemory) {
const StringData outDb(outputOptions.outDB.empty() ? dbname : outputOptions.outDB);
const NamespaceString nss(outDb, outputOptions.collectionName);
uassert(ErrorCodes::InvalidNamespace,
@@ -130,10 +267,10 @@ void addPrivilegesRequiredForMapReduce(const BasicCommand* commandTemplate,
inputResource.isExactNamespacePattern());
out->push_back(Privilege(inputResource, ActionType::find));
- if (outputOptions.outType != OutputType::kInMemory) {
+ if (outputOptions.outType != OutputType::InMemory) {
ActionSet outputActions;
outputActions.addAction(ActionType::insert);
- if (outputOptions.outType == OutputType::kReplace) {
+ if (outputOptions.outType == OutputType::Replace) {
outputActions.addAction(ActionType::remove);
} else {
outputActions.addAction(ActionType::update);
@@ -163,5 +300,33 @@ bool mrSupportsWriteConcern(const BSONObj& cmd) {
return true;
}
}
-} // namespace mr
-} // namespace mongo
+
+std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
+ MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx) {
+
+ // TODO: It would be good to figure out what kind of errors this would produce in the Status.
+ // It would be better not to produce something incomprehensible out of an internal translation.
+ return uassertStatusOK(Pipeline::create(
+ makeFlattenedList<boost::intrusive_ptr<DocumentSource>>(
+ parsedMr.getQuery().map(
+ [&](auto&& query) { return DocumentSourceMatch::create(query, expCtx); }),
+ parsedMr.getSort().map(
+ [&](auto&& sort) { return translateSort(expCtx, sort, parsedMr.getLimit()); }),
+ translateMap(expCtx, parsedMr.getMap().getCode()),
+ DocumentSourceUnwind::create(expCtx, "emits", false, boost::none),
+ translateReduce(expCtx, parsedMr.getReduce().getCode()),
+ parsedMr.getFinalize().map([&](auto&& finalize) {
+ return translateFinalize(expCtx, parsedMr.getFinalize()->getCode());
+ }),
+ translateOut(expCtx,
+ parsedMr.getOutOptions().getOutputType(),
+ parsedMr.getNamespace().db(),
+ NamespaceString{parsedMr.getOutOptions().getDatabaseName()
+ ? *parsedMr.getOutOptions().getDatabaseName()
+ : parsedMr.getNamespace().db(),
+ parsedMr.getOutOptions().getCollectionName()},
+ parsedMr.getReduce().getCode())),
+ expCtx));
+}
+
+} // namespace mongo::map_reduce_common