summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2021-03-08 09:17:11 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-08 15:32:46 +0000
commit487638e326f62ae811d550270312c03b189cce20 (patch)
tree9d870ea2d00ca430ba18e7fdbbab097f86dcb19e /src/mongo/db/pipeline
parentf842de553e3c71a410e5f49fdaec3eaaaebb04a9 (diff)
downloadmongo-487638e326f62ae811d550270312c03b189cce20.tar.gz
SERVER-55031 Move window function classes to their own files
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/accumulator_min_max.cpp2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function.h238
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_add_to_set.h81
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp1
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp283
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_min_max.h85
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_push.h92
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_push_test.cpp2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_stddev.h123
13 files changed, 671 insertions, 243 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index f276e6a84b8..2ccf71168b8 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -431,6 +431,7 @@ env.CppUnitTest(
'window_function/window_function_add_to_set_test.cpp',
'window_function/window_function_exec_derivative_test.cpp',
'window_function/window_function_exec_non_removable_test.cpp',
+ 'window_function/window_function_exec_removable_test.cpp',
'window_function/window_function_min_max_test.cpp',
'window_function/window_function_push_test.cpp',
'window_function/window_function_std_dev_test.cpp',
diff --git a/src/mongo/db/pipeline/accumulator_min_max.cpp b/src/mongo/db/pipeline/accumulator_min_max.cpp
index ab91b64628e..d0d3758487d 100644
--- a/src/mongo/db/pipeline/accumulator_min_max.cpp
+++ b/src/mongo/db/pipeline/accumulator_min_max.cpp
@@ -34,8 +34,8 @@
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/accumulation_statement.h"
#include "mongo/db/pipeline/expression.h"
-#include "mongo/db/pipeline/window_function/window_function.h"
#include "mongo/db/pipeline/window_function/window_function_expression.h"
+#include "mongo/db/pipeline/window_function/window_function_min_max.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/window_function/window_function.h b/src/mongo/db/pipeline/window_function/window_function.h
index c2d0979e5b2..9acf4aa9b70 100644
--- a/src/mongo/db/pipeline/window_function/window_function.h
+++ b/src/mongo/db/pipeline/window_function/window_function.h
@@ -56,242 +56,4 @@ protected:
ExpressionContext* _expCtx;
};
-
-template <AccumulatorMinMax::Sense sense>
-class WindowFunctionMinMax : public WindowFunctionState {
-public:
- static inline const Value kDefault = Value{BSONNULL};
-
- static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) {
- return std::make_unique<WindowFunctionMinMax<sense>>(expCtx);
- }
-
- explicit WindowFunctionMinMax(ExpressionContext* const expCtx)
- : WindowFunctionState(expCtx),
- _values(_expCtx->getValueComparator().makeOrderedValueMultiset()) {}
-
- void add(Value value) final {
- _values.insert(std::move(value));
- }
-
- void remove(Value value) final {
- // std::multiset::insert is guaranteed to put the element after any equal elements
- // already in the container. So find() / erase() will remove the oldest equal element,
- // which is what we want, to satisfy "remove() undoes add() when called in FIFO order".
- auto iter = _values.find(std::move(value));
- tassert(5371400, "Can't remove from an empty WindowFunctionMinMax", iter != _values.end());
- _values.erase(iter);
- }
-
- void reset() final {
- _values.clear();
- }
-
- Value getValue() const final {
- if (_values.empty())
- return kDefault;
- switch (sense) {
- case AccumulatorMinMax::Sense::kMin:
- return *_values.begin();
- case AccumulatorMinMax::Sense::kMax:
- return *_values.rbegin();
- }
- MONGO_UNREACHABLE_TASSERT(5371401);
- }
-
-protected:
- // Holds all the values in the window, in order, with constant-time access to both ends.
- ValueMultiset _values;
-};
-using WindowFunctionMin = WindowFunctionMinMax<AccumulatorMinMax::Sense::kMin>;
-using WindowFunctionMax = WindowFunctionMinMax<AccumulatorMinMax::Sense::kMax>;
-
-class WindowFunctionAddToSet final : public WindowFunctionState {
-public:
- static inline const Value kDefault = Value{std::vector<Value>()};
-
- static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) {
- return std::make_unique<WindowFunctionAddToSet>(expCtx);
- }
-
- explicit WindowFunctionAddToSet(ExpressionContext* const expCtx)
- : WindowFunctionState(expCtx),
- _values(_expCtx->getValueComparator().makeOrderedValueMultiset()) {}
-
- void add(Value value) override {
- _values.insert(std::move(value));
- }
-
- /**
- * This should only remove the first/lowest element in the window.
- */
- void remove(Value value) override {
- auto iter = _values.find(std::move(value));
- tassert(
- 5423800, "Can't remove from an empty WindowFunctionAddToSet", iter != _values.end());
- _values.erase(iter);
- }
-
- void reset() override {
- _values.clear();
- }
-
- Value getValue() const override {
- std::vector<Value> output;
- if (_values.empty())
- return kDefault;
- for (auto it = _values.begin(); it != _values.end(); it = _values.upper_bound(*it)) {
- output.push_back(*it);
- }
-
- return Value(output);
- }
-
-private:
- ValueMultiset _values;
-};
-
-class WindowFunctionPush final : public WindowFunctionState {
-public:
- using ValueListConstIterator = std::list<Value>::const_iterator;
-
- static inline const Value kDefault = Value{std::vector<Value>()};
-
- static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) {
- return std::make_unique<WindowFunctionPush>(expCtx);
- }
-
- explicit WindowFunctionPush(ExpressionContext* const expCtx)
- : WindowFunctionState(expCtx),
- _values(
- _expCtx->getValueComparator().makeOrderedValueMultimap<ValueListConstIterator>()) {}
-
- void add(Value value) override {
- _list.emplace_back(std::move(value));
- auto iter = std::prev(_list.end());
- _values.insert({*iter, iter});
- }
-
- /**
- * This should only remove the first/lowest element in the window.
- */
- void remove(Value value) override {
- // The order of the key-value pairs whose keys compare equivalent is the order of insertion
- // and does not change in std::multimap. So find() / erase() will remove the oldest equal
- // element, which is what we want, to satisfy "remove() undoes add() when called in FIFO
- // order".
- auto iter = _values.find(std::move(value));
- tassert(5423801, "Can't remove from an empty WindowFunctionPush", iter != _values.end());
- // Erase the element from both '_values' and '_list'.
- _list.erase(iter->second);
- _values.erase(iter);
- }
-
- void reset() override {
- _values.clear();
- _list.clear();
- }
-
- Value getValue() const override {
- std::vector<Value> output;
- if (_values.empty())
- return kDefault;
-
- return Value{std::vector<Value>(_list.begin(), _list.end())};
- }
-
-private:
- ValueMultimap<ValueListConstIterator> _values;
- // std::list makes sure that the order of the elements in the returned array is the order of
- // insertion.
- std::list<Value> _list;
-};
-
-class WindowFunctionStdDev : public WindowFunctionState {
-protected:
- explicit WindowFunctionStdDev(ExpressionContext* const expCtx, bool isSamp)
- : WindowFunctionState(expCtx),
- _sum(AccumulatorSum::create(expCtx)),
- _m2(AccumulatorSum::create(expCtx)),
- _isSamp(isSamp),
- _count(0),
- _nonfiniteValueCount(0) {}
-
-public:
- static Value getDefault() {
- return Value(BSONNULL);
- }
-
- void add(Value value) {
- update(std::move(value), +1);
- }
-
- void remove(Value value) {
- update(std::move(value), -1);
- }
-
- Value getValue() const final {
- if (_nonfiniteValueCount > 0)
- return Value(std::numeric_limits<double>::quiet_NaN());
- const long long adjustedCount = _isSamp ? _count - 1 : _count;
- if (adjustedCount == 0)
- return getDefault();
- return Value(sqrt(_m2->getValue(false).coerceToDouble() / adjustedCount));
- }
-
- void reset() {
- _m2->reset();
- _sum->reset();
- _count = 0;
- _nonfiniteValueCount = 0;
- }
-
-private:
- void update(Value value, int quantity) {
- // quantity should be 1 if adding value, -1 if removing value
- if (!value.numeric())
- return;
- if ((value.getType() == NumberDouble && !std::isfinite(value.getDouble())) ||
- (value.getType() == NumberDecimal && !value.getDecimal().isFinite())) {
- _nonfiniteValueCount += quantity;
- _count += quantity;
- return;
- }
-
- if (_count == 0) { // Assuming we are adding value if _count == 0.
- _count++;
- _sum->process(value, false);
- return;
- } else if (_count + quantity == 0) { // Empty the window.
- reset();
- return;
- }
- double x = _count * value.coerceToDouble() - _sum->getValue(false).coerceToDouble();
- _count += quantity;
- _sum->process(Value{value.coerceToDouble() * quantity}, false);
- _m2->process(Value{x * x * quantity / (_count * (_count - quantity))}, false);
- }
-
- // Std dev cannot make use of RemovableSum because of its specific handling of non-finite
- // values. Adding a NaN or +/-inf makes the result NaN. Additionally, the consistent and
- // exclusive use of doubles in std dev calculations makes the type handling in RemovableSum
- // unnecessary.
- boost::intrusive_ptr<AccumulatorState> _sum;
- boost::intrusive_ptr<AccumulatorState> _m2;
- bool _isSamp;
- long long _count;
- int _nonfiniteValueCount;
-};
-
-class WindowFunctionStdDevPop final : public WindowFunctionStdDev {
-public:
- explicit WindowFunctionStdDevPop(ExpressionContext* const expCtx)
- : WindowFunctionStdDev(expCtx, false) {}
-};
-
-class WindowFunctionStdDevSamp final : public WindowFunctionStdDev {
-public:
- explicit WindowFunctionStdDevSamp(ExpressionContext* const expCtx)
- : WindowFunctionStdDev(expCtx, true) {}
-};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_add_to_set.h b/src/mongo/db/pipeline/window_function/window_function_add_to_set.h
new file mode 100644
index 00000000000..ee1059aefd9
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_add_to_set.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/window_function/window_function.h"
+
+namespace mongo {
+
+class WindowFunctionAddToSet final : public WindowFunctionState {
+public:
+ static inline const Value kDefault = Value{std::vector<Value>()};
+
+ static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) {
+ return std::make_unique<WindowFunctionAddToSet>(expCtx);
+ }
+
+ explicit WindowFunctionAddToSet(ExpressionContext* const expCtx)
+ : WindowFunctionState(expCtx),
+ _values(_expCtx->getValueComparator().makeOrderedValueMultiset()) {}
+
+ void add(Value value) override {
+ _values.insert(std::move(value));
+ }
+
+ /**
+ * This should only remove the first/lowest element in the window.
+ */
+ void remove(Value value) override {
+ auto iter = _values.find(std::move(value));
+ tassert(
+ 5423800, "Can't remove from an empty WindowFunctionAddToSet", iter != _values.end());
+ _values.erase(iter);
+ }
+
+ void reset() override {
+ _values.clear();
+ }
+
+ Value getValue() const override {
+ std::vector<Value> output;
+ if (_values.empty())
+ return kDefault;
+ for (auto it = _values.begin(); it != _values.end(); it = _values.upper_bound(*it)) {
+ output.push_back(*it);
+ }
+
+ return Value(output);
+ }
+
+private:
+ ValueMultiset _values;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp b/src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp
index 6e4d8545240..d7a2e80dea9 100644
--- a/src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp
@@ -31,7 +31,7 @@
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/window_function/window_function.h"
+#include "mongo/db/pipeline/window_function/window_function_add_to_set.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
index 3f36d25123d..8844a70c1f9 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/window_function/window_function.h"
#include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h"
#include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h"
+#include "mongo/db/pipeline/window_function/window_function_min_max.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
new file mode 100644
index 00000000000..a8d4769ed74
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
@@ -0,0 +1,283 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/exec/document_value/document_value_test_util.h"
+#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/window_function/partition_iterator.h"
+#include "mongo/db/pipeline/window_function/window_bounds.h"
+#include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h"
+#include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h"
+#include "mongo/db/pipeline/window_function/window_function_min_max.h"
+#include "mongo/db/query/collation/collator_interface_mock.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+class WindowFunctionExecRemovableDocumentTest : public AggregationContextFixture {
+public:
+ WindowFunctionExecRemovableDocumentTest() {}
+ WindowFunctionExecRemovableDocument createForFieldPath(
+ std::deque<DocumentSource::GetNextResult> docs,
+ const std::string& inputPath,
+ WindowBounds::DocumentBased bounds) {
+ _docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
+ _iter =
+ std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none);
+ auto input = ExpressionFieldPath::parse(
+ getExpCtx().get(), inputPath, getExpCtx()->variablesParseState);
+ std::unique_ptr<WindowFunctionState> maxFunc =
+ std::make_unique<WindowFunctionMax>(getExpCtx().get());
+ return WindowFunctionExecRemovableDocument(
+ _iter.get(), std::move(input), std::move(maxFunc), bounds);
+ }
+
+ auto advanceIterator() {
+ return _iter->advance();
+ }
+
+private:
+ boost::intrusive_ptr<DocumentSourceMock> _docSource;
+ std::unique_ptr<PartitionIterator> _iter;
+};
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateCurrentToInteger) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ auto mgr = createForFieldPath(docs, "$a", WindowBounds::DocumentBased{0, 2});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+
+ mgr = createForFieldPath(
+ std::move(docs), "$a", WindowBounds::DocumentBased{WindowBounds::Current{}, 2});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateIntegerToCurrent) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ auto mgr = createForFieldPath(
+ std::move(docs), "$a", WindowBounds::DocumentBased{-1, WindowBounds::Current{}});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateCurrentToCurrent) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ auto mgr = createForFieldPath(
+ std::move(docs),
+ "$a",
+ WindowBounds::DocumentBased{WindowBounds::Current{}, WindowBounds::Current{}});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateIntegerToInteger) {
+ const auto docsOne = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ auto mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{-1, 1});
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+
+ const auto docsTwo = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ mgr = createForFieldPath(docsTwo, "$a", WindowBounds::DocumentBased{-3, -1});
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+
+ const auto docsThree = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 5}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ mgr = createForFieldPath(std::move(docsTwo), "$a", WindowBounds::DocumentBased{1, 3});
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, DefaultValueWorksAsExpected) {
+ const auto docsOne =
+ std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}}, Document{{"a", 4}}};
+ auto mgr = createForFieldPath(docsOne, "$a", WindowBounds::DocumentBased{-3, -2});
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+ mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{2, 3});
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, RightUnboundedDoesNotAddDocumentsDuringWindow) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}, Document{{"a", 17}}};
+ auto mgr = createForFieldPath(
+ std::move(docs),
+ "$a",
+ WindowBounds::DocumentBased{WindowBounds::Current{}, WindowBounds::Unbounded{}});
+ ASSERT_VALUE_EQ(Value(17), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(17), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(17), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(17), mgr.getNext());
+ advanceIterator();
+
+ const auto docsTwo = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 18}}, Document{{"a", 2}}, Document{{"a", 1}}, Document{{"a", 17}}};
+ mgr = createForFieldPath(
+ std::move(docsTwo), "$a", WindowBounds::DocumentBased{-1, WindowBounds::Unbounded{}});
+ ASSERT_VALUE_EQ(Value(18), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(18), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(17), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(17), mgr.getNext());
+ advanceIterator();
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, EnsureFirstDocumentIsNotRemovedEarly) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 4}}, Document{{"a", 1}}, Document{{"a", 2}}};
+ auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{0, 2});
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, EnsureEarlyDocumentsAreNotIncludedIncorrectly) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 6}},
+ Document{{"a", 7}},
+ Document{{"a", 2}},
+ Document{{"a", 1}},
+ Document{{"a", 0}}};
+ auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{2, 5});
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(0), mgr.getNext());
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}, {"key", 1}},
+ Document{{"a", 2}, {"key", 1}},
+ Document{{"a", 2}, {"key", 2}},
+ Document{{"a", 1}, {"key", 2}},
+ Document{{"a", 1}, {"key", 3}}};
+ auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
+ auto key = ExpressionFieldPath::createPathFromString(
+ getExpCtx().get(), "key", getExpCtx()->variablesParseState);
+ auto iter = PartitionIterator{
+ getExpCtx().get(), mock.get(), boost::optional<boost::intrusive_ptr<Expression>>(key)};
+ auto input =
+ ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
+ CollatorInterfaceMock collator = CollatorInterfaceMock::MockType::kToLowerString;
+ std::unique_ptr<WindowFunctionState> maxFunc =
+ std::make_unique<WindowFunctionMax>(getExpCtx().get());
+ auto mgr = WindowFunctionExecRemovableDocument(
+ &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{0, 0});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ iter.advance();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ iter.advance();
+ mgr.reset();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ iter.advance();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+ iter.advance();
+ mgr.reset();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+
+ const auto docsTwo = std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}, {"key", 1}},
+ Document{{"a", 2}, {"key", 1}},
+ Document{{"a", 2}, {"key", 2}},
+ Document{{"a", 1}, {"key", 2}}};
+ auto mockTwo = DocumentSourceMock::createForTest(std::move(docsTwo), getExpCtx());
+ auto keyTwo = ExpressionFieldPath::createPathFromString(
+ getExpCtx().get(), "key", getExpCtx()->variablesParseState);
+ iter = PartitionIterator{
+ getExpCtx().get(), mockTwo.get(), boost::optional<boost::intrusive_ptr<Expression>>(key)};
+ input = ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
+ maxFunc = std::make_unique<WindowFunctionMax>(getExpCtx().get());
+ mgr = WindowFunctionExecRemovableDocument(
+ &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{-1, 0});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ iter.advance();
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ iter.advance();
+ mgr.reset();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ iter.advance();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_min_max.h b/src/mongo/db/pipeline/window_function/window_function_min_max.h
new file mode 100644
index 00000000000..97f0f1792c0
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_min_max.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/window_function/window_function.h"
+
+namespace mongo {
+
+template <AccumulatorMinMax::Sense sense>
+class WindowFunctionMinMax : public WindowFunctionState {
+public:
+ static inline const Value kDefault = Value{BSONNULL};
+
+ static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) {
+ return std::make_unique<WindowFunctionMinMax<sense>>(expCtx);
+ }
+
+ explicit WindowFunctionMinMax(ExpressionContext* const expCtx)
+ : WindowFunctionState(expCtx),
+ _values(_expCtx->getValueComparator().makeOrderedValueMultiset()) {}
+
+ void add(Value value) final {
+ _values.insert(std::move(value));
+ }
+
+ void remove(Value value) final {
+ // std::multiset::insert is guaranteed to put the element after any equal elements
+ // already in the container. So find() / erase() will remove the oldest equal element,
+ // which is what we want, to satisfy "remove() undoes add() when called in FIFO order".
+ auto iter = _values.find(std::move(value));
+ tassert(5371400, "Can't remove from an empty WindowFunctionMinMax", iter != _values.end());
+ _values.erase(iter);
+ }
+
+ void reset() final {
+ _values.clear();
+ }
+
+ Value getValue() const final {
+ if (_values.empty())
+ return kDefault;
+ switch (sense) {
+ case AccumulatorMinMax::Sense::kMin:
+ return *_values.begin();
+ case AccumulatorMinMax::Sense::kMax:
+ return *_values.rbegin();
+ }
+ MONGO_UNREACHABLE_TASSERT(5371401);
+ }
+
+protected:
+ // Holds all the values in the window, in order, with constant-time access to both ends.
+ ValueMultiset _values;
+};
+using WindowFunctionMin = WindowFunctionMinMax<AccumulatorMinMax::Sense::kMin>;
+using WindowFunctionMax = WindowFunctionMinMax<AccumulatorMinMax::Sense::kMax>;
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp b/src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp
index b92b42ecd0f..62102ef20c0 100644
--- a/src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp
@@ -31,7 +31,7 @@
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/window_function/window_function.h"
+#include "mongo/db/pipeline/window_function/window_function_min_max.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/window_function/window_function_push.h b/src/mongo/db/pipeline/window_function/window_function_push.h
new file mode 100644
index 00000000000..80b3dae49c9
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_push.h
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/window_function/window_function.h"
+
+namespace mongo {
+
+class WindowFunctionPush final : public WindowFunctionState {
+public:
+ using ValueListConstIterator = std::list<Value>::const_iterator;
+
+ static inline const Value kDefault = Value{std::vector<Value>()};
+
+ static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) {
+ return std::make_unique<WindowFunctionPush>(expCtx);
+ }
+
+ explicit WindowFunctionPush(ExpressionContext* const expCtx)
+ : WindowFunctionState(expCtx),
+ _values(
+ _expCtx->getValueComparator().makeOrderedValueMultimap<ValueListConstIterator>()) {}
+
+ void add(Value value) override {
+ _list.emplace_back(std::move(value));
+ auto iter = std::prev(_list.end());
+ _values.insert({*iter, iter});
+ }
+
+ /**
+ * This should only remove the first/lowest element in the window.
+ */
+ void remove(Value value) override {
+ // The order of the key-value pairs whose keys compare equivalent is the order of insertion
+ // and does not change in std::multimap. So find() / erase() will remove the oldest equal
+ // element, which is what we want, to satisfy "remove() undoes add() when called in FIFO
+ // order".
+ auto iter = _values.find(std::move(value));
+ tassert(5423801, "Can't remove from an empty WindowFunctionPush", iter != _values.end());
+ // Erase the element from both '_values' and '_list'.
+ _list.erase(iter->second);
+ _values.erase(iter);
+ }
+
+ void reset() override {
+ _values.clear();
+ _list.clear();
+ }
+
+ Value getValue() const override {
+ std::vector<Value> output;
+ if (_values.empty())
+ return kDefault;
+
+ return Value{std::vector<Value>(_list.begin(), _list.end())};
+ }
+
+private:
+ ValueMultimap<ValueListConstIterator> _values;
+ // std::list makes sure that the order of the elements in the returned array is the order of
+ // insertion.
+ std::list<Value> _list;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_push_test.cpp b/src/mongo/db/pipeline/window_function/window_function_push_test.cpp
index e96e62c0f26..a7790ee22e6 100644
--- a/src/mongo/db/pipeline/window_function/window_function_push_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_push_test.cpp
@@ -31,7 +31,7 @@
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/window_function/window_function.h"
+#include "mongo/db/pipeline/window_function/window_function_push.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp b/src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp
index 42ccb284380..dbb270a4387 100644
--- a/src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
-#include "mongo/db/pipeline/window_function/window_function.h"
+#include "mongo/db/pipeline/window_function/window_function_stddev.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/pipeline/window_function/window_function_stddev.h b/src/mongo/db/pipeline/window_function/window_function_stddev.h
new file mode 100644
index 00000000000..f02e1353dd8
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_stddev.h
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/window_function/window_function.h"
+
+namespace mongo {
+
+class WindowFunctionStdDev : public WindowFunctionState {
+protected:
+ explicit WindowFunctionStdDev(ExpressionContext* const expCtx, bool isSamp)
+ : WindowFunctionState(expCtx),
+ _sum(AccumulatorSum::create(expCtx)),
+ _m2(AccumulatorSum::create(expCtx)),
+ _isSamp(isSamp),
+ _count(0),
+ _nonfiniteValueCount(0) {}
+
+public:
+ static Value getDefault() {
+ return Value(BSONNULL);
+ }
+
+ void add(Value value) {
+ update(std::move(value), +1);
+ }
+
+ void remove(Value value) {
+ update(std::move(value), -1);
+ }
+
+ Value getValue() const final {
+ if (_nonfiniteValueCount > 0)
+ return Value(std::numeric_limits<double>::quiet_NaN());
+ const long long adjustedCount = _isSamp ? _count - 1 : _count;
+ if (adjustedCount == 0)
+ return getDefault();
+ return Value(sqrt(_m2->getValue(false).coerceToDouble() / adjustedCount));
+ }
+
+ void reset() {
+ _m2->reset();
+ _sum->reset();
+ _count = 0;
+ _nonfiniteValueCount = 0;
+ }
+
+private:
+ void update(Value value, int quantity) {
+ // quantity should be 1 if adding value, -1 if removing value
+ if (!value.numeric())
+ return;
+ if ((value.getType() == NumberDouble && !std::isfinite(value.getDouble())) ||
+ (value.getType() == NumberDecimal && !value.getDecimal().isFinite())) {
+ _nonfiniteValueCount += quantity;
+ _count += quantity;
+ return;
+ }
+
+ if (_count == 0) { // Assuming we are adding value if _count == 0.
+ _count++;
+ _sum->process(value, false);
+ return;
+ } else if (_count + quantity == 0) { // Empty the window.
+ reset();
+ return;
+ }
+ double x = _count * value.coerceToDouble() - _sum->getValue(false).coerceToDouble();
+ _count += quantity;
+ _sum->process(Value{value.coerceToDouble() * quantity}, false);
+ _m2->process(Value{x * x * quantity / (_count * (_count - quantity))}, false);
+ }
+
+ // Std dev cannot make use of RemovableSum because of its specific handling of non-finite
+ // values. Adding a NaN or +/-inf makes the result NaN. Additionally, the consistent and
+ // exclusive use of doubles in std dev calculations makes the type handling in RemovableSum
+ // unnecessary.
+ boost::intrusive_ptr<AccumulatorState> _sum;
+ boost::intrusive_ptr<AccumulatorState> _m2;
+ bool _isSamp;
+ long long _count;
+ int _nonfiniteValueCount;
+};
+
+class WindowFunctionStdDevPop final : public WindowFunctionStdDev {
+public:
+ explicit WindowFunctionStdDevPop(ExpressionContext* const expCtx)
+ : WindowFunctionStdDev(expCtx, false) {}
+};
+
+class WindowFunctionStdDevSamp final : public WindowFunctionStdDev {
+public:
+ explicit WindowFunctionStdDevSamp(ExpressionContext* const expCtx)
+ : WindowFunctionStdDev(expCtx, true) {}
+};
+} // namespace mongo