summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Percy <david.percy@mongodb.com>2020-12-01 21:56:14 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-01-26 17:44:59 +0000
commitdc1c3a791f63dfb909ed520aadab66a4b1ce66e9 (patch)
treef981eb0e1ee4958e0663cdc1c0ee7169ec67468d
parent31b07a007b80975852f75b43ba3437782825bf23 (diff)
downloadmongo-dc1c3a791f63dfb909ed520aadab66a4b1ce66e9.tar.gz
SERVER-53397 Desugar $setWindowFields partitionBy using $sort
-rw-r--r--buildscripts/resmokeconfig/suites/aggregation.yml1
-rw-r--r--etc/evergreen.yml28
-rw-r--r--jstests/aggregation/sources/setWindowFields/desugar.js99
-rw-r--r--jstests/aggregation/sources/setWindowFields/parse.js50
-rw-r--r--src/mongo/db/exec/add_fields_projection_executor.cpp16
-rw-r--r--src/mongo/db/exec/add_fields_projection_executor.h10
-rw-r--r--src/mongo/db/pipeline/document_source.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source.h82
-rw-r--r--src/mongo/db/pipeline/document_source_add_fields.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_add_fields.h10
-rw-r--r--src/mongo/db/pipeline/document_source_bucket.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_count.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_project.cpp21
-rw-r--r--src/mongo/db/pipeline/document_source_project.h16
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields.cpp145
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields.h32
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields_test.cpp30
-rw-r--r--src/mongo/db/pipeline/document_source_sort_by_count.cpp6
19 files changed, 511 insertions, 78 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation.yml b/buildscripts/resmokeconfig/suites/aggregation.yml
index 38ae5965b47..2795384eee5 100644
--- a/buildscripts/resmokeconfig/suites/aggregation.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation.yml
@@ -24,3 +24,4 @@ executor:
mongod_options:
set_parameters:
enableTestCommands: 1
+ featureFlagWindowFunctions: 1
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 6dee1e5a1be..3ed1633da08 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -9790,6 +9790,10 @@ buildvariants:
featureFlagTimeseriesCollection: true,
featureFlagShardingFullDDLSupport: true,
featureFlagShardingFullDDLSupportTimestampedVersion: true,
+ featureFlagWindowFunctions: true,
+ }"
+ --mongosSetParameters="{
+ featureFlagWindowFunctions: true,
}"
- &enterprise-windows-template
@@ -9886,6 +9890,10 @@ buildvariants:
featureFlagTimeseriesCollection: true,
featureFlagShardingFullDDLSupport: true,
featureFlagShardingFullDDLSupportTimestampedVersion: true,
+ featureFlagWindowFunctions: true,
+ }"
+ --mongosSetParameters="{
+ featureFlagWindowFunctions: true,
}"
- <<: *enterprise-windows-nopush-template
@@ -10548,6 +10556,10 @@ buildvariants:
featureFlagTimeseriesCollection: true,
featureFlagShardingFullDDLSupport: true,
featureFlagShardingFullDDLSupportTimestampedVersion: true,
+ featureFlagWindowFunctions: true,
+ }"
+ --mongosSetParameters="{
+ featureFlagWindowFunctions: true,
}"
tasks:
- name: compile_parallel_TG
@@ -11851,6 +11863,10 @@ buildvariants:
featureFlagTimeseriesCollection: true,
featureFlagShardingFullDDLSupport: true,
featureFlagShardingFullDDLSupportTimestampedVersion: true,
+ featureFlagWindowFunctions: true,
+ }"
+ --mongosSetParameters="{
+ featureFlagWindowFunctions: true,
}"
tasks:
- name: compile_all_run_unittests_TG
@@ -12853,6 +12869,10 @@ buildvariants:
featureFlagTimeseriesCollection: true,
featureFlagShardingFullDDLSupport: true,
featureFlagShardingFullDDLSupportTimestampedVersion: true,
+ featureFlagWindowFunctions: true,
+ }"
+ --mongosSetParameters="{
+ featureFlagWindowFunctions: true,
}"
tasks:
- name: compile_all_run_unittests_TG
@@ -13015,6 +13035,10 @@ buildvariants:
featureFlagTimeseriesCollection: true,
featureFlagShardingFullDDLSupport: true,
featureFlagShardingFullDDLSupportTimestampedVersion: true,
+ featureFlagWindowFunctions: true,
+ }"
+ --mongosSetParameters="{
+ featureFlagWindowFunctions: true,
}"
multiversion_platform: ubuntu1804
multiversion_edition: enterprise
@@ -13115,6 +13139,10 @@ buildvariants:
featureFlagTimeseriesCollection: true,
featureFlagShardingFullDDLSupport: true,
featureFlagShardingFullDDLSupportTimestampedVersion: true,
+ featureFlagWindowFunctions: true,
+ }"
+ --mongosSetParameters="{
+ featureFlagWindowFunctions: true,
}"
resmoke_jobs_factor: 0.3 # Avoid starting too many mongod's under {A,UB}SAN build.
hang_analyzer_dump_core: false
diff --git a/jstests/aggregation/sources/setWindowFields/desugar.js b/jstests/aggregation/sources/setWindowFields/desugar.js
new file mode 100644
index 00000000000..889ac9ebffd
--- /dev/null
+++ b/jstests/aggregation/sources/setWindowFields/desugar.js
@@ -0,0 +1,99 @@
+/**
+ * Test how $setWindowFields desugars.
+ *
+ * We handle partitionBy and sortBy by generating a separate $sort stage.
+ *
+ * @tags: [
+ * # We assume the pipeline is not split into a shardsPart and mergerPart.
+ * assumes_unsharded_collection,
+ * # We're testing the explain plan, not the query results, so the facet passthrough would fail.
+ * do_not_wrap_aggregations_in_facets,
+ * ]
+ */
+(function() {
+"use strict";
+
+const featureEnabled =
+ assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1}))
+ .featureFlagWindowFunctions.value;
+if (!featureEnabled) {
+ jsTestLog("Skipping test because the window function feature flag is disabled");
+ return;
+}
+
+const coll = db[jsTestName()];
+coll.insert({});
+
+// Use .explain() to see what the stage desugars to.
+// The result is formatted as explain-output, which differs from MQL syntax in some cases:
+// for example {$sort: {a: 1}} explains as {$sort: {sortKey: {a: 1}}}.
+function desugar(stage) {
+ const result = coll.explain().aggregate([
+ // prevent stages from being absorbed into the .find() layer
+ {$_internalInhibitOptimization: {}},
+ stage,
+ ]);
+ assert.commandWorked(result);
+ assert(Array.isArray(result.stages), result);
+ // The first two stages should be the .find() cursor and the inhibit-optimization stage;
+ // the rest of the stages are what the user's 'stage' expanded to.
+ assert(result.stages[0].$cursor, result);
+ assert(result.stages[1].$_internalInhibitOptimization, result);
+ return result.stages.slice(2);
+}
+
+// Often, the desugared stages include a generated temporary name.
+// When this happens, it's always in the first stage, an $addFields.
+function extractTmp(stages) {
+ assert(stages[0].$addFields, stages);
+ const tmp = Object.keys(stages[0].$addFields)[0];
+ assert(tmp, stages);
+ return tmp;
+}
+
+// No partitionBy and no sortBy means we don't need to sort the input.
+assert.eq(desugar({$setWindowFields: {output: {}}}), [
+ {$_internalSetWindowFields: {output: {}}},
+]);
+
+// 'sortBy' becomes an explicit $sort stage.
+assert.eq(desugar({$setWindowFields: {sortBy: {ts: 1}, output: {}}}), [
+ {$sort: {sortKey: {ts: 1}}},
+ {$_internalSetWindowFields: {sortBy: {ts: 1}, output: {}}},
+]);
+
+// 'partitionBy' a field becomes an explicit $sort stage.
+assert.eq(desugar({$setWindowFields: {partitionBy: "$zip", output: {}}}), [
+ {$sort: {sortKey: {zip: 1}}},
+ {$_internalSetWindowFields: {partitionBy: "$zip", output: {}}},
+]);
+
+// 'partitionBy' an expression becomes $set + $sort + $unset.
+// Also, the _internal stage reads from the already-computed field.
+let stages = desugar({$setWindowFields: {partitionBy: {$toLower: "$country"}, output: {}}});
+let tmp = extractTmp(stages);
+assert.eq(stages, [
+ {$addFields: {[tmp]: {$toLower: ["$country"]}}},
+ {$sort: {sortKey: {[tmp]: 1}}},
+ {$_internalSetWindowFields: {partitionBy: '$' + tmp, output: {}}},
+ {$project: {[tmp]: false}},
+]);
+
+// $sort first by partitionBy, then sortBy, because we sort within each partition.
+assert.eq(
+ desugar({$setWindowFields: {partitionBy: "$zip", sortBy: {ts: -1, _id: 1}, output: {}}}), [
+ {$sort: {sortKey: {zip: 1, ts: -1, _id: 1}}},
+ {$_internalSetWindowFields: {partitionBy: "$zip", sortBy: {ts: -1, _id: 1}, output: {}}},
+ ]);
+
+stages = desugar({
+ $setWindowFields: {partitionBy: {$toLower: "$country"}, sortBy: {ts: -1, _id: 1}, output: {}}
+});
+tmp = extractTmp(stages);
+assert.eq(stages, [
+ {$addFields: {[tmp]: {$toLower: ["$country"]}}},
+ {$sort: {sortKey: {[tmp]: 1, ts: -1, _id: 1}}},
+ {$_internalSetWindowFields: {partitionBy: '$' + tmp, sortBy: {ts: -1, _id: 1}, output: {}}},
+ {$project: {[tmp]: false}},
+]);
+})();
diff --git a/jstests/aggregation/sources/setWindowFields/parse.js b/jstests/aggregation/sources/setWindowFields/parse.js
new file mode 100644
index 00000000000..e7470e67de4
--- /dev/null
+++ b/jstests/aggregation/sources/setWindowFields/parse.js
@@ -0,0 +1,50 @@
+/**
+ * Test the syntax of $setWindowFields. For example:
+ * - Which options are allowed?
+ * - When is an expression expected, vs a constant?
+ */
+(function() {
+"use strict";
+
+const featureEnabled =
+ assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1}))
+ .featureFlagWindowFunctions.value;
+if (!featureEnabled) {
+ jsTestLog("Skipping test because the window function feature flag is disabled");
+ return;
+}
+
+const coll = db[jsTestName()];
+coll.drop();
+
+assert.commandWorked(coll.insert({}));
+
+function run(stage, extraCommandArgs = {}) {
+ return coll.runCommand(
+ Object.merge({aggregate: coll.getName(), pipeline: [stage], cursor: {}}, extraCommandArgs));
+}
+
+// Test that the stage spec must be an object.
+assert.commandFailedWithCode(run({$setWindowFields: "invalid"}), ErrorCodes.FailedToParse);
+
+// Test that the stage parameters are the correct type.
+assert.commandFailedWithCode(run({$setWindowFields: {sortBy: "invalid"}}), ErrorCodes.TypeMismatch);
+assert.commandFailedWithCode(run({$setWindowFields: {output: "invalid"}}), ErrorCodes.TypeMismatch);
+
+// Test that parsing fails for an invalid partitionBy expression.
+assert.commandFailedWithCode(
+ run({$setWindowFields: {partitionBy: {$notAnOperator: 1}, output: {}}}),
+ ErrorCodes.InvalidPipelineOperator);
+
+// Since partitionBy can be any expression, it can be a variable.
+assert.commandWorked(run({$setWindowFields: {partitionBy: "$$NOW", output: {}}}));
+assert.commandWorked(
+ run({$setWindowFields: {partitionBy: "$$myobj.a", output: {}}}, {let : {myobj: {a: 456}}}));
+
+// Test that parsing fails for unrecognized parameters.
+assert.commandFailedWithCode(run({$setWindowFields: {what_is_this: 1}}), 40415);
+
+// Test for a successful parse, ignoring the response documents.
+assert.commandWorked(
+ run({$setWindowFields: {partitionBy: "$state", sortBy: {city: 1}, output: {a: {$sum: 1}}}}));
+})();
diff --git a/src/mongo/db/exec/add_fields_projection_executor.cpp b/src/mongo/db/exec/add_fields_projection_executor.cpp
index 330232728c5..592074b4834 100644
--- a/src/mongo/db/exec/add_fields_projection_executor.cpp
+++ b/src/mongo/db/exec/add_fields_projection_executor.cpp
@@ -218,6 +218,22 @@ std::unique_ptr<AddFieldsProjectionExecutor> AddFieldsProjectionExecutor::create
return executor;
}
+std::unique_ptr<AddFieldsProjectionExecutor> AddFieldsProjectionExecutor::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const FieldPath& fieldPath,
+ const boost::intrusive_ptr<Expression>& expr) {
+
+ // This helper is only meant for creating top-level fields. Dotted field paths require
+ // thinking about implicit array traversal.
+ tassert(5339700,
+ str::stream() << "Expected a top-level field name, but got " << fieldPath.fullPath(),
+ fieldPath.getPathLength() == 1);
+
+ auto executor = std::make_unique<AddFieldsProjectionExecutor>(expCtx);
+ executor->_root->addExpressionForPath(fieldPath, expr);
+ return executor;
+}
+
void AddFieldsProjectionExecutor::parse(const BSONObj& spec) {
for (auto elem : spec) {
// The field name might be a dotted path.
diff --git a/src/mongo/db/exec/add_fields_projection_executor.h b/src/mongo/db/exec/add_fields_projection_executor.h
index fd3ac8decab..309d27b55a2 100644
--- a/src/mongo/db/exec/add_fields_projection_executor.h
+++ b/src/mongo/db/exec/add_fields_projection_executor.h
@@ -64,6 +64,16 @@ public:
static std::unique_ptr<AddFieldsProjectionExecutor> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, const BSONObj& spec);
+ /**
+ * Create a projection that binds an expression to a top-level field.
+ *
+ * 'fieldPath' must be a top-level field name (exactly one element; no dots).
+ */
+ static std::unique_ptr<AddFieldsProjectionExecutor> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const FieldPath& fieldPath,
+ const boost::intrusive_ptr<Expression>& expr);
+
TransformerType getType() const final {
return TransformerType::kComputedProjection;
}
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index d33581db906..aa0ca4885ea 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -79,7 +79,19 @@ void DocumentSource::registerParser(
it == parserMap.end());
parserMap[name] = {parser, requiredMinVersion};
}
+void DocumentSource::registerParser(
+ string name,
+ SimpleParser simpleParser,
+ boost::optional<ServerGlobalParams::FeatureCompatibility::Version> requiredMinVersion) {
+ Parser parser =
+ [simpleParser = std::move(simpleParser)](
+ BSONElement stageSpec,
+ const intrusive_ptr<ExpressionContext>& expCtx) -> list<intrusive_ptr<DocumentSource>> {
+ return {simpleParser(std::move(stageSpec), expCtx)};
+ };
+ return registerParser(std::move(name), std::move(parser), std::move(requiredMinVersion));
+}
bool DocumentSource::hasQuery() const {
return false;
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index b9c89f39bc7..e4797743036 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -70,8 +70,11 @@ class Document;
* LiteParsedDocumentSource. This is used for checks that need to happen before a full parse,
* such as checks about which namespaces are referenced by this aggregation.
*
- * 'fullParser' takes a BSONElement and an ExpressionContext and returns a fully-executable
- * DocumentSource. This will be used for optimization and execution.
+ * 'fullParser' is either a DocumentSource::SimpleParser or a DocumentSource::Parser.
+ * In both cases, it takes a BSONElement and an ExpressionContext and returns fully-executable
+ * DocumentSource(s), for optimization and execution. In the common case it's a SimpleParser,
+ * which returns a single DocumentSource; in the general case it's a Parser, which returns a whole
+ * std::list to support "multi-stage aliases" like $bucket.
*
* Stages that do not require any special pre-parse checks can use
* LiteParsedDocumentSourceDefault::parse as their 'liteParser'.
@@ -82,57 +85,55 @@ class Document;
* REGISTER_DOCUMENT_SOURCE(foo,
* LiteParsedDocumentSourceDefault::parse,
* DocumentSourceFoo::createFromBson);
+ */
+#define REGISTER_DOCUMENT_SOURCE(key, liteParser, fullParser) \
+ REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, boost::none, true)
+
+/**
+ * Like REGISTER_DOCUMENT_SOURCE, except the parser will only be enabled when FCV >= minVersion.
+ * We store minVersion in the parserMap, so that changing FCV at runtime correctly enables/disables
+ * the parser.
+ */
+#define REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION(key, liteParser, fullParser, minVersion) \
+ REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, minVersion, true)
+
+/**
+ * Like REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION, except you can also specify a condition,
+ * evaluated during startup, that decides whether to register the parser.
+ *
+ * For example, you could check a feature flag, and register the parser only when it's enabled.
*
- * If your stage is actually an alias which needs to return more than one stage (such as
- * $sortByCount), you should use the REGISTER_MULTI_STAGE_ALIAS macro instead.
+ * Note that the condition is evaluated only once, during a MONGO_INITIALIZER. Don't specify
+ * a condition that can change at runtime, such as FCV. (Feature flags are ok, because they
+ * cannot be toggled at runtime.)
+ *
+ * This is the most general REGISTER_DOCUMENT_SOURCE* macro, which all others should delegate to.
*/
#define REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, minVersion, ...) \
MONGO_INITIALIZER(addToDocSourceParserMap_##key)(InitializerContext*) { \
if (!__VA_ARGS__) { \
return; \
} \
- auto fullParserWrapper = [](BSONElement stageSpec, \
- const boost::intrusive_ptr<ExpressionContext>& expCtx) { \
- return std::list<boost::intrusive_ptr<DocumentSource>>{ \
- (fullParser)(stageSpec, expCtx)}; \
- }; \
LiteParsedDocumentSource::registerParser("$" #key, liteParser); \
- DocumentSource::registerParser("$" #key, fullParserWrapper, minVersion); \
+ DocumentSource::registerParser("$" #key, fullParser, minVersion); \
}
-#define REGISTER_DOCUMENT_SOURCE(key, liteParser, fullParser) \
- REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, boost::none, true)
-
+/**
+ * Like REGISTER_DOCUMENT_SOURCE, except the parser is only enabled when test-commands are enabled.
+ */
#define REGISTER_TEST_DOCUMENT_SOURCE(key, liteParser, fullParser) \
REGISTER_DOCUMENT_SOURCE_CONDITIONALLY( \
key, liteParser, fullParser, boost::none, ::mongo::getTestCommandsEnabled())
-#define REGISTER_DOCUMENT_SOURCE_WITH_MIN_VERSION(key, liteParser, fullParser, minVersion) \
- REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(key, liteParser, fullParser, minVersion, true)
-
-/**
- * Registers a multi-stage alias (such as $sortByCount) to have the single name 'key'. When a stage
- * with name '$key' is found, 'liteParser' will be used to produce a LiteParsedDocumentSource,
- * while 'fullParser' will be called to construct a vector of DocumentSources. See the comments on
- * REGISTER_DOCUMENT_SOURCE for more information.
- *
- * As an example, if your stage alias looks like {$foo: <args>} and does *not* require any special
- * pre-parse checks, you should implement a static parser like DocumentSourceFoo::createFromBson(),
- * and register it like so:
- * REGISTER_MULTI_STAGE_ALIAS(foo,
- * LiteParsedDocumentSourceDefault::parse,
- * DocumentSourceFoo::createFromBson);
- */
-#define REGISTER_MULTI_STAGE_ALIAS(key, liteParser, fullParser) \
- MONGO_INITIALIZER(addAliasToDocSourceParserMap_##key)(InitializerContext*) { \
- LiteParsedDocumentSource::registerParser("$" #key, (liteParser)); \
- DocumentSource::registerParser("$" #key, (fullParser), boost::none); \
- }
-
class DocumentSource : public RefCountable {
public:
+ // In general a parser returns a list of DocumentSources, to accomodate "multi-stage aliases"
+ // like $bucket.
using Parser = std::function<std::list<boost::intrusive_ptr<DocumentSource>>(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
+ // But in the common case a parser returns only one DocumentSource.
+ using SimpleParser = std::function<boost::intrusive_ptr<DocumentSource>(
+ BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
using ChangeStreamRequirement = StageConstraints::ChangeStreamRequirement;
using HostTypeRequirement = StageConstraints::HostTypeRequirement;
@@ -367,6 +368,17 @@ public:
std::string name,
Parser parser,
boost::optional<ServerGlobalParams::FeatureCompatibility::Version> requiredMinVersion);
+ /**
+ * Convenience wrapper for the common case, when DocumentSource::Parser returns a list of one
+ * DocumentSource.
+ *
+ * DO NOT call this method directly. Instead, use the REGISTER_DOCUMENT_SOURCE macro defined in
+ * this file.
+ */
+ static void registerParser(
+ std::string name,
+ SimpleParser simpleParser,
+ boost::optional<ServerGlobalParams::FeatureCompatibility::Version> requiredMinVersion);
/**
* Returns true if the DocumentSource has a query.
diff --git a/src/mongo/db/pipeline/document_source_add_fields.cpp b/src/mongo/db/pipeline/document_source_add_fields.cpp
index 2e2b4a5a53d..53d012cd258 100644
--- a/src/mongo/db/pipeline/document_source_add_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_add_fields.cpp
@@ -70,6 +70,19 @@ intrusive_ptr<DocumentSource> DocumentSourceAddFields::create(
return addFields;
}
+intrusive_ptr<DocumentSource> DocumentSourceAddFields::create(
+ const FieldPath& fieldPath,
+ const intrusive_ptr<Expression>& expr,
+ const intrusive_ptr<ExpressionContext>& expCtx) {
+
+ const bool isIndependentOfAnyCollection = false;
+ return make_intrusive<DocumentSourceSingleDocumentTransformation>(
+ expCtx,
+ projection_executor::AddFieldsProjectionExecutor::create(expCtx, fieldPath, expr),
+ kStageName,
+ isIndependentOfAnyCollection);
+}
+
intrusive_ptr<DocumentSource> DocumentSourceAddFields::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
const auto specifiedName = elem.fieldNameStringData();
diff --git a/src/mongo/db/pipeline/document_source_add_fields.h b/src/mongo/db/pipeline/document_source_add_fields.h
index 5c99a3790bb..49cb228c73b 100644
--- a/src/mongo/db/pipeline/document_source_add_fields.h
+++ b/src/mongo/db/pipeline/document_source_add_fields.h
@@ -53,6 +53,16 @@ public:
StringData stageName = kStageName);
/**
+ * Create a stage that binds an expression to a top-level field.
+ *
+ * 'fieldPath' must be a top-level field name (exactly one element; no dots).
+ */
+ static boost::intrusive_ptr<DocumentSource> create(
+ const FieldPath& fieldPath,
+ const boost::intrusive_ptr<Expression>& expr,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ /**
* Parses a $addFields stage from the user-supplied BSON.
*/
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_bucket.cpp b/src/mongo/db/pipeline/document_source_bucket.cpp
index 73386201792..d5761bf3c85 100644
--- a/src/mongo/db/pipeline/document_source_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket.cpp
@@ -40,9 +40,9 @@ using boost::intrusive_ptr;
using std::list;
using std::vector;
-REGISTER_MULTI_STAGE_ALIAS(bucket,
- LiteParsedDocumentSourceDefault::parse,
- DocumentSourceBucket::createFromBson);
+REGISTER_DOCUMENT_SOURCE(bucket,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceBucket::createFromBson);
namespace {
intrusive_ptr<ExpressionConstant> getExpressionConstant(ExpressionContext* const expCtx,
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index bad92703daa..7b80cb18d02 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -65,9 +65,9 @@ using std::vector;
// and re-parse the pipeline. To make this work, the 'transformation' stage will serialize itself
// with the original specification, and all other stages that are created during the alias expansion
// will not serialize themselves.
-REGISTER_MULTI_STAGE_ALIAS(changeStream,
- DocumentSourceChangeStream::LiteParsed::parse,
- DocumentSourceChangeStream::createFromBson);
+REGISTER_DOCUMENT_SOURCE(changeStream,
+ DocumentSourceChangeStream::LiteParsed::parse,
+ DocumentSourceChangeStream::createFromBson);
constexpr StringData DocumentSourceChangeStream::kDocumentKeyField;
constexpr StringData DocumentSourceChangeStream::kFullDocumentBeforeChangeField;
diff --git a/src/mongo/db/pipeline/document_source_count.cpp b/src/mongo/db/pipeline/document_source_count.cpp
index 80a98a5426b..1107e2e6eeb 100644
--- a/src/mongo/db/pipeline/document_source_count.cpp
+++ b/src/mongo/db/pipeline/document_source_count.cpp
@@ -43,9 +43,9 @@ using boost::intrusive_ptr;
using std::list;
using std::string;
-REGISTER_MULTI_STAGE_ALIAS(count,
- LiteParsedDocumentSourceDefault::parse,
- DocumentSourceCount::createFromBson);
+REGISTER_DOCUMENT_SOURCE(count,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceCount::createFromBson);
list<intrusive_ptr<DocumentSource>> DocumentSourceCount::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp
index e5f048a8e3d..6ea0e53e055 100644
--- a/src/mongo/db/pipeline/document_source_project.cpp
+++ b/src/mongo/db/pipeline/document_source_project.cpp
@@ -33,6 +33,7 @@
#include <boost/optional.hpp>
#include <boost/smart_ptr/intrusive_ptr.hpp>
+#include <memory>
#include "mongo/db/exec/projection_executor.h"
#include "mongo/db/exec/projection_executor_builder.h"
@@ -100,6 +101,26 @@ intrusive_ptr<DocumentSource> DocumentSourceProject::create(
return project;
}
+boost::intrusive_ptr<DocumentSource> DocumentSourceProject::createUnset(
+ const FieldPath& fieldPath, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+
+ // This helper is only meant for removing top-level fields. Dotted field paths require
+ // thinking about implicit array traversal.
+ tassert(5339701,
+ str::stream() << "Expected a top-level field name, but got " << fieldPath.fullPath(),
+ fieldPath.getPathLength() == 1);
+
+ projection_ast::ProjectionPathASTNode pathNode;
+ pathNode.addChild(fieldPath.fullPath(),
+ std::make_unique<projection_ast::BooleanConstantASTNode>(false));
+ auto projection = projection_ast::Projection{
+ std::move(pathNode),
+ projection_ast::ProjectType::kExclusion,
+ };
+
+ return create(std::move(projection), expCtx, kAliasNameUnset);
+}
+
intrusive_ptr<DocumentSource> DocumentSourceProject::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
if (elem.fieldNameStringData() == kStageName) {
diff --git a/src/mongo/db/pipeline/document_source_project.h b/src/mongo/db/pipeline/document_source_project.h
index 13d45282a75..9401231a56b 100644
--- a/src/mongo/db/pipeline/document_source_project.h
+++ b/src/mongo/db/pipeline/document_source_project.h
@@ -60,16 +60,24 @@ public:
BSONObj projectSpec,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
StringData specifiedName) try {
- return create(projection_ast::parse(
- expCtx, projectSpec, ProjectionPolicies::aggregateProjectionPolicies()),
- expCtx,
- specifiedName);
+
+ auto projection = projection_ast::parse(
+ expCtx, projectSpec, ProjectionPolicies::aggregateProjectionPolicies());
+ return create(projection, expCtx, specifiedName);
} catch (DBException& ex) {
ex.addContext("Invalid " + specifiedName.toString());
throw;
}
/**
+ * Create an '$unset' stage, which removes a single top-level field.
+ *
+ * 'fieldPath' must be a top-level field.
+ */
+ static boost::intrusive_ptr<DocumentSource> createUnset(
+ const FieldPath& fieldPath, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ /**
* Parses a $project stage from the user-supplied BSON.
*/
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
index a98069a6c81..f38d76b6c14 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
@@ -29,21 +29,153 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/pipeline/document_source_add_fields.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_source_set_window_fields.h"
#include "mongo/db/pipeline/document_source_set_window_fields_gen.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/query/query_feature_flags_gen.h"
+using boost::intrusive_ptr;
+using boost::optional;
+using std::list;
+
namespace mongo {
REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(
setWindowFields,
LiteParsedDocumentSourceDefault::parse,
- DocumentSourceSetWindowFields::createFromBson,
+ document_source_set_window_fields::createFromBson,
+ boost::none,
+ ::mongo::feature_flags::gFeatureFlagWindowFunctions.isEnabledAndIgnoreFCV());
+
+REGISTER_DOCUMENT_SOURCE_CONDITIONALLY(
+ _internalSetWindowFields,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceInternalSetWindowFields::createFromBson,
boost::none,
::mongo::feature_flags::gFeatureFlagWindowFunctions.isEnabledAndIgnoreFCV());
-Value DocumentSourceSetWindowFields::serialize(
+list<intrusive_ptr<DocumentSource>> document_source_set_window_fields::createFromBson(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "the " << kStageName
+ << " stage specification must be an object, found "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Object);
+
+ auto spec =
+ SetWindowFieldsSpec::parse(IDLParserErrorContext(kStageName), elem.embeddedObject());
+ auto partitionBy = [&]() -> boost::optional<boost::intrusive_ptr<Expression>> {
+ if (auto partitionBy = spec.getPartitionBy())
+ return Expression::parseOperand(
+ expCtx.get(), partitionBy->getElement(), expCtx->variablesParseState);
+ else
+ return boost::none;
+ }();
+ return create(std::move(expCtx),
+ std::move(partitionBy),
+ std::move(spec.getSortBy()),
+ std::move(spec.getOutput()));
+}
+
+list<intrusive_ptr<DocumentSource>> document_source_set_window_fields::create(
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ optional<intrusive_ptr<Expression>> partitionBy,
+ optional<BSONObj> sortBy,
+ BSONObj fields) {
+
+ // Starting with an input like this:
+ // {$setWindowFields: {partitionBy: {$foo: "$x"}, sortBy: {y: 1}, fields: {...}}}
+
+ // We move the partitionBy expression out into its own $set stage:
+ // {$set: {__tmp: {$foo: "$x"}}}
+ // {$setWindowFields: {partitionBy: "$__tmp", sortBy: {y: 1}, fields: {...}}}
+ // {$unset: '__tmp'}
+
+ // This lets us insert a $sort in between:
+ // {$set: {__tmp: {$foo: "$x"}}}
+ // {$sort: {__tmp: 1, y: 1}}
+ // {$setWindowFields: {partitionBy: "$__tmp", sortBy: {y: 1}, fields: {...}}}
+ // {$unset: '__tmp'}
+
+ // Which lets us replace $setWindowFields with $_internalSetWindowFields:
+ // {$set: {__tmp: {$foo: "$x"}}}
+ // {$sort: {__tmp: 1, y: 1}}
+ // {$_internalSetWindowFields: {partitionBy: "$__tmp", sortBy: {y: 1}, fields: {...}}}
+ // {$unset: '__tmp'}
+
+ // If partitionBy is a field path, we can $sort by that field directly and avoid creating a
+ // $set stage. This is important for pushing down the $sort. This is only valid because we
+ // assert (in getNextInput()) that partitionBy is never an array.
+
+ // If there is no partitionBy at all then we just $sort by the sortBy spec.
+
+ // If there is no sortBy and no partitionBy then we can omit the $sort stage completely.
+
+ list<intrusive_ptr<DocumentSource>> result;
+
+ // complexPartitionBy is an expression to evaluate.
+ // simplePartitionBy is a field path, which can be evaluated or sorted.
+ optional<intrusive_ptr<Expression>> complexPartitionBy;
+ optional<FieldPath> simplePartitionBy;
+ optional<intrusive_ptr<Expression>> simplePartitionByExpr;
+ // If there is no partitionBy, both are empty.
+ // If partitionBy is already a field path, we only fill in simplePartitionBy.
+ // If partitionBy is a more complex expression, we will need to generate a $set stage,
+ // which will bind the value of the expression to the name in simplePartitionBy.
+ if (partitionBy) {
+ auto exprFieldPath = dynamic_cast<ExpressionFieldPath*>(partitionBy->get());
+ if (exprFieldPath && exprFieldPath->isRootFieldPath()) {
+ // ExpressionFieldPath has "CURRENT" as an explicit first component,
+ // but for $sort we don't want that.
+ simplePartitionBy = exprFieldPath->getFieldPath().tail();
+ simplePartitionByExpr = partitionBy;
+ } else {
+ // In DocumentSource we don't have a mechanism for generating non-colliding field names,
+ // so we have to choose the tmp name carefully to make a collision unlikely in practice.
+ auto tmp = "__internal_setWindowFields_partition_key";
+ simplePartitionBy = FieldPath{tmp};
+ simplePartitionByExpr = ExpressionFieldPath::createPathFromString(
+ expCtx.get(), tmp, expCtx->variablesParseState);
+ complexPartitionBy = partitionBy;
+ }
+ }
+
+ // $set
+ if (complexPartitionBy) {
+ result.push_back(
+ DocumentSourceAddFields::create(*simplePartitionBy, *complexPartitionBy, expCtx));
+ }
+
+ // $sort
+ if (simplePartitionBy || sortBy) {
+ BSONObjBuilder sortSpec;
+ if (simplePartitionBy) {
+ sortSpec << simplePartitionBy->fullPath() << 1;
+ }
+ if (sortBy) {
+ for (auto elem : *sortBy) {
+ sortSpec << elem;
+ }
+ }
+ result.push_back(DocumentSourceSort::create(expCtx, sortSpec.obj()));
+ }
+
+ // $_internalSetWindowFields
+ result.push_back(make_intrusive<DocumentSourceInternalSetWindowFields>(
+ expCtx, simplePartitionByExpr, sortBy, fields));
+
+ // $unset
+ if (complexPartitionBy) {
+ result.push_back(DocumentSourceProject::createUnset(*simplePartitionBy, expCtx));
+ }
+
+ return result;
+}
+
+Value DocumentSourceInternalSetWindowFields::serialize(
boost::optional<ExplainOptions::Verbosity> explain) const {
MutableDocument spec;
spec[SetWindowFieldsSpec::kPartitionByFieldName] =
@@ -53,7 +185,7 @@ Value DocumentSourceSetWindowFields::serialize(
return Value(DOC(kStageName << spec.freeze()));
}
-boost::intrusive_ptr<DocumentSource> DocumentSourceSetWindowFields::createFromBson(
+boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSetWindowFields::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(ErrorCodes::FailedToParse,
str::stream() << "the " << kStageName
@@ -70,12 +202,13 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceSetWindowFields::createFromBs
else
return boost::none;
}();
- return make_intrusive<DocumentSourceSetWindowFields>(
+ return make_intrusive<DocumentSourceInternalSetWindowFields>(
expCtx, partitionBy, spec.getSortBy(), spec.getOutput());
}
-DocumentSource::GetNextResult DocumentSourceSetWindowFields::doGetNext() {
- return GetNextResult::makeEOF();
+DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext() {
+ // This is a placeholder: it returns every input doc unchanged.
+ return pSource->getNext();
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h
index 09210c3df0d..15902de2211 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.h
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.h
@@ -34,9 +34,26 @@
namespace mongo {
-class DocumentSourceSetWindowFields final : public DocumentSource {
+/**
+ * $setWindowFields is an alias: it desugars to some combination of projection, sorting,
+ * and $_internalSetWindowFields.
+ */
+namespace document_source_set_window_fields {
+constexpr StringData kStageName = "$setWindowFields"_sd;
+
+std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+std::list<boost::intrusive_ptr<DocumentSource>> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<boost::intrusive_ptr<Expression>> partitionBy,
+ boost::optional<BSONObj> sortBy,
+ BSONObj fields);
+} // namespace document_source_set_window_fields
+
+class DocumentSourceInternalSetWindowFields final : public DocumentSource {
public:
- static constexpr StringData kStageName = "$setWindowFields"_sd;
+ static constexpr StringData kStageName = "$_internalSetWindowFields"_sd;
/**
* Parses 'elem' into a $setWindowFields stage, or throws a AssertionException if 'elem' was an
@@ -46,10 +63,11 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- DocumentSourceSetWindowFields(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- boost::optional<boost::intrusive_ptr<Expression>> partitionBy,
- boost::optional<BSONObj> sortBy,
- BSONObj fields)
+ DocumentSourceInternalSetWindowFields(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<boost::intrusive_ptr<Expression>> partitionBy,
+ boost::optional<BSONObj> sortBy,
+ BSONObj fields)
: DocumentSource(kStageName, expCtx),
_partitionBy(partitionBy),
_sortBy(std::move(sortBy)),
@@ -80,6 +98,8 @@ public:
DocumentSource::GetNextResult doGetNext();
private:
+ DocumentSource::GetNextResult getNextInput();
+
boost::optional<boost::intrusive_ptr<Expression>> _partitionBy;
boost::optional<BSONObj> _sortBy;
BSONObj _fields;
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp b/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp
index 60a2035cb21..414e93a53c3 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp
+++ b/src/mongo/db/pipeline/document_source_set_window_fields_test.cpp
@@ -46,47 +46,47 @@ namespace {
using DocumentSourceSetWindowFieldsTest = AggregationContextFixture;
TEST_F(DocumentSourceSetWindowFieldsTest, FailsToParseInvalidArgumentTypes) {
- auto spec = BSON("$setWindowFields"
+ auto spec = BSON("$_internalSetWindowFields"
<< "invalid");
ASSERT_THROWS_CODE(
- DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
+ DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
AssertionException,
ErrorCodes::FailedToParse);
- spec = BSON("$setWindowFields" << BSON("sortBy"
- << "invalid sort spec"));
+ spec = BSON("$_internalSetWindowFields" << BSON("sortBy"
+ << "invalid sort spec"));
ASSERT_THROWS_CODE(
- DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
+ DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
AssertionException,
ErrorCodes::TypeMismatch);
- spec = BSON("$setWindowFields" << BSON("output"
- << "invalid"));
+ spec = BSON("$_internalSetWindowFields" << BSON("output"
+ << "invalid"));
ASSERT_THROWS_CODE(
- DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
+ DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
AssertionException,
ErrorCodes::TypeMismatch);
- spec = BSON("$setWindowFields"
+ spec = BSON("$_internalSetWindowFields"
<< BSON("partitionBy" << BSON("$notAnExpression" << 1) << "output" << BSONObj()));
ASSERT_THROWS_CODE(
- DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
+ DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
AssertionException,
ErrorCodes::InvalidPipelineOperator);
- spec = BSON("$setWindowFields" << BSON("unknown_parameter" << 1));
+ spec = BSON("$_internalSetWindowFields" << BSON("unknown_parameter" << 1));
ASSERT_THROWS_CODE(
- DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
+ DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx()),
AssertionException,
40415);
}
TEST_F(DocumentSourceSetWindowFieldsTest, SuccessfullyParsesAndReserializes) {
auto spec = fromjson(R"(
- {$setWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum:
+ {$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum:
{input: '$pop', documents: [-10, 0]}}}}})");
auto parsedStage =
- DocumentSourceSetWindowFields::createFromBson(spec.firstElement(), getExpCtx());
+ DocumentSourceInternalSetWindowFields::createFromBson(spec.firstElement(), getExpCtx());
std::vector<Value> serializedArray;
parsedStage->serializeToArray(serializedArray);
ASSERT_BSONOBJ_EQ(serializedArray[0].getDocument().toBson(), spec);
@@ -94,7 +94,7 @@ TEST_F(DocumentSourceSetWindowFieldsTest, SuccessfullyParsesAndReserializes) {
TEST_F(DocumentSourceSetWindowFieldsTest, FailsToParseIfFeatureFlagDisabled) {
auto spec = fromjson(R"(
- {$setWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum:
+ {$_internalSetWindowFields: {partitionBy: '$state', sortBy: {city: 1}, output: {mySum: {$sum:
{input: '$pop', documents: [-10, 0]}}}}})");
// By default, the unit test will have the feature flag disabled.
ASSERT_THROWS_CODE(
diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.cpp b/src/mongo/db/pipeline/document_source_sort_by_count.cpp
index 49ea2dbc02a..5efce02e3a0 100644
--- a/src/mongo/db/pipeline/document_source_sort_by_count.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_by_count.cpp
@@ -42,9 +42,9 @@ namespace mongo {
using boost::intrusive_ptr;
using std::list;
-REGISTER_MULTI_STAGE_ALIAS(sortByCount,
- LiteParsedDocumentSourceDefault::parse,
- DocumentSourceSortByCount::createFromBson);
+REGISTER_DOCUMENT_SOURCE(sortByCount,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceSortByCount::createFromBson);
list<intrusive_ptr<DocumentSource>> DocumentSourceSortByCount::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {