summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Ruoxin Xu <ruoxin.xu@mongodb.com> 2021-04-27 00:45:46 +0100
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-05-01 12:32:38 +0000
commit 6ca592cd08f5050951e244db170a1b986d1dd8a8 (patch)
tree 6d398101c4856e4ba13637e865e78ffcb77f47a6
parent e345b88ddca4ec85dc59768647be2e531e490e04 (diff)
download mongo-6ca592cd08f5050951e244db170a1b986d1dd8a8.tar.gz
SERVER-55588 Add translation logic for $integral executor
-rw-r--r-- jstests/aggregation/sources/setWindowFields/integral.js 204
-rw-r--r-- jstests/aggregation/sources/setWindowFields/memory_limit.js 2
-rw-r--r-- src/mongo/db/pipeline/SConscript 1
-rw-r--r-- src/mongo/db/pipeline/accumulator_for_window_functions.h 19
-rw-r--r-- src/mongo/db/pipeline/accumulator_integral.cpp 75
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec.cpp 38
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h 4
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp 35
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp 14
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h 20
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp 6
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_expression.h 218
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_integral.cpp 11
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_integral.h 7
-rw-r--r-- src/mongo/db/pipeline/window_function/window_function_integral_test.cpp 13
15 files changed, 559 insertions, 108 deletions
diff --git a/jstests/aggregation/sources/setWindowFields/integral.js b/jstests/aggregation/sources/setWindowFields/integral.js
new file mode 100644
index 00000000000..b0c71f3f9b1
--- /dev/null
+++ b/jstests/aggregation/sources/setWindowFields/integral.js
@@ -0,0 +1,204 @@
+/**
+ * Test the behavior of $integral.
+ */
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/window_function_helpers.js");
+
+const getParam = db.adminCommand({getParameter: 1, featureFlagWindowFunctions: 1});
+jsTestLog(getParam);
+const featureEnabled = assert.commandWorked(getParam).featureFlagWindowFunctions.value;
+if (!featureEnabled) {
+ jsTestLog("Skipping test because the window function feature flag is disabled");
+ return;
+}
+
+const coll = db.setWindowFields_integral;
+
+// Like most other window functions, the default window for $integral is [unbounded, unbounded].
+coll.drop();
+assert.commandWorked(coll.insert([
+ {x: 0, y: 0},
+ {x: 1, y: 42},
+ {x: 3, y: 67},
+ {x: 7, y: 99},
+ {x: 10, y: 20},
+]));
+let result = coll.runCommand({
+ aggregate: coll.getName(),
+ cursor: {},
+ pipeline: [
+ {
+ $setWindowFields: {
+ sortBy: {x: 1},
+ output: {
+ integral: {$integral: {input: "$y"}},
+ }
+ }
+ },
+ ]
+});
+assert.commandWorked(result);
+
+// $integral never compares values from separate partitions.
+coll.drop();
+assert.commandWorked(coll.insert([
+ {partitionID: 1, x: 0, y: 1},
+ {partitionID: 1, x: 1, y: 2},
+ {partitionID: 1, x: 2, y: 1},
+ {partitionID: 1, x: 3, y: 4},
+
+ {partitionID: 2, x: 0, y: 100},
+ {partitionID: 2, x: 2, y: 105},
+ {partitionID: 2, x: 4, y: 107},
+ {partitionID: 2, x: 6, y: -100},
+]));
+result = coll.aggregate([
+ {
+ $setWindowFields: {
+ partitionBy: "$partitionID",
+ sortBy: {x: 1},
+ output: {
+ integral: {$integral: {input: "$y"}, window: {documents: [-1, 0]}},
+ }
+ }
+ },
+ {$unset: "_id"},
+ ])
+ .toArray();
+assert.sameMembers(result, [
+ {partitionID: 1, x: 0, y: 1, integral: 0},
+ {partitionID: 1, x: 1, y: 2, integral: 1.5}, // (1 + 2) * (1 - 0) / 2 = 1.5
+ {partitionID: 1, x: 2, y: 1, integral: 1.5}, // (1 + 2) * (2 - 1) / 2 = 1.5
+ {partitionID: 1, x: 3, y: 4, integral: 2.5}, // (4 + 1) * (3 - 2) / 2 = 2.5
+
+ {partitionID: 2, x: 0, y: 100, integral: 0}, //
+ {partitionID: 2, x: 2, y: 105, integral: 205}, // (100 + 105) * 2 / 2 = 205
+ {partitionID: 2, x: 4, y: 107, integral: 212}, // (105 + 107) * 2 / 2 = 212
+ {partitionID: 2, x: 6, y: -100, integral: 7}, // (107 - 100) * 2 / 2 = 7
+]);
+
+// 'outputUnit' only supports 'week' and smaller.
+coll.drop();
+function explainUnit(outputUnit) {  // Explains a $setWindowFields/$integral pipeline that uses the given 'outputUnit'.
+ return coll.runCommand({  // Returns the raw command response; callers assert success or a failure code themselves.
+ explain: {
+ aggregate: coll.getName(),
+ cursor: {},
+ pipeline: [{
+ $setWindowFields: {
+ sortBy: {x: 1},
+ output: {
+ integral: {
+ $integral: {
+ input: "$y",
+ outputUnit: outputUnit,  // The time unit under test.
+ },
+ window: {documents: [-1, 1]}
+ },
+ }
+ }
+ }]
+ }
+ });
+}
+assert.commandFailedWithCode(explainUnit('year'), 5490704);
+assert.commandFailedWithCode(explainUnit('quarter'), 5490704);
+assert.commandFailedWithCode(explainUnit('month'), 5490704);
+assert.commandWorked(explainUnit('week'));
+assert.commandWorked(explainUnit('day'));
+assert.commandWorked(explainUnit('hour'));
+assert.commandWorked(explainUnit('minute'));
+assert.commandWorked(explainUnit('second'));
+assert.commandWorked(explainUnit('millisecond'));
+
+// Test that Date-typed 'sortBy' values are supported when 'outputUnit' is specified.
+coll.drop();
+assert.commandWorked(coll.insert([
+ {x: ISODate("2020-01-01T00:00:00.000Z"), y: 0},
+ {x: ISODate("2020-01-01T00:00:00.002Z"), y: 2},
+ {x: ISODate("2020-01-01T00:00:00.004Z"), y: 4},
+ {x: ISODate("2020-01-01T00:00:00.006Z"), y: 6},
+]));
+
+const pipelineWithOutputUnit = [
+ {
+ $setWindowFields: {
+ sortBy: {x: 1},
+ output: {
+ integral:
+ {$integral: {input: "$y", outputUnit: 'second'}, window: {documents: [-1, 1]}},
+ }
+ }
+ },
+ {$unset: "_id"},
+];
+result = coll.aggregate(pipelineWithOutputUnit).toArray();
+assert.sameMembers(result, [
+ // We should scale the result by 'millisecond/second'.
+ {x: ISODate("2020-01-01T00:00:00.000Z"), y: 0, integral: 0.002},
+ {x: ISODate("2020-01-01T00:00:00.002Z"), y: 2, integral: 0.008},
+ {x: ISODate("2020-01-01T00:00:00.004Z"), y: 4, integral: 0.016},
+ {x: ISODate("2020-01-01T00:00:00.006Z"), y: 6, integral: 0.010},
+]);
+
+const pipelineWithNoOutputUnit = [
+ {
+ $setWindowFields: {
+ sortBy: {x: 1},
+ output: {
+ integral: {$integral: {input: "$y"}, window: {documents: [-1, 1]}},
+ }
+ }
+ },
+ {$unset: "_id"},
+];
+// 'outputUnit' is only valid if the 'sortBy' values are ISODate objects.
+// Dates are only valid if 'outputUnit' is specified.
+coll.drop();
+assert.commandWorked(coll.insert([
+ {x: 0, y: 100},
+ {x: 1, y: 100},
+ {x: ISODate("2020-01-01T00:00:00.000Z"), y: 5},
+ {x: ISODate("2020-01-01T00:00:00.001Z"), y: 4},
+]));
+assert.commandFailedWithCode(db.runCommand({
+ aggregate: "setWindowFields_integral",
+ pipeline: pipelineWithOutputUnit,
+ cursor: {},
+}),
+ 5423901);
+
+assert.commandFailedWithCode(db.runCommand({
+ aggregate: "setWindowFields_integral",
+ pipeline: pipelineWithNoOutputUnit,
+ cursor: {},
+}),
+ 5423902);
+
+// Test various types of window bounds. This only checks that the command succeeds, not the actual results.
+coll.drop();
+assert.commandWorked(coll.insert([
+ {x: 0, y: 0},
+ {x: 1, y: 42},
+ {x: 3, y: 67},
+]));
+documentBounds.forEach(function(bounds) {
+ const res = assert.commandWorked(coll.runCommand({
+ aggregate: coll.getName(),
+ cursor: {},
+ pipeline: [
+ {
+ $setWindowFields: {
+ sortBy: {x: 1},
+ output: {
+ integral: {$integral: {input: "$y"}, window: {documents: bounds}},
+ }
+ }
+ },
+ ]
+ }));
+ assert.eq(res.cursor.firstBatch.length, 3);
+});
+})();
diff --git a/jstests/aggregation/sources/setWindowFields/memory_limit.js b/jstests/aggregation/sources/setWindowFields/memory_limit.js
index f960780d06b..5d721397894 100644
--- a/jstests/aggregation/sources/setWindowFields/memory_limit.js
+++ b/jstests/aggregation/sources/setWindowFields/memory_limit.js
@@ -40,7 +40,7 @@ assert.commandFailedWithCode(coll.runCommand({
// The same query passes with a higher memory limit.
setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()),
"internalDocumentSourceSetWindowFieldsMaxMemoryBytes",
- 3000);
+ 2900);
assert.commandWorked(coll.runCommand({
aggregate: coll.getName(),
pipeline: [{$setWindowFields: {sortBy: {partitionKey: 1}, output: {val: {$sum: "$_id"}}}}],
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 1333b056f75..1554ab17ab3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -121,6 +121,7 @@ env.Library(
'accumulator_covariance.cpp',
'accumulator_exp_moving_avg.cpp',
'accumulator_first.cpp',
+ 'accumulator_integral.cpp',
'accumulator_js_reduce.cpp',
'accumulator_last.cpp',
'accumulator_merge_objects.cpp',
diff --git a/src/mongo/db/pipeline/accumulator_for_window_functions.h b/src/mongo/db/pipeline/accumulator_for_window_functions.h
index e162b36f629..058d45f62f2 100644
--- a/src/mongo/db/pipeline/accumulator_for_window_functions.h
+++ b/src/mongo/db/pipeline/accumulator_for_window_functions.h
@@ -31,6 +31,7 @@
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/window_function/window_function_covariance.h"
+#include "mongo/db/pipeline/window_function/window_function_integral.h"
namespace mongo {
@@ -128,4 +129,22 @@ public:
static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx);
};
+class AccumulatorIntegral : public AccumulatorForWindowFunctions {  // Accumulator for $integral; delegates to a WindowFunctionIntegral.
+public:
+ explicit AccumulatorIntegral(ExpressionContext* const expCtx,
+ boost::optional<long long> outputUnitMillis = boost::none);  // 'outputUnitMillis' is forwarded to the wrapped window function.
+
+ void processInternal(const Value& input, bool merging) final;  // Adds 'input' to the wrapped window function; 'merging' must be false.
+ Value getValue(bool toBeMerged) final;  // Returns the wrapped window function's current value; 'toBeMerged' is ignored.
+ void reset() final;  // Resets the wrapped window function and the tracked memory usage.
+
+ const char* getOpName() const final;  // Returns "$integral".
+
+ static boost::intrusive_ptr<AccumulatorState> create(
+ ExpressionContext* const expCtx, boost::optional<long long> outputUnitMillis = boost::none);  // Factory: allocates a new AccumulatorIntegral.
+
+private:
+ WindowFunctionIntegral _integralWF;  // add/getValue/reset calls are forwarded to this object.
+};
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/accumulator_integral.cpp b/src/mongo/db/pipeline/accumulator_integral.cpp
new file mode 100644
index 00000000000..83ba68c40d1
--- /dev/null
+++ b/src/mongo/db/pipeline/accumulator_integral.cpp
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/accumulator_for_window_functions.h"
+
+#include "mongo/db/exec/document_value/value.h"
+#include "mongo/db/pipeline/accumulation_statement.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/window_function/window_function_expression.h"
+
+namespace mongo {
+
+REGISTER_WINDOW_FUNCTION(integral, mongo::window_function::ExpressionIntegral::parse);
+
+AccumulatorIntegral::AccumulatorIntegral(ExpressionContext* const expCtx,
+ boost::optional<long long> outputUnitMillis)
+ : AccumulatorForWindowFunctions(expCtx),
+ _integralWF(expCtx, outputUnitMillis, true /* isNonremovable */) {  // The wrapped window function is built in non-removable mode.
+ _memUsageBytes = sizeof(*this);  // Initial memory usage is just this object.
+}
+
+void AccumulatorIntegral::processInternal(const Value& input, bool merging) {  // Feeds one input value to the wrapped window function.
+ tassert(5558800, "$integral can't be merged", !merging);  // Merging partial states is not supported.
+
+ _integralWF.add(input);
+ _memUsageBytes = sizeof(*this) + _integralWF.getApproximateSize() - sizeof(_integralWF);  // sizeof(*this) already contains sizeof(_integralWF); presumably getApproximateSize() counts it too, hence the subtraction - confirm.
+}
+
+Value AccumulatorIntegral::getValue(bool toBeMerged) {  // 'toBeMerged' is not consulted.
+ return _integralWF.getValue();  // The result comes straight from the wrapped window function.
+}
+
+void AccumulatorIntegral::reset() {  // Returns the accumulator to its freshly constructed state.
+ _integralWF.reset();
+ _memUsageBytes = sizeof(*this);  // Same value the constructor starts with.
+}
+
+boost::intrusive_ptr<AccumulatorState> AccumulatorIntegral::create(
+ ExpressionContext* const expCtx, boost::optional<long long> outputUnitMillis) {  // Factory taking the same arguments as the constructor.
+ return new AccumulatorIntegral(expCtx, outputUnitMillis);  // The raw pointer is converted to the returned intrusive_ptr.
+}
+
+const char* AccumulatorIntegral::getOpName() const {  // Operator name of this accumulator.
+ return "$integral";
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.cpp b/src/mongo/db/pipeline/window_function/window_function_exec.cpp
index 0ce32211238..a3df28b50ea 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec.cpp
@@ -38,21 +38,52 @@ namespace mongo {
namespace {
+/**
+ * Translates the input Expression to suit the needs of certain window functions. For example, the
+ * $integral window function requires each input value to be a 2-element array holding the
+ * evaluated 'sortBy' value followed by the evaluated 'input' value, so for $integral we build an
+ * 'ExpressionArray' of [sortBy field path, input] to hand to the executors.
+ *
+ * Returns nullptr if 'expr' is null, and 'expr->input()' unchanged if no translation is needed.
+ */
+boost::intrusive_ptr<Expression> translateInputExpression(
+ boost::intrusive_ptr<window_function::Expression> expr,
+ const boost::optional<SortPattern>& sortBy) {
+ if (!expr)
+ return nullptr;
+ if (auto integral = dynamic_cast<window_function::ExpressionIntegral*>(expr.get())) {  // Only $integral needs translation here.
+ auto expCtx = integral->expCtx();
+ tassert(5558802,
+ "$integral requires a 1-field ascending sortBy",
+ sortBy && sortBy->size() == 1 && !sortBy->begin()->expression &&
+ sortBy->begin()->isAscending);  // Internal invariant: the sortBy must be a single ascending field path.
+ auto sortByExpr = ExpressionFieldPath::createPathFromString(
+ expCtx, sortBy->begin()->fieldPath->fullPath(), expCtx->variablesParseState);  // Field-path expression for the sortBy field.
+ return ExpressionArray::create(
+ expCtx, std::vector<boost::intrusive_ptr<Expression>>{sortByExpr, integral->input()});  // Order matters: sortBy first, then input.
+ }
+
+ return expr->input();  // Every other window function uses its input expression as is.
+}
+
std::unique_ptr<WindowFunctionExec> translateDocumentWindow(
PartitionIterator* iter,
boost::intrusive_ptr<window_function::Expression> expr,
+ const boost::optional<SortPattern>& sortBy,
const WindowBounds::DocumentBased& bounds) {
+ auto inputExpr = translateInputExpression(expr, sortBy);
+
return stdx::visit(
visit_helper::Overloaded{
[&](const WindowBounds::Unbounded&) -> std::unique_ptr<WindowFunctionExec> {
// A left unbounded window will always be non-removable regardless of the upper
// bound.
return std::make_unique<WindowFunctionExecNonRemovable<AccumulatorState>>(
- iter, expr->input(), expr->buildAccumulatorOnly(), bounds.upper);
+ iter, inputExpr, expr->buildAccumulatorOnly(), bounds.upper);
},
[&](const auto&) -> std::unique_ptr<WindowFunctionExec> {
return std::make_unique<WindowFunctionExecRemovableDocument>(
- iter, expr->input(), expr->buildRemovable(), bounds);
+ iter, inputExpr, expr->buildRemovable(), bounds);
}},
bounds.lower);
}
@@ -74,7 +105,6 @@ std::unique_ptr<WindowFunctionExec> translateDerivative(
iter, deriv.input(), sortExpr, deriv.bounds(), deriv.outputUnit());
}
-
} // namespace
std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create(
@@ -93,7 +123,7 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create(
return stdx::visit(
visit_helper::Overloaded{
[&](const WindowBounds::DocumentBased& docBounds) {
- return translateDocumentWindow(iter, functionStmt.expr, docBounds);
+ return translateDocumentWindow(iter, functionStmt.expr, sortBy, docBounds);
},
[&](const WindowBounds::RangeBased& rangeBounds)
-> std::unique_ptr<WindowFunctionExec> {
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h
index bb7418d66c8..5fda7ceedfd 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h
@@ -78,10 +78,10 @@ public:
return stdx::get<int>(_upperDocumentBound);
}();
- if (auto doc = (this->_iter)[upperIndex])
+ if (auto doc = (this->_iter)[upperIndex]) {
_function->process(
_input->evaluate(*doc, &_input->getExpressionContext()->variables), false);
- else {
+ } else {
// Upper bound is out of range, but may be because it's off of the end of the
// partition. For instance, for bounds [unbounded, -1] we won't be able to
// access the upper bound until the second call to getNext().
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
index d8042421f95..2263d96ff1c 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/accumulator_for_window_functions.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -53,14 +54,27 @@ public:
WindowFunctionExecNonRemovable<AccumulatorState> createForFieldPath(
std::deque<DocumentSource::GetNextResult> docs,
const std::string& inputPath,
- WindowBounds::Bound<int> upper) {
+ WindowBounds::Bound<int> upper,
+ boost::optional<std::string> sortByPath = boost::none) {
_docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
_iter = std::make_unique<PartitionIterator>(
getExpCtx().get(), _docSource.get(), boost::none, boost::none);
auto input = ExpressionFieldPath::parse(
getExpCtx().get(), inputPath, getExpCtx()->variablesParseState);
- return WindowFunctionExecNonRemovable<AccumulatorState>(
- _iter.get(), std::move(input), AccumulatorType::create(getExpCtx().get()), upper);
+ if (sortByPath) {
+ auto sortBy = ExpressionFieldPath::parse(
+ getExpCtx().get(), *sortByPath, getExpCtx()->variablesParseState);
+ return WindowFunctionExecNonRemovable<AccumulatorState>(
+ _iter.get(),
+ ExpressionArray::create(
+ getExpCtx().get(),
+ std::vector<boost::intrusive_ptr<Expression>>{sortBy, input}),
+ AccumulatorType::create(getExpCtx().get()),
+ upper);
+ } else {
+ return WindowFunctionExecNonRemovable<AccumulatorState>(
+ _iter.get(), std::move(input), AccumulatorType::create(getExpCtx().get()), upper);
+ }
}
auto advanceIterator() {
@@ -173,5 +187,20 @@ TEST_F(WindowFunctionExecNonRemovableTest, InputExpressionAllowedToCreateVariabl
ASSERT_VALUE_EQ(Value(std::vector<Value>{Value(2), Value(3)}), exec.getNext());
}
+TEST_F(WindowFunctionExecNonRemovableTest, CanReceiveSortByExpression) {  // Checks that the executor accepts a [sortBy, input] array expression as its input.
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"x", 1}, {"y", 0}}, Document{{"x", 3}, {"y", 2}}, Document{{"x", 5}, {"y", 4}}};
+ auto mgr = createForFieldPath<AccumulatorIntegral>(
+ docs, "$y" /* input */, 0, std::string("$x") /* sortBy */);  // The upper bound is 0, i.e. the current document.
+ double expectedIntegral = 0;  // Running total; each step below adds the area of one more segment.
+ ASSERT_VALUE_EQ(Value(expectedIntegral), mgr.getNext());  // First document: the expected integral is 0.
+ advanceIterator();
+ expectedIntegral += 2.0; // (2 + 0) * (3 - 1) / 2.0 = 2.0
+ ASSERT_VALUE_EQ(Value(expectedIntegral), mgr.getNext());
+ advanceIterator();
+ expectedIntegral += 6.0; // (4 + 2) * (5 - 3) / 2.0 = 6.0
+ ASSERT_VALUE_EQ(Value(expectedIntegral), mgr.getNext());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
index 38a61a000bd..74cd8fd584e 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
@@ -73,12 +73,7 @@ void WindowFunctionExecRemovableDocument::initialize() {
for (int i = lowerBoundForInit; !_upperBound || i <= _upperBound.get(); ++i) {
// If this is false, we're over the end of the partition.
if (auto doc = (this->_iter)[i]) {
- Value valToAdd = _sortBy
- ? Value(std::vector<Value>{
- _sortBy->evaluate(*doc, &_sortBy->getExpressionContext()->variables),
- _input->evaluate(*doc, &_input->getExpressionContext()->variables)})
- : _input->evaluate(*doc, &_input->getExpressionContext()->variables);
- addValue(valToAdd);
+ addValue(_input->evaluate(*doc, &_input->getExpressionContext()->variables));
} else {
break;
}
@@ -96,12 +91,7 @@ void WindowFunctionExecRemovableDocument::update() {
if (_upperBound) {
// If this is false, we're over the end of the partition.
if (auto doc = (this->_iter)[_upperBound.get()]) {
- Value valToAdd = _sortBy
- ? Value(std::vector<Value>{
- _sortBy->evaluate(*doc, &_sortBy->getExpressionContext()->variables),
- _input->evaluate(*doc, &_input->getExpressionContext()->variables)})
- : _input->evaluate(*doc, &_input->getExpressionContext()->variables);
- addValue(valToAdd);
+ addValue(_input->evaluate(*doc, &_input->getExpressionContext()->variables));
}
}
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
index b8564eda950..2519f8f22a8 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
@@ -54,24 +54,6 @@ public:
std::unique_ptr<WindowFunctionState> function,
WindowBounds::DocumentBased bounds);
- /**
- * Constructs a removable window function executor with the given input expression and sortBy
- * expression to be evaluated and passed the evaluation of both "input" and "sortBy" as a single
- * input Value of a 2-sized vector (Value{sortByValue, inputValue}) to the corresponding
- * WindowFunc for each document in the window.
- *
- * The "bounds" parameter is the user supplied bounds for the window.
- */
- WindowFunctionExecRemovableDocument(PartitionIterator* iter,
- boost::intrusive_ptr<Expression> input,
- boost::intrusive_ptr<Expression> sortBy,
- std::unique_ptr<WindowFunctionState> function,
- WindowBounds::DocumentBased bounds)
- : WindowFunctionExecRemovableDocument(iter, std::move(input), std::move(function), bounds) {
- _sortBy = std::move(sortBy);
- _memUsageBytes = sizeof(*this);
- }
-
void reset() final {
_function->reset();
_values = std::queue<Value>();
@@ -96,8 +78,6 @@ private:
// accumulating/removing values.
bool _initialized = false;
- boost::intrusive_ptr<Expression> _sortBy = nullptr;
-
int _lowerBound;
// Will stay boost::none if right unbounded.
boost::optional<int> _upperBound = boost::none;
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
index 4e91a9b21f3..3de88f920f1 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
@@ -80,7 +80,11 @@ public:
std::unique_ptr<WindowFunctionState> integralFunc =
std::make_unique<WindowFunctionIntegral>(getExpCtx().get());
return WindowFunctionExecRemovableDocument(
- _iter.get(), std::move(input), std::move(sortBy), std::move(integralFunc), bounds);
+ _iter.get(),
+ ExpressionArray::create(getExpCtx().get(),
+ std::vector<boost::intrusive_ptr<Expression>>{sortBy, input}),
+ std::move(integralFunc),
+ bounds);
}
auto advanceIterator() {
diff --git a/src/mongo/db/pipeline/window_function/window_function_expression.h b/src/mongo/db/pipeline/window_function/window_function_expression.h
index bb1ef7d7704..a34bc68c0d5 100644
--- a/src/mongo/db/pipeline/window_function/window_function_expression.h
+++ b/src/mongo/db/pipeline/window_function/window_function_expression.h
@@ -31,6 +31,7 @@
#include "mongo/base/initializer.h"
#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/accumulator_for_window_functions.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_set_window_fields_gen.h"
#include "mongo/db/pipeline/window_function/window_bounds.h"
@@ -397,17 +398,101 @@ protected:
boost::optional<Decimal128> _alpha;
};
-class ExpressionDerivative : public Expression {
+class ExpressionWithOutputUnit : public Expression {
public:
static constexpr StringData kArgInput = "input"_sd;
static constexpr StringData kArgOutputUnit = "outputUnit"_sd;
+ /**
+ * Builds a window function expression named 'accumulatorName' over 'input' and 'bounds' that
+ * also carries an optional 'outputUnit'.
+ */
+ ExpressionWithOutputUnit(ExpressionContext* expCtx,
+ std::string accumulatorName,
+ boost::intrusive_ptr<::mongo::Expression> input,
+ WindowBounds bounds,
+ boost::optional<TimeUnit> outputUnit)
+ // 'accumulatorName' is a by-value sink parameter, so move it instead of copying it again.
+ : Expression(expCtx, std::move(accumulatorName), std::move(input), std::move(bounds)),
+ _outputUnit(outputUnit) {}
+
+ boost::optional<TimeUnit> outputUnit() const {  // Returns the time unit passed to the constructor, or boost::none if there was none.
+ return _outputUnit;
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {  // Serializes the accumulator arguments plus the window bounds.
+ MutableDocument result;
+ result[_accumulatorName][kArgInput] = _input->serialize(static_cast<bool>(explain));  // {<accumulatorName>: {input: ...}}
+ if (_outputUnit) {
+ result[_accumulatorName][kArgOutputUnit] = Value(serializeTimeUnit(*_outputUnit));  // 'outputUnit' is emitted only when one was specified.
+ }
+
+ MutableDocument windowField;
+ _bounds.serialize(windowField);
+ result[kWindowArg] = windowField.freezeToValue();  // The bounds go under 'kWindowArg', next to the accumulator field.
+ return result.freezeToValue();
+ }
+
+protected:
+ /**
+ * Parses 'arg' as the 'outputUnit' argument and returns the time unit it names.
+ *
+ * Throws FailedToParse if 'arg' is not a string, and error 5490704 if the unit is larger than
+ * 'week' (year, quarter or month).
+ */
+ static boost::optional<TimeUnit> parseOutputUnit(const BSONElement& arg) {
+ // Quote the argument name on both sides so the message reads "'outputUnit' must be ...".
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "'" << kArgOutputUnit << "' must be a string, but got " << arg.type(),
+ arg.type() == String);
+ boost::optional<TimeUnit> outputUnit = parseTimeUnit(arg.valueStringData());
+ switch (*outputUnit) {
+ // These larger time units vary so much, it doesn't make sense to define a
+ // fixed conversion from milliseconds. (See 'timeUnitTypicalMilliseconds'.)
+ case TimeUnit::year:
+ case TimeUnit::quarter:
+ case TimeUnit::month:
+ uasserted(5490704, "outputUnit must be 'week' or smaller");
+ // Only these time units are allowed.
+ case TimeUnit::week:
+ case TimeUnit::day:
+ case TimeUnit::hour:
+ case TimeUnit::minute:
+ case TimeUnit::second:
+ case TimeUnit::millisecond:
+ break;
+ }
+ return outputUnit;
+ }
+
+ static void validateSortBy(const boost::optional<SortPattern>& sortBy,
+ const std::string& accumulatorName) {  // 'accumulatorName' is only used as the prefix of the error messages.
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << accumulatorName << " requires a sortBy",
+ sortBy);  // A sortBy must be present at all.
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << accumulatorName << " requires a non-compound sortBy",
+ sortBy->size() == 1);  // Exactly one sort key.
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << accumulatorName << " requires a non-expression sortBy",
+ !sortBy->begin()->expression);  // The single key must not be an expression.
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << accumulatorName << " requires an ascending sortBy",
+ sortBy->begin()->isAscending);  // The single key must sort ascending.
+ }
+
+ boost::optional<long long> convertTimeUnitToMillis(boost::optional<TimeUnit> outputUnit) const {  // Maps an optional time unit to a millisecond count.
+ if (!outputUnit)
+ return boost::none;  // No unit given, so there is nothing to convert.
+
+ auto status = timeUnitTypicalMilliseconds(*outputUnit);
+ tassert(status);  // NOTE(review): presumably asserts that the lookup succeeded - confirm which tassert overload this resolves to.
+
+ return status.getValue();
+ }
+
+ boost::optional<TimeUnit> _outputUnit;
+};
+
+class ExpressionDerivative : public ExpressionWithOutputUnit {
+public:
ExpressionDerivative(ExpressionContext* expCtx,
boost::intrusive_ptr<::mongo::Expression> input,
WindowBounds bounds,
boost::optional<TimeUnit> outputUnit)
- : Expression(expCtx, "$derivative", std::move(input), std::move(bounds)),
- _outputUnit(outputUnit) {}
+ : ExpressionWithOutputUnit(
+ expCtx, "$derivative", std::move(input), std::move(bounds), outputUnit) {}
static boost::intrusive_ptr<Expression> parse(BSONObj obj,
const boost::optional<SortPattern>& sortBy,
@@ -419,17 +504,7 @@ public:
// }
// window: {...} // optional
// }
-
- uassert(ErrorCodes::FailedToParse, "$derivative requires a sortBy", sortBy);
- uassert(ErrorCodes::FailedToParse,
- "$derivative requires a non-compound sortBy",
- sortBy->size() == 1);
- uassert(ErrorCodes::FailedToParse,
- "$derivative requires a non-expression sortBy",
- !sortBy->begin()->expression);
- uassert(ErrorCodes::FailedToParse,
- "$derivative requires an ascending sortBy",
- sortBy->begin()->isAscending);
+ validateSortBy(sortBy, "$derivative");
boost::optional<WindowBounds> bounds;
BSONElement derivativeArgs;
@@ -462,27 +537,7 @@ public:
if (argName == kArgInput) {
input = ::mongo::Expression::parseOperand(expCtx, arg, expCtx->variablesParseState);
} else if (argName == kArgOutputUnit) {
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "$derivative '" << kArgOutputUnit
- << "' must be a string, but got " << arg.type(),
- arg.type() == String);
- outputUnit = parseTimeUnit(arg.valueStringData());
- switch (*outputUnit) {
- // These larger time units vary so much, it doesn't make sense to define a
- // fixed conversion from milliseconds. (See 'timeUnitTypicalMilliseconds'.)
- case TimeUnit::year:
- case TimeUnit::quarter:
- case TimeUnit::month:
- uasserted(5490704, "$derivative outputUnit must be 'week' or smaller");
- // Only these time units are allowed.
- case TimeUnit::week:
- case TimeUnit::day:
- case TimeUnit::hour:
- case TimeUnit::minute:
- case TimeUnit::second:
- case TimeUnit::millisecond:
- break;
- }
+ outputUnit = parseOutputUnit(arg);
} else {
uasserted(ErrorCodes::FailedToParse,
str::stream() << "$derivative got unexpected argument: " << argName);
@@ -500,19 +555,6 @@ public:
expCtx, std::move(input), std::move(*bounds), outputUnit);
}
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- MutableDocument result;
- result[_accumulatorName][kArgInput] = _input->serialize(static_cast<bool>(explain));
- if (_outputUnit) {
- result[_accumulatorName][kArgOutputUnit] = Value(serializeTimeUnit(*_outputUnit));
- }
-
- MutableDocument windowField;
- _bounds.serialize(windowField);
- result[kWindowArg] = windowField.freezeToValue();
- return result.freezeToValue();
- }
-
boost::intrusive_ptr<AccumulatorState> buildAccumulatorOnly() const final {
MONGO_UNREACHABLE_TASSERT(5490701);
}
@@ -520,13 +562,85 @@ public:
std::unique_ptr<WindowFunctionState> buildRemovable() const final {
MONGO_UNREACHABLE_TASSERT(5490702);
}
+};
- auto outputUnit() const {
- return _outputUnit;
+class ExpressionIntegral : public ExpressionWithOutputUnit {
+public:
+ ExpressionIntegral(ExpressionContext* expCtx,
+ boost::intrusive_ptr<::mongo::Expression> input,
+ WindowBounds bounds,
+ boost::optional<TimeUnit> outputUnit)
+ : ExpressionWithOutputUnit(
+ expCtx, "$integral", std::move(input), std::move(bounds), outputUnit) {}
+ // Builds an ExpressionIntegral from a window-function spec; uasserts on malformed input.
+ static boost::intrusive_ptr<Expression> parse(BSONObj obj,
+ const boost::optional<SortPattern>& sortBy,
+ ExpressionContext* expCtx) {
+ // Expected shape of 'obj' (any other key is rejected with FailedToParse):
+ // {
+ // $integral: {
+ // input: <expr>, // required
+ // outputUnit: <string>, // optional
+ // }
+ // window: {...} // optional; default-constructed WindowBounds if absent
+ // }
+ validateSortBy(sortBy, "$integral");
+
+ boost::optional<WindowBounds> bounds = boost::none;
+ BSONElement integralArgs;
+ for (const auto& arg : obj) {
+ auto argName = arg.fieldNameStringData();
+ if (argName == kWindowArg) {
+ uassert(ErrorCodes::FailedToParse,
+ "'window' field must be an object",
+ arg.type() == BSONType::Object);  // check the element being parsed, not a by-name lookup
+ uassert(ErrorCodes::FailedToParse,
+ "There can be only one 'window' field for $integral",
+ bounds == boost::none);
+ bounds = WindowBounds::parse(arg.embeddedObject(), sortBy, expCtx);
+ } else if (argName == "$integral"_sd) {
+ integralArgs = arg;
+ } else {
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "$integral got unexpected argument: " << argName);
+ }
+ }
+ tassert(
+ 5558801, "$integral parser called on object with no $integral key", integralArgs.ok());
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "$integral expects an object, but got a " << integralArgs.type()
+ << ": " << integralArgs,
+ integralArgs.type() == BSONType::Object);
+
+ boost::intrusive_ptr<::mongo::Expression> input;
+ boost::optional<TimeUnit> outputUnit = boost::none;
+ for (const auto& arg : integralArgs.Obj()) {
+ auto argName = arg.fieldNameStringData();
+ if (argName == kArgInput) {
+ input = ::mongo::Expression::parseOperand(expCtx, arg, expCtx->variablesParseState);
+ } else if (argName == kArgOutputUnit) {
+ uassert(ErrorCodes::FailedToParse,
+ "There can be only one 'outputUnit' field for $integral",
+ outputUnit == boost::none);
+ outputUnit = parseOutputUnit(arg);
+ } else {
+ uasserted(ErrorCodes::FailedToParse,
+ str::stream() << "$integral got unexpected argument: " << argName);
+ }
+ }
+ uassert(ErrorCodes::FailedToParse, "$integral requires an 'input' expression", input);
+
+ return make_intrusive<ExpressionIntegral>(
+ expCtx, std::move(input), bounds ? *bounds : WindowBounds(), outputUnit);
}
-private:
- boost::optional<TimeUnit> _outputUnit;
+ boost::intrusive_ptr<AccumulatorState> buildAccumulatorOnly() const final {
+ return AccumulatorIntegral::create(_expCtx, convertTimeUnitToMillis(_outputUnit));
+ }
+
+ std::unique_ptr<WindowFunctionState> buildRemovable() const final {
+ return WindowFunctionIntegral::create(_expCtx, convertTimeUnitToMillis(_outputUnit));
+ }
};
} // namespace mongo::window_function
diff --git a/src/mongo/db/pipeline/window_function/window_function_integral.cpp b/src/mongo/db/pipeline/window_function/window_function_integral.cpp
index 24eea559f9c..484cb9341e6 100644
--- a/src/mongo/db/pipeline/window_function/window_function_integral.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_integral.cpp
@@ -64,11 +64,11 @@ void WindowFunctionIntegral::assertValueType(const Value& value) {
auto arr = value.getArray();
if (_outputUnitMillis) {
- tassert(5423901,
+ uassert(5423901,
"$integral with 'outputUnit' expects the sortBy field to be a Date",
arr[0].getType() == BSONType::Date);
} else {
- tassert(5423902,
+ uassert(5423902,
"$integral (with no 'outputUnit') expects the sortBy field to be numeric",
arr[0].numeric());
}
@@ -86,6 +86,13 @@ void WindowFunctionIntegral::add(Value value) {
_integral.add(integralOfTwoPointsByTrapezoidalRule(_values.back(), value));
}
+ // WindowFunctionIntegral can also back a non-removable accumulator, which never removes
+ // values and so does not need to track the whole window. In that mode, drop the front of the
+ // queue before each new value is appended, so only the latest value is kept to save memory.
+ if (!_values.empty() && isNonremovable) {
+ _memUsageBytes -= _values.front().getApproximateSize();
+ _values.pop_front();
+ }
_memUsageBytes += value.getApproximateSize();
_values.emplace_back(std::move(value));
}
diff --git a/src/mongo/db/pipeline/window_function/window_function_integral.h b/src/mongo/db/pipeline/window_function/window_function_integral.h
index 6516ba6ca16..10b970a4c5f 100644
--- a/src/mongo/db/pipeline/window_function/window_function_integral.h
+++ b/src/mongo/db/pipeline/window_function/window_function_integral.h
@@ -45,7 +45,8 @@ public:
}
explicit WindowFunctionIntegral(ExpressionContext* const expCtx,
- boost::optional<long long> outputUnitMillis = boost::none)
- : WindowFunctionState(expCtx), _integral(expCtx), _outputUnitMillis(outputUnitMillis) {
+ boost::optional<long long> outputUnitMillis = boost::none,
+ bool isNonremovable = false)  // shadows the member of the same name, so it must be stored below
+ : WindowFunctionState(expCtx), _integral(expCtx), _outputUnitMillis(outputUnitMillis), isNonremovable(isNonremovable) {
_memUsageBytes = sizeof(*this);
}
@@ -61,6 +62,9 @@ public:
_values.clear();
_nanCount = 0;
_integral.reset();
+ // AccumulatorIntegral's reset() relies on WindowFunctionIntegral's reset() setting
+ // '_memUsageBytes' to sizeof(*this). If you change this to reset '_memUsageBytes' to any
+ // other value, update AccumulatorIntegral's reset() as well.
_memUsageBytes = sizeof(*this);
}
@@ -91,6 +95,7 @@ private:
std::deque<Value> _values;
boost::optional<long long> _outputUnitMillis;
int _nanCount = 0;
+ bool isNonremovable = false;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_integral_test.cpp b/src/mongo/db/pipeline/window_function/window_function_integral_test.cpp
index f670a438e6a..4f57b1564f1 100644
--- a/src/mongo/db/pipeline/window_function/window_function_integral_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_integral_test.cpp
@@ -32,7 +32,6 @@
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/window_function/window_function_integral.h"
-#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/time_support.h"
@@ -228,9 +227,7 @@ TEST_F(WindowFunctionIntegralTest, CanHandleDateTypeWithOutputUnit) {
ASSERT_VALUE_EQ(integral.getValue(), Value(expectedIntegral));
}
-DEATH_TEST_F(WindowFunctionIntegralTest,
- DatesWithoutOutputUnitShouldFail,
- "expects the sortBy field to be numeric") {
+TEST_F(WindowFunctionIntegralTest, DatesWithoutOutputUnitShouldFail) {
const std::vector<Value> values = {
Value(std::vector<Value>({Value(Date_t::fromMillisSinceEpoch(1000)), Value(2)})),
Value(std::vector<Value>({Value(Date_t::fromMillisSinceEpoch(1002)), Value(4)})),
@@ -240,9 +237,7 @@ DEATH_TEST_F(WindowFunctionIntegralTest,
ASSERT_THROWS_CODE(addValuesToWindow(values), AssertionException, 5423902);
}
-DEATH_TEST_F(WindowFunctionIntegralTest,
- NumbersWithOutputUnitShouldFail,
- "expects the sortBy field to be a Date") {
+TEST_F(WindowFunctionIntegralTest, NumbersWithOutputUnitShouldFail) {
const std::vector<Value> values = {
Value(std::vector<Value>({Value(3), Value(2)})),
Value(std::vector<Value>({Value(5), Value(4)})),
@@ -256,9 +251,7 @@ DEATH_TEST_F(WindowFunctionIntegralTest,
ASSERT_THROWS_CODE(addValuesToWindow(values), AssertionException, 5423901);
}
-DEATH_TEST_F(WindowFunctionIntegralTest,
- ResetShouldNotResetOutputUnit,
- "expects the sortBy field to be a Date") {
+TEST_F(WindowFunctionIntegralTest, ResetShouldNotResetOutputUnit) {
const std::vector<Value> dateValues = {
Value(std::vector<Value>({Value(Date_t::fromMillisSinceEpoch(1000)), Value(0)})),
Value(std::vector<Value>({Value(Date_t::fromMillisSinceEpoch(1002)), Value(2)})),