summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2021-02-09 07:50:51 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-09 13:20:59 +0000
commitea12e9bed79e735518f20541ae765ff8b693517b (patch)
tree45f1137507802466885299990bf5bc2137c25403
parent4a2571bb8ef9b0844f61dc1c780e88c555172458 (diff)
downloadmongo-ea12e9bed79e735518f20541ae765ff8b693517b.tar.gz
SERVER-53979 Add sum accumulator to window function stage
-rw-r--r--jstests/aggregation/sources/setWindowFields/desugar.js55
-rw-r--r--jstests/aggregation/sources/setWindowFields/parse.js118
-rw-r--r--jstests/aggregation/sources/setWindowFields/sum.js148
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields.cpp62
-rw-r--r--src/mongo/db/pipeline/document_source_set_window_fields.h21
-rw-r--r--src/mongo/db/pipeline/window_function_expression.cpp5
-rw-r--r--src/mongo/db/pipeline/window_function_expression.h46
7 files changed, 365 insertions, 90 deletions
diff --git a/jstests/aggregation/sources/setWindowFields/desugar.js b/jstests/aggregation/sources/setWindowFields/desugar.js
index efd46c8bdc4..507087d2808 100644
--- a/jstests/aggregation/sources/setWindowFields/desugar.js
+++ b/jstests/aggregation/sources/setWindowFields/desugar.js
@@ -62,38 +62,39 @@ assert.eq(desugar({$setWindowFields: {sortBy: {ts: 1}, output: {}}}), [
{$_internalSetWindowFields: {sortBy: {ts: 1}, output: {}}},
]);
+// TODO SERVER-53402 Enable partitionBy tests.
// 'partitionBy' a field becomes an explicit $sort stage.
-assert.eq(desugar({$setWindowFields: {partitionBy: "$zip", output: {}}}), [
- {$sort: {sortKey: {zip: 1}}},
- {$_internalSetWindowFields: {partitionBy: "$zip", output: {}}},
-]);
+// assert.eq(desugar({$setWindowFields: {partitionBy: "$zip", output: {}}}), [
+// {$sort: {sortKey: {zip: 1}}},
+// {$_internalSetWindowFields: {partitionBy: "$zip", output: {}}},
+// ]);
// 'partitionBy' an expression becomes $set + $sort + $unset.
// Also, the _internal stage reads from the already-computed field.
-let stages = desugar({$setWindowFields: {partitionBy: {$toLower: "$country"}, output: {}}});
-let tmp = extractTmp(stages);
-assert.eq(stages, [
- {$addFields: {[tmp]: {$toLower: ["$country"]}}},
- {$sort: {sortKey: {[tmp]: 1}}},
- {$_internalSetWindowFields: {partitionBy: '$' + tmp, output: {}}},
- {$project: {[tmp]: false, _id: true}},
-]);
+// let stages = desugar({$setWindowFields: {partitionBy: {$toLower: "$country"}, output: {}}});
+// let tmp = extractTmp(stages);
+// assert.eq(stages, [
+// {$addFields: {[tmp]: {$toLower: ["$country"]}}},
+// {$sort: {sortKey: {[tmp]: 1}}},
+// {$_internalSetWindowFields: {partitionBy: '$' + tmp, output: {}}},
+// {$project: {[tmp]: false, _id: true}},
+// ]);
// $sort first by partitionBy, then sortBy, because we sort within each partition.
-assert.eq(
- desugar({$setWindowFields: {partitionBy: "$zip", sortBy: {ts: -1, _id: 1}, output: {}}}), [
- {$sort: {sortKey: {zip: 1, ts: -1, _id: 1}}},
- {$_internalSetWindowFields: {partitionBy: "$zip", sortBy: {ts: -1, _id: 1}, output: {}}},
- ]);
+// assert.eq(
+// desugar({$setWindowFields: {partitionBy: "$zip", sortBy: {ts: -1, _id: 1}, output: {}}}), [
+// {$sort: {sortKey: {zip: 1, ts: -1, _id: 1}}},
+// {$_internalSetWindowFields: {partitionBy: "$zip", sortBy: {ts: -1, _id: 1}, output: {}}},
+// ]);
-stages = desugar({
- $setWindowFields: {partitionBy: {$toLower: "$country"}, sortBy: {ts: -1, _id: 1}, output: {}}
-});
-tmp = extractTmp(stages);
-assert.eq(stages, [
- {$addFields: {[tmp]: {$toLower: ["$country"]}}},
- {$sort: {sortKey: {[tmp]: 1, ts: -1, _id: 1}}},
- {$_internalSetWindowFields: {partitionBy: '$' + tmp, sortBy: {ts: -1, _id: 1}, output: {}}},
- {$project: {[tmp]: false, _id: true}},
-]);
+// stages = desugar({
+// $setWindowFields: {partitionBy: {$toLower: "$country"}, sortBy: {ts: -1, _id: 1}, output: {}}
+// });
+// tmp = extractTmp(stages);
+// assert.eq(stages, [
+// {$addFields: {[tmp]: {$toLower: ["$country"]}}},
+// {$sort: {sortKey: {[tmp]: 1, ts: -1, _id: 1}}},
+// {$_internalSetWindowFields: {partitionBy: '$' + tmp, sortBy: {ts: -1, _id: 1}, output: {}}},
+// {$project: {[tmp]: false, _id: true}},
+// ]);
})();
diff --git a/jstests/aggregation/sources/setWindowFields/parse.js b/jstests/aggregation/sources/setWindowFields/parse.js
index bdb024d2a6f..4ad8aa90756 100644
--- a/jstests/aggregation/sources/setWindowFields/parse.js
+++ b/jstests/aggregation/sources/setWindowFields/parse.js
@@ -33,23 +33,27 @@ assert.commandFailedWithCode(run({$setWindowFields: "invalid"}), ErrorCodes.Fail
assert.commandFailedWithCode(run({$setWindowFields: {sortBy: "invalid"}}), ErrorCodes.TypeMismatch);
assert.commandFailedWithCode(run({$setWindowFields: {output: "invalid"}}), ErrorCodes.TypeMismatch);
+// TODO SERVER-53402 Enable partitionBy tests.
// Test that parsing fails for an invalid partitionBy expression.
-assert.commandFailedWithCode(
- run({$setWindowFields: {partitionBy: {$notAnOperator: 1}, output: {}}}),
- ErrorCodes.InvalidPipelineOperator);
+// assert.commandFailedWithCode(
+// run({$setWindowFields: {partitionBy: {$notAnOperator: 1}, output: {}}}),
+// ErrorCodes.InvalidPipelineOperator);
// Since partitionBy can be any expression, it can be a variable.
-assert.commandWorked(run({$setWindowFields: {partitionBy: "$$NOW", output: {}}}));
-assert.commandWorked(
- run({$setWindowFields: {partitionBy: "$$myobj.a", output: {}}}, {let : {myobj: {a: 456}}}));
+// assert.commandWorked(run({$setWindowFields: {partitionBy: "$$NOW", output: {}}}));
+// assert.commandWorked(
+// run({$setWindowFields: {partitionBy: "$$myobj.a", output: {}}}, {let : {myobj: {a: 456}}}));
// Test that parsing fails for unrecognized parameters.
assert.commandFailedWithCode(run({$setWindowFields: {what_is_this: 1}}), 40415);
// Test for a successful parse, ignoring the response documents.
-assert.commandWorked(run({
- $setWindowFields: {partitionBy: "$state", sortBy: {city: 1}, output: {a: {$sum: {input: 1}}}}
-}));
+// assert.commandFailedWithCode(
+// run({
+// $setWindowFields:
+// {partitionBy: "$state", sortBy: {city: 1}, output: {a: {$sum: {input: 1}}}}
+// }),
+// 5397903);
function runSum(spec) {
// Include a single-field sortBy in this helper to allow all kinds of bounds.
@@ -57,7 +61,7 @@ function runSum(spec) {
}
// The most basic case: $sum everything.
-assert.commandWorked(runSum({input: "$a"}));
+assert.commandFailedWithCode(runSum({input: "$a"}), 5397903);
// Extra arguments to a window function are rejected.
assert.commandFailedWithCode(runSum({abcde: 1}),
@@ -65,28 +69,30 @@ assert.commandFailedWithCode(runSum({abcde: 1}),
'Window function $sum found an unknown argument: abcde');
// That's equivalent to bounds of [unbounded, unbounded].
-assert.commandWorked(runSum({input: "$a", documents: ['unbounded', 'unbounded']}));
+assert.commandFailedWithCode(runSum({input: "$a", documents: ['unbounded', 'unbounded']}), 5397903);
// Bounds can be bounded, or bounded on one side.
-assert.commandWorked(runSum({input: "$a", documents: [-2, +4]}));
-assert.commandWorked(runSum({input: "$a", documents: [-3, 'unbounded']}));
-assert.commandWorked(runSum({input: "$a", documents: ['unbounded', +5]}));
+assert.commandFailedWithCode(runSum({input: "$a", documents: [-2, +4]}), 5397904);
+assert.commandFailedWithCode(runSum({input: "$a", documents: [-3, 'unbounded']}), 5397904);
+assert.commandFailedWithCode(runSum({input: "$a", documents: ['unbounded', +5]}), 5397903);
// Range-based bounds:
-assert.commandWorked(runSum({input: "$a", range: ['unbounded', 'unbounded']}));
-assert.commandWorked(runSum({input: "$a", range: [-2, +4]}));
-assert.commandWorked(runSum({input: "$a", range: [-3, 'unbounded']}));
-assert.commandWorked(runSum({input: "$a", range: ['unbounded', +5]}));
-assert.commandWorked(runSum({input: "$a", range: [NumberDecimal('1.42'), NumberLong(5)]}));
+assert.commandFailedWithCode(runSum({input: "$a", range: ['unbounded', 'unbounded']}), 5397901);
+assert.commandFailedWithCode(runSum({input: "$a", range: [-2, +4]}), 5397901);
+assert.commandFailedWithCode(runSum({input: "$a", range: [-3, 'unbounded']}), 5397901);
+assert.commandFailedWithCode(runSum({input: "$a", range: ['unbounded', +5]}), 5397901);
+assert.commandFailedWithCode(runSum({input: "$a", range: [NumberDecimal('1.42'), NumberLong(5)]}),
+ 5397901);
// Time-based bounds:
-assert.commandWorked(runSum({input: "$a", range: [-3, 'unbounded'], unit: 'hour'}));
+assert.commandFailedWithCode(runSum({input: "$a", range: [-3, 'unbounded'], unit: 'hour'}),
+ 5397902);
// Numeric bounds can be a constant expression:
let expr = {$add: [2, 2]};
-assert.commandWorked(runSum({input: "$a", documents: [expr, expr]}));
-assert.commandWorked(runSum({input: "$a", range: [expr, expr]}));
-assert.commandWorked(runSum({input: "$a", range: [expr, expr], unit: 'hour'}));
+assert.commandFailedWithCode(runSum({input: "$a", documents: [expr, expr]}), 5397904);
+assert.commandFailedWithCode(runSum({input: "$a", range: [expr, expr]}), 5397901);
+assert.commandFailedWithCode(runSum({input: "$a", range: [expr, expr], unit: 'hour'}), 5397902);
// But 'current' and 'unbounded' are not expressions: they're more like keywords.
assert.commandFailedWithCode(runSum({input: "$a", documents: [{$const: 'current'}, 3]}),
ErrorCodes.FailedToParse,
@@ -115,9 +121,12 @@ badBounds({range: [+1, 'current'], unit: 'day'});
// Any bound besides [unbounded, unbounded] requires a sort:
// - document-based
-assert.commandWorked(run({
- $setWindowFields: {output: {v: {$sum: {input: "$a", documents: ['unbounded', 'unbounded']}}}}
-}));
+assert.commandFailedWithCode(
+ run({
+ $setWindowFields:
+ {output: {v: {$sum: {input: "$a", documents: ['unbounded', 'unbounded']}}}}
+ }),
+ 5397903);
assert.commandFailedWithCode(
run({
$setWindowFields:
@@ -126,14 +135,19 @@ assert.commandFailedWithCode(
5339901,
'Document-based bounds require a sortBy');
// - range-based
-assert.commandWorked(run(
- {$setWindowFields: {output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded']}}}}}));
-assert.commandWorked(run({
- $setWindowFields: {
- sortBy: {a: 1, b: 1},
- output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded']}}}
- }
-}));
+assert.commandFailedWithCode(
+ run({
+ $setWindowFields: {output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded']}}}}
+ }),
+ 5397901);
+assert.commandFailedWithCode(
+ run({
+ $setWindowFields: {
+ sortBy: {a: 1, b: 1},
+ output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded']}}}
+ }
+ }),
+ 5397901);
assert.commandFailedWithCode(
run({$setWindowFields: {output: {v: {$sum: {input: "$a", range: ['unbounded', 'current']}}}}}),
5339902,
@@ -148,16 +162,20 @@ assert.commandFailedWithCode(
5339902,
'Range-based bounds require sortBy a single field');
// - time-based
-assert.commandWorked(run({
- $setWindowFields:
- {output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded'], unit: 'second'}}}}
-}));
-assert.commandWorked(run({
- $setWindowFields: {
- sortBy: {a: 1, b: 1},
- output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded'], unit: 'second'}}}
- }
-}));
+assert.commandFailedWithCode(
+ run({
+ $setWindowFields:
+ {output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded'], unit: 'second'}}}}
+ }),
+ 5397902);
+assert.commandFailedWithCode(
+ run({
+ $setWindowFields: {
+ sortBy: {a: 1, b: 1},
+ output: {v: {$sum: {input: "$a", range: ['unbounded', 'unbounded'], unit: 'second'}}}
+ }
+ }),
+ 5397902);
assert.commandFailedWithCode(
run({
$setWindowFields:
@@ -176,10 +194,14 @@ assert.commandFailedWithCode(
'Range-based bounds require sortBy a single field');
// Variety of accumulators:
-assert.commandWorked(run({$setWindowFields: {output: {v: {$sum: {input: "$a"}}}}}));
-assert.commandWorked(run({$setWindowFields: {output: {v: {$avg: {input: "$a"}}}}}));
-assert.commandWorked(run({$setWindowFields: {output: {v: {$max: {input: "$a"}}}}}));
-assert.commandWorked(run({$setWindowFields: {output: {v: {$min: {input: "$a"}}}}}));
+assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$sum: {input: "$a"}}}}}),
+ 5397903);
+assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$avg: {input: "$a"}}}}}),
+ 5397900);
+assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$max: {input: "$a"}}}}}),
+ 5397900);
+assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$min: {input: "$a"}}}}}),
+ 5397900);
// Not every accumulator is automatically a window function.
assert.commandFailedWithCode(run({$setWindowFields: {output: {v: {$mergeObjects: {input: "$a"}}}}}),
diff --git a/jstests/aggregation/sources/setWindowFields/sum.js b/jstests/aggregation/sources/setWindowFields/sum.js
new file mode 100644
index 00000000000..1bdc8fd904c
--- /dev/null
+++ b/jstests/aggregation/sources/setWindowFields/sum.js
@@ -0,0 +1,148 @@
+/*
+ * Test that $sum works as a window function.
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/utils.js"); // documentEq
+const featureEnabled =
+ assert.commandWorked(db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1}))
+ .featureFlagWindowFunctions.value;
+if (!featureEnabled) {
+ jsTestLog("Skipping test because the window function feature flag is disabled");
+ return;
+}
+
+const coll = db[jsTestName()];
+coll.drop();
+
+for (let i = 0; i < 10; i++) {
+ assert.commandWorked(coll.insert({one: i, two: i * 2, arr: [{first: i}, {second: 0}]}));
+}
+
+const origDocs = coll.find().sort({_id: 1});
+function verifyResults(results, valueFunction) {
+ for (let i = 0; i < results.length; i++) {
+ // Use Object.assign to make a copy instead of pass a reference.
+ const correctDoc = valueFunction(i, Object.assign({}, origDocs[i]));
+ assert(documentEq(correctDoc, results[i]),
+ "Got: " + tojson(results[i]) + "\nExpected: " + tojson(correctDoc) +
+ "\n at position " + i + "\n");
+ }
+}
+
+function firstSum(topNum) {
+ return (topNum * (topNum + 1)) / 2;
+}
+
+function secondSum(topNum) {
+ return topNum + (topNum * topNum);
+}
+
+const sortStage = {
+ $sort: {_id: 1}
+};
+
+// Test using $sum to count.
+let result = coll.aggregate([
+ sortStage,
+ {
+ $setWindowFields: {
+ sortBy: {one: 1},
+ output: {a: {$sum: {input: 1, documents: ["unbounded", "current"]}}}
+ }
+ }
+ ])
+ .toArray();
+verifyResults(result, function(num, baseObj) {
+ baseObj.a = num + 1;
+ return baseObj;
+});
+
+// Test that we can sum over one field.
+result =
+ coll.aggregate([
+ sortStage,
+ {
+ $setWindowFields: {
+ sortBy: {one: 1},
+ output: {a: {$sum: {input: "$one", documents: ["unbounded", "current"]}}}
+ }
+ }
+ ])
+ .toArray();
+verifyResults(result, function(num, baseObj) {
+ baseObj.a = firstSum(num);
+ return baseObj;
+});
+
+// Test that we can sum over two fields.
+result = coll.aggregate([
+ sortStage,
+ {
+ $setWindowFields: {
+ sortBy: {one: 1},
+ output: {
+ a: {$sum: {input: "$one", documents: ["unbounded", "current"]}},
+ b: {$sum: {input: "$two", documents: ["unbounded", "current"]}}
+ }
+ }
+ }
+ ])
+ .toArray();
+verifyResults(result, function(num, baseObj) {
+ baseObj.a = firstSum(num);
+ baseObj.b = secondSum(num);
+ return baseObj;
+});
+
+// Test that we can overwrite an existing field.
+result =
+ coll.aggregate([
+ sortStage,
+ {
+ $setWindowFields: {
+ sortBy: {one: 1},
+ output: {one: {$sum: {input: "$one", documents: ["unbounded", "current"]}}}
+ }
+ }
+ ])
+ .toArray();
+verifyResults(result, function(num, baseObj) {
+ baseObj.one = firstSum(num);
+ return baseObj;
+});
+// TODO: SERVER-54340 Enable these tests.
+// Test that we can set a sub-field in each document in an array.
+// result =
+// coll.aggregate([
+// sortStage,
+// {
+// $setWindowFields:
+// {sortBy: {one: 1}, output: {"arr.a": {$sum: {input: "$one", documents: ["unbounded",
+// "current"]}}}}
+// }
+// ])
+// .toArray();
+// verifyResults(result, function(num, baseObj) {
+// baseObj.arr[0] = {first: baseObj.arr[0].first, a: firstSum(num)}
+// baseObj.arr[1] = {second: 0, a: firstSum(num)};
+// return baseObj;
+// });
+
+// // Test that we can set a nested field.
+// result =
+// coll.aggregate([
+// sortStage,
+// {
+// $setWindowFields:
+// {sortBy: {one: 1}, output:
+// {"a.b": {$sum: {input: "$one", documents: ["unbounded", "current"]}}}}
+// }
+// ])
+// .toArray();
+// verifyResults(result, function(num, baseObj) {
+// baseObj.a = {b: firstSum(num)};
+// return baseObj;
+// });
+})();
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.cpp b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
index 497af28bb5a..b63a63803da 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/query/query_feature_flags_gen.h"
+#include "mongo/util/visit_helper.h"
using boost::intrusive_ptr;
using boost::optional;
@@ -75,6 +76,7 @@ list<intrusive_ptr<DocumentSource>> document_source_set_window_fields::createFro
else
return boost::none;
}();
+ uassert(5397906, "partitionBy field not yet supported", !partitionBy);
optional<SortPattern> sortBy;
if (auto sortSpec = spec.getSortBy()) {
@@ -259,9 +261,65 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSetWindowFields::crea
expCtx, partitionBy, sortBy, outputFields);
}
+void DocumentSourceInternalSetWindowFields::initialize() {
+ for (auto& wfs : _outputFields) {
+ uassert(5397900, "Window function must be $sum", wfs.expr->getOpName() == "$sum");
+ // TODO: SERVER-54340 Remove this check.
+ uassert(5397905,
+ "Window functions cannot set to dotted paths",
+ wfs.fieldName.find('.') == std::string::npos);
+ auto windowBounds = wfs.expr->bounds();
+ stdx::visit(
+ visit_helper::Overloaded{
+ [](const WindowBounds::DocumentBased& docBase) {
+ stdx::visit(
+ visit_helper::Overloaded{
+ [](const WindowBounds::Unbounded) { /* pass */ },
+ [](auto&& other) {
+ uasserted(5397904,
+ "Only 'unbounded' lower bound is currently supported");
+ }},
+ docBase.lower);
+ stdx::visit(
+ visit_helper::Overloaded{
+ [](const WindowBounds::Current) { /* pass */ },
+ [](auto&& other) {
+ uasserted(5397903,
+ "Only 'current' upper bound is currently supported");
+ }},
+ docBase.upper);
+ },
+ [](const WindowBounds::RangeBased& rangeBase) {
+ uasserted(5397901, "Ranged based windows not currently supported");
+ },
+ [](const WindowBounds::TimeBased& timeBase) {
+ uasserted(5397902, "Time based windows are not currently supported");
+ }},
+ windowBounds.bounds);
+ _executableOutputs.push_back(ExecutableWindowFunction(
+ wfs.fieldName, AccumulatorSum::create(pExpCtx.get()), windowBounds, wfs.expr->input()));
+ }
+ _init = true;
+}
+
DocumentSource::GetNextResult DocumentSourceInternalSetWindowFields::doGetNext() {
- // This is a placeholder: it returns every input doc unchanged.
- return pSource->getNext();
+ if (!_init) {
+ initialize();
+ }
+
+ auto curStat = pSource->getNext();
+ if (!curStat.isAdvanced()) {
+ return curStat;
+ }
+ auto curDoc = curStat.getDocument();
+ MutableDocument outDoc(curDoc);
+ for (auto& output : _executableOutputs) {
+ // Currently only support unbounded windows and run on the merging shard -- we don't need
+ // to reset accumulators, merge states, or partition into multiple groups.
+ output.accumulator->process(output.inputExpr->evaluate(curDoc, &pExpCtx->variables), false);
+ outDoc.setNestedField(output.fieldName, output.accumulator->getValue(false));
+ }
+ return outDoc.freeze();
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_set_window_fields.h b/src/mongo/db/pipeline/document_source_set_window_fields.h
index 5ee8422fecd..764c566d61d 100644
--- a/src/mongo/db/pipeline/document_source_set_window_fields.h
+++ b/src/mongo/db/pipeline/document_source_set_window_fields.h
@@ -29,8 +29,11 @@
#pragma once
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_set_window_fields_gen.h"
+#include "mongo/db/pipeline/window_bounds.h"
#include "mongo/db/pipeline/window_function_expression.h"
namespace mongo {
@@ -50,6 +53,21 @@ struct WindowFunctionStatement {
boost::optional<ExplainOptions::Verbosity> explain) const;
};
+struct ExecutableWindowFunction {
+ std::string fieldName;
+ boost::intrusive_ptr<AccumulatorState> accumulator;
+ WindowBounds bounds;
+ boost::intrusive_ptr<Expression> inputExpr;
+
+ ExecutableWindowFunction(std::string fieldName,
+ boost::intrusive_ptr<AccumulatorState> accumulator,
+ WindowBounds bounds,
+ boost::intrusive_ptr<Expression> input)
+ : fieldName(std::move(fieldName)),
+ accumulator(std::move(accumulator)),
+ bounds(std::move(bounds)),
+ inputExpr(std::move(input)) {}
+};
/**
* $setWindowFields is an alias: it desugars to some combination of projection, sorting,
@@ -116,10 +134,13 @@ public:
private:
DocumentSource::GetNextResult getNextInput();
+ void initialize();
boost::optional<boost::intrusive_ptr<Expression>> _partitionBy;
boost::optional<SortPattern> _sortBy;
std::vector<WindowFunctionStatement> _outputFields;
+ std::vector<ExecutableWindowFunction> _executableOutputs;
+ bool _init = false;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function_expression.cpp b/src/mongo/db/pipeline/window_function_expression.cpp
index 07282fbdd5c..8dc5609dee8 100644
--- a/src/mongo/db/pipeline/window_function_expression.cpp
+++ b/src/mongo/db/pipeline/window_function_expression.cpp
@@ -64,4 +64,9 @@ void Expression::registerParser(std::string functionName, Parser parser) {
parserMap.emplace(std::move(functionName), std::move(parser));
}
+MONGO_INITIALIZER(windowFunctionExpressionMap)(InitializerContext*) {
+ // Nothing to do. This initializer exists to tie together all the individual initializers
+ // defined by REGISTER_WINDOW_FUNCTION.
+}
+
} // namespace mongo::window_function
diff --git a/src/mongo/db/pipeline/window_function_expression.h b/src/mongo/db/pipeline/window_function_expression.h
index a79147bb9f7..46fada425e5 100644
--- a/src/mongo/db/pipeline/window_function_expression.h
+++ b/src/mongo/db/pipeline/window_function_expression.h
@@ -34,10 +34,11 @@
#include "mongo/db/pipeline/window_bounds.h"
#include "mongo/db/query/query_feature_flags_gen.h"
-#define REGISTER_WINDOW_FUNCTION(name, parser) \
- MONGO_INITIALIZER_GENERAL(addToWindowFunctionMap_##name, ("default"), ()) \
- (InitializerContext*) { \
- ::mongo::window_function::Expression::registerParser("$" #name, parser); \
+#define REGISTER_WINDOW_FUNCTION(name, parser) \
+ MONGO_INITIALIZER_GENERAL( \
+ addToWindowFunctionMap_##name, ("default"), ("windowFunctionExpressionMap")) \
+ (InitializerContext*) { \
+ ::mongo::window_function::Expression::registerParser("$" #name, parser); \
}
namespace mongo::window_function {
@@ -85,6 +86,12 @@ public:
virtual Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const = 0;
+ virtual std::string getOpName() const = 0;
+
+ virtual WindowBounds bounds() const = 0;
+
+ virtual boost::intrusive_ptr<::mongo::Expression> input() const = 0;
+
private:
static StringMap<Parser> parserMap;
};
@@ -113,25 +120,38 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
MutableDocument args;
- args["input"] = input->serialize(static_cast<bool>(explain));
- bounds.serialize(args);
+ args["input"] = _input->serialize(static_cast<bool>(explain));
+ _bounds.serialize(args);
return Value{Document{
- {accumulatorName, args.freezeToValue()},
+ {_accumulatorName, args.freezeToValue()},
}};
}
ExpressionFromAccumulator(std::string accumulatorName,
boost::intrusive_ptr<::mongo::Expression> input,
WindowBounds bounds)
- : accumulatorName(std::move(accumulatorName)),
- input(std::move(input)),
- bounds(std::move(bounds)) {}
+ : _accumulatorName(std::move(accumulatorName)),
+ _input(std::move(input)),
+ _bounds(std::move(bounds)) {}
+
+ std::string getOpName() const final {
+ return _accumulatorName;
+ }
+
+ boost::intrusive_ptr<::mongo::Expression> input() const final {
+ return _input;
+ }
+
+ WindowBounds bounds() const final {
+ return _bounds;
+ }
+
private:
- std::string accumulatorName;
- boost::intrusive_ptr<::mongo::Expression> input;
- WindowBounds bounds;
+ std::string _accumulatorName;
+ boost::intrusive_ptr<::mongo::Expression> _input;
+ WindowBounds _bounds;
};
} // namespace mongo::window_function