diff options
author | David Percy <david.percy@mongodb.com> | 2020-12-01 21:56:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-26 17:44:59 +0000 |
commit | dc1c3a791f63dfb909ed520aadab66a4b1ce66e9 (patch) | |
tree | f981eb0e1ee4958e0663cdc1c0ee7169ec67468d | |
parent | 31b07a007b80975852f75b43ba3437782825bf23 (diff) | |
download | mongo-dc1c3a791f63dfb909ed520aadab66a4b1ce66e9.tar.gz |
SERVER-53397 Desugar $setWindowFields partitionBy using $sort
19 files changed, 511 insertions, 78 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation.yml b/buildscripts/resmokeconfig/suites/aggregation.yml index 38ae5965b47..2795384eee5 100644 --- a/buildscripts/resmokeconfig/suites/aggregation.yml +++ b/buildscripts/resmokeconfig/suites/aggregation.yml @@ -24,3 +24,4 @@ executor: mongod_options: set_parameters: enableTestCommands: 1 + featureFlagWindowFunctions: 1 diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 6dee1e5a1be..3ed1633da08 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -9790,6 +9790,10 @@ buildvariants: featureFlagTimeseriesCollection: true, featureFlagShardingFullDDLSupport: true, featureFlagShardingFullDDLSupportTimestampedVersion: true, + featureFlagWindowFunctions: true, + }" + --mongosSetParameters="{ + featureFlagWindowFunctions: true, }" - &enterprise-windows-template @@ -9886,6 +9890,10 @@ buildvariants: featureFlagTimeseriesCollection: true, featureFlagShardingFullDDLSupport: true, featureFlagShardingFullDDLSupportTimestampedVersion: true, + featureFlagWindowFunctions: true, + }" + --mongosSetParameters="{ + featureFlagWindowFunctions: true, }" - <<: *enterprise-windows-nopush-template @@ -10548,6 +10556,10 @@ buildvariants: featureFlagTimeseriesCollection: true, featureFlagShardingFullDDLSupport: true, featureFlagShardingFullDDLSupportTimestampedVersion: true, + featureFlagWindowFunctions: true, + }" + --mongosSetParameters="{ + featureFlagWindowFunctions: true, }" tasks: - name: compile_parallel_TG @@ -11851,6 +11863,10 @@ buildvariants: featureFlagTimeseriesCollection: true, featureFlagShardingFullDDLSupport: true, featureFlagShardingFullDDLSupportTimestampedVersion: true, + featureFlagWindowFunctions: true, + }" + --mongosSetParameters="{ + featureFlagWindowFunctions: true, }" tasks: - name: compile_all_run_unittests_TG @@ -12853,6 +12869,10 @@ buildvariants: featureFlagTimeseriesCollection: true, featureFlagShardingFullDDLSupport: true, featureFlagShardingFullDDLSupportTimestampedVersion: true, + featureFlagWindowFunctions: true, + }" + --mongosSetParameters="{ + featureFlagWindowFunctions: true, }" tasks: - name: compile_all_run_unittests_TG @@ -13015,6 +13035,10 @@ buildvariants: featureFlagTimeseriesCollection: true, featureFlagShardingFullDDLSupport: true, featureFlagShardingFullDDLSupportTimestampedVersion: true, + featureFlagWindowFunctions: true, + }" + --mongosSetParameters="{ + featureFlagWindowFunctions: true, }" multiversion_platform: ubuntu1804 multiversion_edition: enterprise @@ -13115,6 +13139,10 @@ buildvariants: featureFlagTimeseriesCollection: true, featureFlagShardingFullDDLSupport: true, featureFlagShardingFullDDLSupportTimestampedVersion: true, + featureFlagWindowFunctions: true, + }" + --mongosSetParameters="{ + featureFlagWindowFunctions: true, }" resmoke_jobs_factor: 0.3 # Avoid starting too many mongod's under {A,UB}SAN build. hang_analyzer_dump_core: false diff --git a/jstests/aggregation/sources/setWindowFields/desugar.js b/jstests/aggregation/sources/setWindowFields/desugar.js new file mode 100644 index 00000000000..889ac9ebffd --- /dev/null +++ b/jstests/aggregation/sources/setWindowFields/desugar.js @@ -0,0 +1,99 @@ +/** + * Test how $setWindowFields desugars. + * + * We handle partitionBy and sortBy by generating a separate $sort stage. + * + * @tags: [ + * # We assume the pipeline is not split into a shardsPart and mergerPart. + * assumes_unsharded_collection, + * # We're testing the explain plan, not the query results, so the facet passthrough would fail. + * do_not_wrap_aggregations_in_facets, + * ] + */ +(function() { +"use strict"; + +const featureEnabled = + assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1})) + .featureFlagWindowFunctions.value; +if (!featureEnabled) { + jsTestLog("Skipping test because the window function feature flag is disabled"); + return; +} + +const coll = db[jsTestName()]; +coll.insert({}); + +// Use .explain() to see what the stage desugars to. +// The result is formatted as explain-output, which differs from MQL syntax in some cases: +// for example {$sort: {a: 1}} explains as {$sort: {sortKey: {a: 1}}}. +function desugar(stage) { + const result = coll.explain().aggregate([ + // prevent stages from being absorbed into the .find() layer + {$_internalInhibitOptimization: {}}, + stage, + ]); + assert.commandWorked(result); + assert(Array.isArray(result.stages), result); + // The first two stages should be the .find() cursor and the inhibit-optimization stage; + // the rest of the stages are what the user's 'stage' expanded to. + assert(result.stages[0].$cursor, result); + assert(result.stages[1].$_internalInhibitOptimization, result); + return result.stages.slice(2); +} + +// Often, the desugared stages include a generated temporary name. +// When this happens, it's always in the first stage, an $addFields. +function extractTmp(stages) { + assert(stages[0].$addFields, stages); + const tmp = Object.keys(stages[0].$addFields)[0]; + assert(tmp, stages); + return tmp; +} + +// No partitionBy and no sortBy means we don't need to sort the input. +assert.eq(desugar({$setWindowFields: {output: {}}}), [ + {$_internalSetWindowFields: {output: {}}}, +]); + +// 'sortBy' becomes an explicit $sort stage. +assert.eq(desugar({$setWindowFields: {sortBy: {ts: 1}, output: {}}}), [ + {$sort: {sortKey: {ts: 1}}}, + {$_internalSetWindowFields: {sortBy: {ts: 1}, output: {}}}, +]); + +// 'partitionBy' a field becomes an explicit $sort stage. +assert.eq(desugar({$setWindowFields: {partitionBy: "$zip", output: {}}}), [ + {$sort: {sortKey: {zip: 1}}}, + {$_internalSetWindowFields: {partitionBy: "$zip", output: {}}}, +]); + +// 'partitionBy' an expression becomes $set + $sort + $unset. +// Also, the _internal stage reads from the already-computed field. +let stages = desugar({$setWindowFields: {partitionBy: {$toLower: "$country"}, output: {}}}); +let tmp = extractTmp(stages); +assert.eq(stages, [ + {$addFields: {[tmp]: {$toLower: ["$country"]}}}, + {$sort: {sortKey: {[tmp]: 1}}}, + {$_internalSetWindowFields: {partitionBy: '$' + tmp, output: {}}}, + {$project: {[tmp]: false}}, +]); + +// $sort first by partitionBy, then sortBy, because we sort within each partition. +assert.eq( + desugar({$setWindowFields: {partitionBy: "$zip", sortBy: {ts: -1, _id: 1}, output: {}}}), [ + {$sort: {sortKey: {zip: 1, ts: -1, _id: 1}}}, + {$_internalSetWindowFields: {partitionBy: "$zip", sortBy: {ts: -1, _id: 1}, output: {}}}, + ]); + +stages = desugar({ + $setWindowFields: {partitionBy: {$toLower: "$country"}, sortBy: {ts: -1, _id: 1}, output: {}} +}); +tmp = extractTmp(stages); +assert.eq(stages, [ + {$addFields: {[tmp]: {$toLower: ["$country"]}}}, + {$sort: {sortKey: {[tmp]: 1, ts: -1, _id: 1}}}, + {$_internalSetWindowFields: {partitionBy: '$' + tmp, sortBy: {ts: -1, _id: 1}, output: {}}}, + {$project: {[tmp]: false}}, +]); +})(); diff --git a/jstests/aggregation/sources/setWindowFields/parse.js b/jstests/aggregation/sources/setWindowFields/parse.js new file mode 100644 index 00000000000..e7470e67de4 --- /dev/null +++ b/jstests/aggregation/sources/setWindowFields/parse.js @@ -0,0 +1,50 @@ +/** + * Test the syntax of $setWindowFields. For example: + * - Which options are allowed? + * - When is an expression expected, vs a constant? + */ +(function() { +"use strict"; + +const featureEnabled = + assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1})) + .featureFlagWindowFunctions.value; +if (!featureEnabled) { + jsTestLog("Skipping test because the window function feature flag is disabled"); + return; +} + +const coll = db[jsTestName()]; +coll.drop(); + +assert.commandWorked(coll.insert({})); + +function run(stage, extraCommandArgs = {}) { + return coll.runCommand( + Object.merge({aggregate: coll.getName(), pipeline: [stage], cursor: {}}, extraCommandArgs)); +} + +// Test that the stage spec must be an object. +assert.commandFailedWithCode(run({$setWindowFields: "invalid"}), ErrorCodes.FailedToParse); + +// Test that the stage parameters are the correct type. +assert.commandFailedWithCode(run({$setWindowFields: {sortBy: "invalid"}}), ErrorCodes.TypeMismatch); +assert.commandFailedWithCode(run({$setWindowFields: {output: "invalid"}}), ErrorCodes.TypeMismatch); + +// Test that parsing fails for an invalid partitionBy expression. +assert.commandFailedWithCode( + run({$setWindowFields: {partitionBy: {$notAnOperator: 1}, output: {}}}), + ErrorCodes.InvalidPipelineOperator); + +// Since partitionBy can be any expression, it can be a variable. +assert.commandWorked(run({$setWindowFields: {partitionBy: "$$NOW", output: {}}})); +assert.commandWorked( + run({$setWindowFields: {partitionBy: "$$myobj.a", output: {}}}, {let : {myobj: {a: 456}}})); + +// Test that parsing fails for unrecognized parameters. +assert.commandFailedWithCode(run({$setWindowFields: {what_is_this: 1}}), 40415); + +// Test for a successful parse, ignoring the response documents. +assert.commandWorked( + run({$setWindowFields: {partitionBy: "$state", sortBy: {city: 1}, output: {a: {$sum: 1}}}})); +})(); diff --git a/src/mongo/db/exec/add_fields_projection_executor.cpp b/src/mongo/db/exec/add_fields_projection_executor.cpp index 330232728c5..592074b4834 100644 --- a/src/mongo/db/exec/add_fields_projection_executor.cpp +++ b/src/mongo/db/exec/add_fields_projection_executor.cpp @@ -218,6 +218,22 @@ std::unique_ptr<AddFieldsProjectionExecutor> AddFieldsProjectionExecutor::create return executor; } +std::unique_ptr<AddFieldsProjectionExecutor> AddFieldsProjectionExecutor::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const FieldPath& fieldPath, + const boost::intrusive_ptr<Expression>& expr) { + + // This helper is only meant for creating top-level fields. Dotted field paths require + // thinking about implicit array traversal. + tassert(5339700, + str::stream() << "Expected a top-level field name, but got " << fieldPath.fullPath(), + fieldPath.getPathLength() == 1); + + auto executor = std::make_unique<AddFieldsProjectionExecutor>(expCtx); + executor->_root->addExpressionForPath(fieldPath, expr); + return executor; +} + void AddFieldsProjectionExecutor::parse(const BSONObj& spec) { for (auto elem : spec) { // The field name might be a dotted path. diff --git a/src/mongo/db/exec/add_fields_projection_executor.h b/src/mongo/db/exec/add_fields_projection_executor.h index fd3ac8decab..309d27b55a2 100644 --- a/src/mongo/db/exec/add_fields_projection_executor.h +++ b/src/mongo/db/exec/add_fields_projection_executor.h @@ -64,6 +64,16 @@ public: static std::unique_ptr<AddFieldsProjectionExecutor> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, const BSONObj& spec); + /** + * Create a projection that binds an expression to a top-level field. + * + * 'fieldPath' must be a top-level field name (exactly one element; no dots). + */ + static std::unique_ptr<AddFieldsProjectionExecutor> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const FieldPath& fieldPath, + const boost::intrusive_ptr<Expression>& expr); + TransformerType getType() const final { return TransformerType::kComputedProjection; } diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index d33581db906..aa0ca4885ea 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -79,7 +79,19 @@ void DocumentSource::registerParser( it == parserMap.end()); parserMap[name] = {parser, requiredMinVersion}; } +void DocumentSource::registerParser( + string name, + SimpleParser simpleParser, + boost::optional<ServerGlobalParams::FeatureCompatibility::Version> requiredMinVersion) { + Parser parser = + [simpleParser = std::move(simpleParser)]( + BSONElement stageSpec, + const intrusive_ptr<ExpressionContext>& expCtx) -> list<intrusive_ptr<DocumentSource>> { + return {simpleParser(std::move(stageSpec), expCtx)}; + }; + return registerParser(std::move(name), std::move(parser), std::move(requiredMinVersion)); +} bool DocumentSource::hasQuery() const { return false; } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index b9c89f39bc7..e4797743036 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -70,8 +70,11 @@ class Document; * LiteParsedDocumentSource. This is used for checks that need to happen before a full parse, * such as checks about which namespaces are referenced by this aggregation. * - * 'fullParser' takes a BSONElement and an ExpressionContext and returns a fully-executable - * DocumentSource. This will be used for optimization and execution. + * 'fullParser' is either a DocumentSource::SimpleParser or a DocumentSource::Parser. + * In both cases, it takes a BSONElement and an ExpressionContext and returns fully-executable + * DocumentSource(s), for optimization and execution. In the common case it's a SimpleParser, + * which returns a single DocumentSource; in the general case it's a Parser, which returns a whole + * std::list to support "multi-stage aliases" like $bucket. * * Stages that do not require any special pre-parse checks can use * LiteParsedDocumentSourceDefault::parse as their 'liteParser'. @@ -82,57 +85,55 @@ class Document; * REGISTER_DOCUMENT_SOURCE(foo, * LiteParsedDocumentSourceDefault::parse, * DocumentSourceFoo::createFromBson); + */ +#define REGISTER_DOCUMENT_SOURCE(key, liteParser, fullParser) \ + REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, boost::none, true) + +/** + * Like REGISTER_DOCUMENT_SOURCE, except the parser will only be enabled when FCV >= minVersion. + * We store minVersion in the parserMap, so that changing FCV at runtime correctly enables/disables + * the parser. + */ +#define REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION(key, liteParser, fullParser, minVersion) \ + REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, minVersion, true) + +/** + * Like REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION, except you can also specify a condition, + * evaluated during startup, that decides whether to register the parser. + * + * For example, you could check a feature flag, and register the parser only when it's enabled. * - * If your stage is actually an alias which needs to return more than one stage (such as - * $sortByCount), you should use the REGISTER_MULTI_STAGE_ALIAS macro instead. + * Note that the condition is evaluated only once, during a MONGO_INITIALIZER. Don't specify + * a condition that can change at runtime, such as FCV. (Feature flags are ok, because they + * cannot be toggled at runtime.) + * + * This is the most general REGISTER_DOCUMENT_SOURCE* macro, which all others should delegate to. */ #define REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, minVersion, ...) \ MONGO_INITIALIZER(addToDocSourceParserMap_##key)(InitializerContext*) { \ if (!__VA_ARGS__) { \ return; \ } \ - auto fullParserWrapper = [](BSONElement stageSpec, \ - const boost::intrusive_ptr<ExpressionContext>& expCtx) { \ - return std::list<boost::intrusive_ptr<DocumentSource>>{ \ - (fullParser)(stageSpec, expCtx)}; \ - }; \ LiteParsedDocumentSource::registerParser("$" #key, liteParser); \ - DocumentSource::registerParser("$" #key, fullParserWrapper, minVersion); \ + DocumentSource::registerParser("$" #key, fullParser, minVersion); \ } -#define REGISTER_DOCUMENT_SOURCE(key, liteParser, fullParser) \ - REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, boost::none, true) - +/** + * Like REGISTER_DOCUMENT_SOURCE, except the parser is only enabled when test-commands are enabled. + */ #define REGISTER_TEST_DOCUMENT_SOURCE(key, liteParser, fullParser) \ REGISTER_DOCUMENT_SOURCE_CONDITIONALLY( \ key, liteParser, fullParser, boost::none, ::mongo::getTestCommandsEnabled()) -#define REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION(key, liteParser, fullParser, minVersion) \ - REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, minVersion, true) - -/** - * Registers a multi-stage alias (such as $sortByCount) to have the single name 'key'. When a stage - * with name '$key' is found, 'liteParser' will be used to produce a LiteParsedDocumentSource, - * while 'fullParser' will be called to construct a vector of DocumentSources. See the comments on - * REGISTER_DOCUMENT_SOURCE for more information. - * - * As an example, if your stage alias looks like {$foo: <args>} and does *not* require any special - * pre-parse checks, you should implement a static parser like DocumentSourceFoo::createFromBson(), - * and register it like so: - * REGISTER_MULTI_STAGE_ALIAS(foo, - * LiteParsedDocumentSourceDefault::parse, - * DocumentSourceFoo::createFromBson); - */ -#define REGISTER_MULTI_STAGE_ALIAS(key, liteParser, fullParser) \ - MONGO_INITIALIZER(addAliasToDocSourceParserMap_##key)(InitializerContext*) { \ - LiteParsedDocumentSource::registerParser("$" #key, (liteParser)); \ - DocumentSource::registerParser("$" #key, (fullParser), boost::none); \ - } - class DocumentSource : public RefCountable { public: + // In general a parser returns a list of DocumentSources, to accomodate "multi-stage aliases" + // like $bucket. using Parser = std::function<std::list<boost::intrusive_ptr<DocumentSource>>( BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; + // But in the common case a parser returns only one DocumentSource. + using SimpleParser = std::function<boost::intrusive_ptr<DocumentSource>( + BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement; using HostTypeRequirement = StageConstraints::HostTypeRequirement; @@ -367,6 +368,17 @@ public: std::string name, Parser parser, boost::optional<ServerGlobalParams::FeatureCompatibility::Version> requiredMinVersion); + /** + * Convenience wrapper for the common case, when DocumentSource::Parser returns a list of one + * DocumentSource. + * + * DO NOT call this method directly. Instead, use the REGISTER_DOCUMENT_SOURCE macro defined in + * this file. + */ + static void registerParser( + std::string name, + SimpleParser simpleParser, + boost::optional<ServerGlobalParams::FeatureCompatibility::Version> requiredMinVersion); /** * Returns true if the DocumentSource has a query. diff --git a/src/mongo/db/pipeline/document_source_add_fields.cpp b/src/mongo/db/pipeline/document_source_add_fields.cpp index 2e2b4a5a53d..53d012cd258 100644 --- a/src/mongo/db/pipeline/document_source_add_fields.cpp +++ b/src/mongo/db/pipeline/document_source_add_fields.cpp @@ -70,6 +70,19 @@ intrusive_ptr<DocumentSource> DocumentSourceAddFields::create( return addFields; } +intrusive_ptr<DocumentSource> DocumentSourceAddFields::create( + const FieldPath& fieldPath, + const intrusive_ptr<Expression>& expr, + const intrusive_ptr<ExpressionContext>& expCtx) { + + const bool isIndependentOfAnyCollection = false; + return make_intrusive<DocumentSourceSingleDocumentTransformation>( + expCtx, + projection_executor::AddFieldsProjectionExecutor::create(expCtx, fieldPath, expr), + kStageName, + isIndependentOfAnyCollection); +} + intrusive_ptr<DocumentSource> DocumentSourceAddFields::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { const auto specifiedName = elem.fieldNameStringData(); diff --git a/src/mongo/db/pipeline/document_source_add_fields.h b/src/mongo/db/pipeline/document_source_add_fields.h index 5c99a3790bb..49cb228c73b 100644 --- a/src/mongo/db/pipeline/document_source_add_fields.h +++ b/src/mongo/db/pipeline/document_source_add_fields.h @@ -53,6 +53,16 @@ public: StringData stageName = kStageName); /** + * Create a stage that binds an expression to a top-level field. + * + * 'fieldPath' must be a top-level field name (exactly one element; no dots). + */ + static boost::intrusive_ptr<DocumentSource> create( + const FieldPath& fieldPath, + const boost::intrusive_ptr<Expression>& expr, + const boost::intrusive_ptr<ExpressionContext>& expCtx); + + /** * Parses a $addFields stage from the user-supplied BSON. */ static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_bucket.cpp b/src/mongo/db/pipeline/document_source_bucket.cpp index 73386201792..d5761bf3c85 100644 --- a/src/mongo/db/pipeline/document_source_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_bucket.cpp @@ -40,9 +40,9 @@ using boost::intrusive_ptr; using std::list; using std::vector; -REGISTER_MULTI_STAGE_ALIAS(bucket, - LiteParsedDocumentSourceDefault::parse, - DocumentSourceBucket::createFromBson); +REGISTER_DOCUMENT_SOURCE(bucket, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceBucket::createFromBson); namespace { intrusive_ptr<ExpressionConstant> getExpressionConstant(ExpressionContext* const expCtx, diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index bad92703daa..7b80cb18d02 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -65,9 +65,9 @@ using std::vector; // and re-parse the pipeline. To make this work, the 'transformation' stage will serialize itself // with the original specification, and all other stages that are created during the alias expansion // will not serialize themselves. -REGISTER_MULTI_STAGE_ALIAS(changeStream, - DocumentSourceChangeStream::LiteParsed::parse, - DocumentSourceChangeStream::createFromBson); +REGISTER_DOCUMENT_SOURCE(changeStream, + DocumentSourceChangeStream::LiteParsed::parse, + DocumentSourceChangeStream::createFromBson); constexpr StringData DocumentSourceChangeStream::kDocumentKeyField; constexpr StringData DocumentSourceChangeStream::kFullDocumentBeforeChangeField; diff --git a/src/mongo/db/pipeline/document_source_count.cpp b/src/mongo/db/pipeline/document_source_count.cpp index 80a98a5426b..1107e2e6eeb 100644 --- a/src/mongo/db/pipeline/document_source_count.cpp +++ b/src/mongo/db/pipeline/document_source_count.cpp @@ -43,9 +43,9 @@ using boost::intrusive_ptr; using std::list; using std::string; -REGISTER_MULTI_STAGE_ALIAS(count, - LiteParsedDocumentSourceDefault::parse, - DocumentSourceCount::createFromBson); +REGISTER_DOCUMENT_SOURCE(count, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceCount::createFromBson); list<intrusive_ptr<DocumentSource>> DocumentSourceCount::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp index e5f048a8e3d..6ea0e53e055 100644 --- a/src/mongo/db/pipeline/document_source_project.cpp +++ b/src/mongo/db/pipeline/document_source_project.cpp @@ -33,6 +33,7 @@ #include <boost/optional.hpp> #include <boost/smart_ptr/intrusive_ptr.hpp> +#include <memory> #include "mongo/db/exec/projection_executor.h" #include "mongo/db/exec/projection_executor_builder.h" @@ -100,6 +101,26 @@ intrusive_ptr<DocumentSource> DocumentSourceProject::create( return project; } +boost::intrusive_ptr<DocumentSource> DocumentSourceProject::createUnset( + const FieldPath& fieldPath, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + + // This helper is only meant for removing top-level fields. Dotted field paths require + // thinking about implicit array traversal. + tassert(5339701, + str::stream() << "Expected a top-level field name, but got " << fieldPath.fullPath(), + fieldPath.getPathLength() == 1); + + projection_ast::ProjectionPathASTNode pathNode; + pathNode.addChild(fieldPath.fullPath(), + std::make_unique<projection_ast::BooleanConstantASTNode>(false)); + auto projection = projection_ast::Projection{ + std::move(pathNode), + projection_ast::ProjectType::kExclusion, + }; + + return create(std::move(projection), expCtx, kAliasNameUnset); +} + intrusive_ptr<DocumentSource> DocumentSourceProject::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { if (elem.fieldNameStringData() == kStageName) { diff --git a/src/mongo/db/pipeline/document_source_project.h b/src/mongo/db/pipeline/document_source_project.h index 13d45282a75..9401231a56b 100644 --- a/src/mongo/db/pipeline/document_source_project.h +++ b/src/mongo/db/pipeline/document_source_project.h @@ -60,16 +60,24 @@ public: BSONObj projectSpec, const boost::intrusive_ptr<ExpressionContext>& expCtx, StringData specifiedName) try { - return create(projection_ast::parse( - expCtx, projectSpec, ProjectionPolicies::aggregateProjectionPolicies()), - expCtx, - specifiedName); + + auto projection = projection_ast::parse( + expCtx, projectSpec, ProjectionPolicies::aggregateProjectionPolicies()); + return create(projection, expCtx, specifiedName); } catch (DBException& ex) { ex.addContext("Invalid " + specifiedName.toString()); throw; } /** + * Create an '$unset' stage, which removes a single top-level field. + * + * 'fieldPath' must be a top-level field. + */ + static boost::intrusive_ptr<DocumentSource> createUnset( + const FieldPath& fieldPath, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + /** * Parses a $project stage from the user-supplied BSON. */ static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp index a98069a6c81..f38d76b6c14 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp +++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp @@ -29,21 +29,153 @@ #include "mongo/platform/basic.h" +#include "mongo/db/pipeline/document_source_add_fields.h" +#include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_source_set_window_fields.h" #include "mongo/db/pipeline/document_source_set_window_fields_gen.h" +#include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/query/query_feature_flags_gen.h" +using boost::intrusive_ptr; +using boost::optional; +using std::list; + namespace mongo { REGISTER_DOCUMENT_SOURCE_CONDITIONALLY( setWindowFields, LiteParsedDocumentSourceDefault::parse, - DocumentSourceSetWindowFields::createFromBson, + document_source_set_window_fields::createFromBson, + boost::none, + ::mongo::feature_flags::gFeatureFlagWindowFunctions.isEnabledAndIgnoreFCV()); + +REGISTER_DOCUMENT_SOURCE_CONDITIONALLY( + _internalSetWindowFields, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceInternalSetWindowFields::createFromBson, boost::none, ::mongo::feature_flags::gFeatureFlagWindowFunctions.isEnabledAndIgnoreFCV()); -Value DocumentSourceSetWindowFields::serialize( +list<intrusive_ptr<DocumentSource>> document_source_set_window_fields::createFromBson( + BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "the " << kStageName + << " stage specification must be an object, found " + << typeName(elem.type()), + elem.type() == BSONType::Object); + + auto spec = + SetWindowFieldsSpec::parse(IDLParserErrorContext(kStageName), elem.embeddedObject()); + auto partitionBy = [&]() -> boost::optional<boost::intrusive_ptr<Expression>> { + if (auto partitionBy = spec.getPartitionBy()) + return Expression::parseOperand( + expCtx.get(), partitionBy->getElement(), expCtx->variablesParseState); + else + return boost::none; + }(); + return create(std::move(expCtx), + std::move(partitionBy), + std::move(spec.getSortBy()), + std::move(spec.getOutput())); +} + +list<intrusive_ptr<DocumentSource>> document_source_set_window_fields::create( + const intrusive_ptr<ExpressionContext>& expCtx, + optional<intrusive_ptr<Expression>> partitionBy, + optional<BSONObj> sortBy, + BSONObj fields) { + + // Starting with an input like this: + // {$setWindowFields: {partitionBy: {$foo: "$x"}, sortBy: {y: 1}, fields: {...}}} + + // We move the partitionBy expression out into its own $set stage: + // {$set: {__tmp: {$foo: "$x"}}} + // {$setWindowFields: {partitionBy: "$__tmp", sortBy: {y: 1}, fields: {...}}} + // {$unset: '__tmp'} + + // This lets us insert a $sort in between: + // {$set: {__tmp: {$foo: "$x"}}} + // {$sort: {__tmp: 1, y: 1}} + // {$setWindowFields: {partitionBy: "$__tmp", sortBy: {y: 1}, fields: {...}}} + // {$unset: '__tmp'} + + // Which lets us replace $setWindowFields with $_internalSetWindowFields: + // {$set: {__tmp: {$foo: "$x"}}} + // {$sort: {__tmp: 1, y: 1}} + // {$_internalSetWindowFields: {partitionBy: "$__tmp", sortBy: {y: 1}, fields: {...}}} + // {$unset: '__tmp'} + + // If partitionBy is a field path, we can $sort by that field directly and avoid creating a + // $set stage. This is important for pushing down the $sort. This is only valid because we + // assert (in getNextInput()) that partitionBy is never an array. + + // If there is no partitionBy at all then we just $sort by the sortBy spec. + + // If there is no sortBy and no partitionBy then we can omit the $sort stage completely. + + list<intrusive_ptr<DocumentSource>> result; + + // complexPartitionBy is an expression to evaluate. + // simplePartitionBy is a field path, which can be evaluated or sorted. + optional<intrusive_ptr<Expression>> complexPartitionBy; + optional<FieldPath> simplePartitionBy; + optional<intrusive_ptr<Expression>> simplePartitionByExpr; + // If there is no partitionBy, both are empty. + // If partitionBy is already a field path, we only fill in simplePartitionBy. + // If partitionBy is a more complex expression, we will need to generate a $set stage, + // which will bind the value of the expression to the name in simplePartitionBy. + if (partitionBy) { + auto exprFieldPath = dynamic_cast<ExpressionFieldPath*>(partitionBy->get()); + if (exprFieldPath && exprFieldPath->isRootFieldPath()) { + // ExpressionFieldPath has "CURRENT" as an explicit first component, + // but for $sort we don't want that. + simplePartitionBy = exprFieldPath->getFieldPath().tail(); + simplePartitionByExpr = partitionBy; + } else { + // In DocumentSource we don't have a mechanism for generating non-colliding field names, + // so we have to choose the tmp name carefully to make a collision unlikely in practice. + auto tmp = "__internal_setWindowFields_partition_key"; + simplePartitionBy = FieldPath{tmp}; + simplePartitionByExpr = ExpressionFieldPath::createPathFromString( + expCtx.get(), tmp, expCtx->variablesParseState); + complexPartitionBy = partitionBy; + } + } + + // $set + if (complexPartitionBy) { + result.push_back( + DocumentSourceAddFields::create(*simplePartitionBy, *complexPartitionBy, expCtx)); + } + + // $sort + if (simplePartitionBy || sortBy) { + BSONObjBuilder sortSpec; + if (simplePartitionBy) { + sortSpec << simplePartitionBy->fullPath() << 1; + } + if (sortBy) { + for (auto elem : *sortBy) { + sortSpec << elem; + } + } + result.push_back(DocumentSourceSort::create(expCtx, sortSpec.obj())); + } + + // $_internalSetWindowFields + result.push_back(make_intrusive<DocumentSourceInternalSetWindowFields>( + expCtx, simplePartitionByExpr, sortBy, fields)); + + // $unset + if (complexPartitionBy) { + result.push_back(DocumentSourceProject::createUnset(*simplePartitionBy, expCtx)); + } + + return result; +} + +Value DocumentSourceInternalSetWindowFields::serialize( boost::optional<ExplainOptions::Verbosity> explain) const { MutableDocument spec; spec[SetWindowFieldsSpec::kPartitionByFieldName] = @@ -53,7 +185,7 @@ Value DocumentSourceSetWindowFields::serialize( return Value(DOC(kStageName << spec.freeze())); } -boost::intrusive_ptr<DocumentSource> DocumentSourceSetWindowFields::createFromBson( +boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSetWindowFields::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(ErrorCodes::FailedToParse, str::stream() << "the " << kStageName @@ -70,12 +202,13 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceSetWindowFields::createFromBs else return boost::none; }(); - return make_intrusive<DocumentSourceSetWindowFields>( + return make_intrusive<DocumentSourceInternalSetWindowFields>( expCtx, partitionBy, spec.getSortBy(), spec.getOutput()); } -DocumentSource::GetNextResult DocumentSourceSetWindowFields::doGetNext() { - return GetNextResult::makeEOF(); +DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext() { + // This is a placeholder: it returns every input doc unchanged. + return pSource->getNext(); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h index 09210c3df0d..15902de2211 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields.h +++ b/src/mongo/db/pipeline/document_source_set_window_fields.h @@ -34,9 +34,26 @@ namespace mongo { -class DocumentSourceSetWindowFields final : public DocumentSource { +/** + * $setWindowFields is an alias: it desugars to some combination of projection, sorting, + * and $_internalSetWindowFields. + */ +namespace document_source_set_window_fields { +constexpr StringData kStageName = "$setWindowFields"_sd; + +std::list<boost::intrusive_ptr<DocumentSource>> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +std::list<boost::intrusive_ptr<DocumentSource>> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<boost::intrusive_ptr<Expression>> partitionBy, + boost::optional<BSONObj> sortBy, + BSONObj fields); +} // namespace document_source_set_window_fields + +class DocumentSourceInternalSetWindowFields final : public DocumentSource { public: - static constexpr StringData kStageName = "$setWindowFields"_sd; + static constexpr StringData kStageName = "$_internalSetWindowFields"_sd; /** * Parses 'elem' into a $setWindowFields stage, or throws a AssertionException if 'elem' was an @@ -46,10 +63,11 @@ public: BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); - DocumentSourceSetWindowFields(const boost::intrusive_ptr<ExpressionContext>& expCtx, - boost::optional<boost::intrusive_ptr<Expression>> partitionBy, - boost::optional<BSONObj> sortBy, - BSONObj fields) + DocumentSourceInternalSetWindowFields( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<boost::intrusive_ptr<Expression>> partitionBy, + boost::optional<BSONObj> sortBy, + BSONObj fields) : DocumentSource(kStageName, expCtx), _partitionBy(partitionBy), _sortBy(std::move(sortBy)), @@ -80,6 +98,8 @@ public: DocumentSource::GetNextResult doGetNext(); private: + DocumentSource::GetNextResult getNextInput(); + boost::optional<boost::intrusive_ptr<Expression>> _partitionBy; boost::optional<BSONObj> _sortBy; BSONObj _fields; diff --git a/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp b/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp index 60a2035cb21..414e93a53c3 100644 --- a/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp +++ b/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp @@ -46,47 +46,47 @@ namespace { using DocumentSourceSetWindowFieldsTest = AggregationContextFixture; TEST_F(DocumentSourceSetWindowFieldsTest, FailsToParseInvalidArgumentTypes) { - auto spec = BSON("$setWindowFields" + auto spec = BSON("$_internalSetWindowFields" << "invalid"); ASSERT_THROWS_CODE( - DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), AssertionException, ErrorCodes::FailedToParse); - spec = BSON("$setWindowFields" << BSON("sortBy" - << "invalid sort spec")); + spec = BSON("$_internalSetWindowFields" << BSON("sortBy" + << "invalid sort spec")); ASSERT_THROWS_CODE( - DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), AssertionException, ErrorCodes::TypeMismatch); - spec = BSON("$setWindowFields" << BSON("output" - << "invalid")); + spec = BSON("$_internalSetWindowFields" << BSON("output" + << "invalid")); ASSERT_THROWS_CODE( - DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), AssertionException, ErrorCodes::TypeMismatch); - spec = BSON("$setWindowFields" + spec = BSON("$_internalSetWindowFields" << BSON("partitionBy" << BSON("$notAnExpression" << 1) << "output" << BSONObj())); ASSERT_THROWS_CODE( - DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), AssertionException, ErrorCodes::InvalidPipelineOperator); - spec = BSON("$setWindowFields" << BSON("unknown_parameter" << 1)); + spec = BSON("$_internalSetWindowFields" << BSON("unknown_parameter" << 1)); ASSERT_THROWS_CODE( - DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()), AssertionException, 40415); } TEST_F(DocumentSourceSetWindowFieldsTest, SuccessfullyParsesAndReserializes) { auto spec = fromjson(R"( - {$setWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum: + {$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum: {input: '$pop', documents: [-10, 0]}}}}})"); auto parsedStage = - DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()); + DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()); std::vector<Value> serializedArray; parsedStage->serializeToArray(serializedArray); ASSERT_BSONOBJ_EQ(serializedArray[0].getDocument().toBson(), spec); @@ -94,7 +94,7 @@ TEST_F(DocumentSourceSetWindowFieldsTest, SuccessfullyParsesAndReserializes) { TEST_F(DocumentSourceSetWindowFieldsTest, FailsToParseIfFeatureFlagDisabled) { auto spec = fromjson(R"( - {$setWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum: + {$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum: {input: '$pop', documents: [-10, 0]}}}}})"); // By default, the unit test will have the feature flag disabled. ASSERT_THROWS_CODE( diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.cpp b/src/mongo/db/pipeline/document_source_sort_by_count.cpp index 49ea2dbc02a..5efce02e3a0 100644 --- a/src/mongo/db/pipeline/document_source_sort_by_count.cpp +++ b/src/mongo/db/pipeline/document_source_sort_by_count.cpp @@ -42,9 +42,9 @@ namespace mongo { using boost::intrusive_ptr; using std::list; -REGISTER_MULTI_STAGE_ALIAS(sortByCount, - LiteParsedDocumentSourceDefault::parse, - DocumentSourceSortByCount::createFromBson); +REGISTER_DOCUMENT_SOURCE(sortByCount, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceSortByCount::createFromBson); list<intrusive_ptr<DocumentSource>> DocumentSourceSortByCount::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { |