diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2021-03-08 09:17:11 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-08 15:32:46 +0000 |
commit | 487638e326f62ae811d550270312c03b189cce20 (patch) | |
tree | 9d870ea2d00ca430ba18e7fdbbab097f86dcb19e | |
parent | f842de553e3c71a410e5f49fdaec3eaaaebb04a9 (diff) | |
download | mongo-487638e326f62ae811d550270312c03b189cce20.tar.gz |
SERVER-55031 Move window function classes to their own files
13 files changed, 671 insertions, 243 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index f276e6a84b8..2ccf71168b8 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -431,6 +431,7 @@ env.CppUnitTest( 'window_function/window_function_add_to_set_test.cpp', 'window_function/window_function_exec_derivative_test.cpp', 'window_function/window_function_exec_non_removable_test.cpp', + 'window_function/window_function_exec_removable_test.cpp', 'window_function/window_function_min_max_test.cpp', 'window_function/window_function_push_test.cpp', 'window_function/window_function_std_dev_test.cpp', diff --git a/src/mongo/db/pipeline/accumulator_min_max.cpp b/src/mongo/db/pipeline/accumulator_min_max.cpp index ab91b64628e..d0d3758487d 100644 --- a/src/mongo/db/pipeline/accumulator_min_max.cpp +++ b/src/mongo/db/pipeline/accumulator_min_max.cpp @@ -34,8 +34,8 @@ #include "mongo/db/exec/document_value/value.h" #include "mongo/db/pipeline/accumulation_statement.h" #include "mongo/db/pipeline/expression.h" -#include "mongo/db/pipeline/window_function/window_function.h" #include "mongo/db/pipeline/window_function/window_function_expression.h" +#include "mongo/db/pipeline/window_function/window_function_min_max.h" namespace mongo { diff --git a/src/mongo/db/pipeline/window_function/window_function.h b/src/mongo/db/pipeline/window_function/window_function.h index c2d0979e5b2..9acf4aa9b70 100644 --- a/src/mongo/db/pipeline/window_function/window_function.h +++ b/src/mongo/db/pipeline/window_function/window_function.h @@ -56,242 +56,4 @@ protected: ExpressionContext* _expCtx; }; - -template <AccumulatorMinMax::Sense sense> -class WindowFunctionMinMax : public WindowFunctionState { -public: - static inline const Value kDefault = Value{BSONNULL}; - - static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) { - return std::make_unique<WindowFunctionMinMax<sense>>(expCtx); - } - - explicit WindowFunctionMinMax(ExpressionContext* const expCtx) - : WindowFunctionState(expCtx), - _values(_expCtx->getValueComparator().makeOrderedValueMultiset()) {} - - void add(Value value) final { - _values.insert(std::move(value)); - } - - void remove(Value value) final { - // std::multiset::insert is guaranteed to put the element after any equal elements - // already in the container. So find() / erase() will remove the oldest equal element, - // which is what we want, to satisfy "remove() undoes add() when called in FIFO order". - auto iter = _values.find(std::move(value)); - tassert(5371400, "Can't remove from an empty WindowFunctionMinMax", iter != _values.end()); - _values.erase(iter); - } - - void reset() final { - _values.clear(); - } - - Value getValue() const final { - if (_values.empty()) - return kDefault; - switch (sense) { - case AccumulatorMinMax::Sense::kMin: - return *_values.begin(); - case AccumulatorMinMax::Sense::kMax: - return *_values.rbegin(); - } - MONGO_UNREACHABLE_TASSERT(5371401); - } - -protected: - // Holds all the values in the window, in order, with constant-time access to both ends. - ValueMultiset _values; -}; -using WindowFunctionMin = WindowFunctionMinMax<AccumulatorMinMax::Sense::kMin>; -using WindowFunctionMax = WindowFunctionMinMax<AccumulatorMinMax::Sense::kMax>; - -class WindowFunctionAddToSet final : public WindowFunctionState { -public: - static inline const Value kDefault = Value{std::vector<Value>()}; - - static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) { - return std::make_unique<WindowFunctionAddToSet>(expCtx); - } - - explicit WindowFunctionAddToSet(ExpressionContext* const expCtx) - : WindowFunctionState(expCtx), - _values(_expCtx->getValueComparator().makeOrderedValueMultiset()) {} - - void add(Value value) override { - _values.insert(std::move(value)); - } - - /** - * This should only remove the first/lowest element in the window. - */ - void remove(Value value) override { - auto iter = _values.find(std::move(value)); - tassert( - 5423800, "Can't remove from an empty WindowFunctionAddToSet", iter != _values.end()); - _values.erase(iter); - } - - void reset() override { - _values.clear(); - } - - Value getValue() const override { - std::vector<Value> output; - if (_values.empty()) - return kDefault; - for (auto it = _values.begin(); it != _values.end(); it = _values.upper_bound(*it)) { - output.push_back(*it); - } - - return Value(output); - } - -private: - ValueMultiset _values; -}; - -class WindowFunctionPush final : public WindowFunctionState { -public: - using ValueListConstIterator = std::list<Value>::const_iterator; - - static inline const Value kDefault = Value{std::vector<Value>()}; - - static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) { - return std::make_unique<WindowFunctionPush>(expCtx); - } - - explicit WindowFunctionPush(ExpressionContext* const expCtx) - : WindowFunctionState(expCtx), - _values( - _expCtx->getValueComparator().makeOrderedValueMultimap<ValueListConstIterator>()) {} - - void add(Value value) override { - _list.emplace_back(std::move(value)); - auto iter = std::prev(_list.end()); - _values.insert({*iter, iter}); - } - - /** - * This should only remove the first/lowest element in the window. - */ - void remove(Value value) override { - // The order of the key-value pairs whose keys compare equivalent is the order of insertion - // and does not change in std::multimap. So find() / erase() will remove the oldest equal - // element, which is what we want, to satisfy "remove() undoes add() when called in FIFO - // order". - auto iter = _values.find(std::move(value)); - tassert(5423801, "Can't remove from an empty WindowFunctionPush", iter != _values.end()); - // Erase the element from both '_values' and '_list'. - _list.erase(iter->second); - _values.erase(iter); - } - - void reset() override { - _values.clear(); - _list.clear(); - } - - Value getValue() const override { - std::vector<Value> output; - if (_values.empty()) - return kDefault; - - return Value{std::vector<Value>(_list.begin(), _list.end())}; - } - -private: - ValueMultimap<ValueListConstIterator> _values; - // std::list makes sure that the order of the elements in the returned array is the order of - // insertion. - std::list<Value> _list; -}; - -class WindowFunctionStdDev : public WindowFunctionState { -protected: - explicit WindowFunctionStdDev(ExpressionContext* const expCtx, bool isSamp) - : WindowFunctionState(expCtx), - _sum(AccumulatorSum::create(expCtx)), - _m2(AccumulatorSum::create(expCtx)), - _isSamp(isSamp), - _count(0), - _nonfiniteValueCount(0) {} - -public: - static Value getDefault() { - return Value(BSONNULL); - } - - void add(Value value) { - update(std::move(value), +1); - } - - void remove(Value value) { - update(std::move(value), -1); - } - - Value getValue() const final { - if (_nonfiniteValueCount > 0) - return Value(std::numeric_limits<double>::quiet_NaN()); - const long long adjustedCount = _isSamp ? _count - 1 : _count; - if (adjustedCount == 0) - return getDefault(); - return Value(sqrt(_m2->getValue(false).coerceToDouble() / adjustedCount)); - } - - void reset() { - _m2->reset(); - _sum->reset(); - _count = 0; - _nonfiniteValueCount = 0; - } - -private: - void update(Value value, int quantity) { - // quantity should be 1 if adding value, -1 if removing value - if (!value.numeric()) - return; - if ((value.getType() == NumberDouble && !std::isfinite(value.getDouble())) || - (value.getType() == NumberDecimal && !value.getDecimal().isFinite())) { - _nonfiniteValueCount += quantity; - _count += quantity; - return; - } - - if (_count == 0) { // Assuming we are adding value if _count == 0. - _count++; - _sum->process(value, false); - return; - } else if (_count + quantity == 0) { // Empty the window. - reset(); - return; - } - double x = _count * value.coerceToDouble() - _sum->getValue(false).coerceToDouble(); - _count += quantity; - _sum->process(Value{value.coerceToDouble() * quantity}, false); - _m2->process(Value{x * x * quantity / (_count * (_count - quantity))}, false); - } - - // Std dev cannot make use of RemovableSum because of its specific handling of non-finite - // values. Adding a NaN or +/-inf makes the result NaN. Additionally, the consistent and - // exclusive use of doubles in std dev calculations makes the type handling in RemovableSum - // unnecessary. - boost::intrusive_ptr<AccumulatorState> _sum; - boost::intrusive_ptr<AccumulatorState> _m2; - bool _isSamp; - long long _count; - int _nonfiniteValueCount; -}; - -class WindowFunctionStdDevPop final : public WindowFunctionStdDev { -public: - explicit WindowFunctionStdDevPop(ExpressionContext* const expCtx) - : WindowFunctionStdDev(expCtx, false) {} -}; - -class WindowFunctionStdDevSamp final : public WindowFunctionStdDev { -public: - explicit WindowFunctionStdDevSamp(ExpressionContext* const expCtx) - : WindowFunctionStdDev(expCtx, true) {} -}; } // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_add_to_set.h b/src/mongo/db/pipeline/window_function/window_function_add_to_set.h new file mode 100644 index 00000000000..ee1059aefd9 --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_add_to_set.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/window_function/window_function.h" + +namespace mongo { + +class WindowFunctionAddToSet final : public WindowFunctionState { +public: + static inline const Value kDefault = Value{std::vector<Value>()}; + + static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) { + return std::make_unique<WindowFunctionAddToSet>(expCtx); + } + + explicit WindowFunctionAddToSet(ExpressionContext* const expCtx) + : WindowFunctionState(expCtx), + _values(_expCtx->getValueComparator().makeOrderedValueMultiset()) {} + + void add(Value value) override { + _values.insert(std::move(value)); + } + + /** + * This should only remove the first/lowest element in the window. + */ + void remove(Value value) override { + auto iter = _values.find(std::move(value)); + tassert( + 5423800, "Can't remove from an empty WindowFunctionAddToSet", iter != _values.end()); + _values.erase(iter); + } + + void reset() override { + _values.clear(); + } + + Value getValue() const override { + std::vector<Value> output; + if (_values.empty()) + return kDefault; + for (auto it = _values.begin(); it != _values.end(); it = _values.upper_bound(*it)) { + output.push_back(*it); + } + + return Value(output); + } + +private: + ValueMultiset _values; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp b/src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp index 6e4d8545240..d7a2e80dea9 100644 --- a/src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_add_to_set_test.cpp @@ -31,7 +31,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/window_function/window_function.h" +#include "mongo/db/pipeline/window_function/window_function_add_to_set.h" #include "mongo/unittest/unittest.h" namespace mongo { diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp index 3f36d25123d..8844a70c1f9 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/window_function/window_function.h" #include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h" #include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h" +#include "mongo/db/pipeline/window_function/window_function_min_max.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp new file mode 100644 index 00000000000..a8d4769ed74 --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp @@ -0,0 +1,283 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/window_function/partition_iterator.h" +#include "mongo/db/pipeline/window_function/window_bounds.h" +#include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h" +#include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h" +#include "mongo/db/pipeline/window_function/window_function_min_max.h" +#include "mongo/db/query/collation/collator_interface_mock.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class WindowFunctionExecRemovableDocumentTest : public AggregationContextFixture { +public: + WindowFunctionExecRemovableDocumentTest() {} + WindowFunctionExecRemovableDocument createForFieldPath( + std::deque<DocumentSource::GetNextResult> docs, + const std::string& inputPath, + WindowBounds::DocumentBased bounds) { + _docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); + _iter = + std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none); + auto input = ExpressionFieldPath::parse( + getExpCtx().get(), inputPath, getExpCtx()->variablesParseState); + std::unique_ptr<WindowFunctionState> maxFunc = + std::make_unique<WindowFunctionMax>(getExpCtx().get()); + return WindowFunctionExecRemovableDocument( + _iter.get(), std::move(input), std::move(maxFunc), bounds); + } + + auto advanceIterator() { + return _iter->advance(); + } + +private: + boost::intrusive_ptr<DocumentSourceMock> _docSource; + std::unique_ptr<PartitionIterator> _iter; +}; + +TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateCurrentToInteger) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}}; + auto mgr = createForFieldPath(docs, "$a", WindowBounds::DocumentBased{0, 2}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + + mgr = createForFieldPath( + std::move(docs), "$a", WindowBounds::DocumentBased{WindowBounds::Current{}, 2}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateIntegerToCurrent) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}}; + auto mgr = createForFieldPath( + std::move(docs), "$a", WindowBounds::DocumentBased{-1, WindowBounds::Current{}}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateCurrentToCurrent) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}}; + auto mgr = createForFieldPath( + std::move(docs), + "$a", + WindowBounds::DocumentBased{WindowBounds::Current{}, WindowBounds::Current{}}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateIntegerToInteger) { + const auto docsOne = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}}; + auto mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{-1, 1}); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + + const auto docsTwo = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}}; + mgr = createForFieldPath(docsTwo, "$a", WindowBounds::DocumentBased{-3, -1}); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + + const auto docsThree = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 5}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}}; + mgr = createForFieldPath(std::move(docsTwo), "$a", WindowBounds::DocumentBased{1, 3}); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, DefaultValueWorksAsExpected) { + const auto docsOne = + std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}}, Document{{"a", 4}}}; + auto mgr = createForFieldPath(docsOne, "$a", WindowBounds::DocumentBased{-3, -2}); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value + mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{2, 3}); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, RightUnboundedDoesNotAddDocumentsDuringWindow) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}, Document{{"a", 17}}}; + auto mgr = createForFieldPath( + std::move(docs), + "$a", + WindowBounds::DocumentBased{WindowBounds::Current{}, WindowBounds::Unbounded{}}); + ASSERT_VALUE_EQ(Value(17), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(17), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(17), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(17), mgr.getNext()); + advanceIterator(); + + const auto docsTwo = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 18}}, Document{{"a", 2}}, Document{{"a", 1}}, Document{{"a", 17}}}; + mgr = createForFieldPath( + std::move(docsTwo), "$a", WindowBounds::DocumentBased{-1, WindowBounds::Unbounded{}}); + ASSERT_VALUE_EQ(Value(18), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(18), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(17), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(17), mgr.getNext()); + advanceIterator(); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, EnsureFirstDocumentIsNotRemovedEarly) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 4}}, Document{{"a", 1}}, Document{{"a", 2}}}; + auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{0, 2}); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, EnsureEarlyDocumentsAreNotIncludedIncorrectly) { + const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 6}}, + Document{{"a", 7}}, + Document{{"a", 2}}, + Document{{"a", 1}}, + Document{{"a", 0}}}; + auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{2, 5}); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(0), mgr.getNext()); + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) { + const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}, {"key", 1}}, + Document{{"a", 2}, {"key", 1}}, + Document{{"a", 2}, {"key", 2}}, + Document{{"a", 1}, {"key", 2}}, + Document{{"a", 1}, {"key", 3}}}; + auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); + auto key = ExpressionFieldPath::createPathFromString( + getExpCtx().get(), "key", getExpCtx()->variablesParseState); + auto iter = PartitionIterator{ + getExpCtx().get(), mock.get(), boost::optional<boost::intrusive_ptr<Expression>>(key)}; + auto input = + ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState); + CollatorInterfaceMock collator = CollatorInterfaceMock::MockType::kToLowerString; + std::unique_ptr<WindowFunctionState> maxFunc = + std::make_unique<WindowFunctionMax>(getExpCtx().get()); + auto mgr = WindowFunctionExecRemovableDocument( + &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{0, 0}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + iter.advance(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + iter.advance(); + mgr.reset(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + iter.advance(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + iter.advance(); + mgr.reset(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + + const auto docsTwo = std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}, {"key", 1}}, + Document{{"a", 2}, {"key", 1}}, + Document{{"a", 2}, {"key", 2}}, + Document{{"a", 1}, {"key", 2}}}; + auto mockTwo = DocumentSourceMock::createForTest(std::move(docsTwo), getExpCtx()); + auto keyTwo = ExpressionFieldPath::createPathFromString( + getExpCtx().get(), "key", getExpCtx()->variablesParseState); + iter = PartitionIterator{ + getExpCtx().get(), mockTwo.get(), boost::optional<boost::intrusive_ptr<Expression>>(key)}; + input = ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState); + maxFunc = std::make_unique<WindowFunctionMax>(getExpCtx().get()); + mgr = WindowFunctionExecRemovableDocument( + &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{-1, 0}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + iter.advance(); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + iter.advance(); + mgr.reset(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + iter.advance(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_min_max.h b/src/mongo/db/pipeline/window_function/window_function_min_max.h new file mode 100644 index 00000000000..97f0f1792c0 --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_min_max.h @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/window_function/window_function.h" + +namespace mongo { + +template <AccumulatorMinMax::Sense sense> +class WindowFunctionMinMax : public WindowFunctionState { +public: + static inline const Value kDefault = Value{BSONNULL}; + + static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) { + return std::make_unique<WindowFunctionMinMax<sense>>(expCtx); + } + + explicit WindowFunctionMinMax(ExpressionContext* const expCtx) + : WindowFunctionState(expCtx), + _values(_expCtx->getValueComparator().makeOrderedValueMultiset()) {} + + void add(Value value) final { + _values.insert(std::move(value)); + } + + void remove(Value value) final { + // std::multiset::insert is guaranteed to put the element after any equal elements + // already in the container. So find() / erase() will remove the oldest equal element, + // which is what we want, to satisfy "remove() undoes add() when called in FIFO order". + auto iter = _values.find(std::move(value)); + tassert(5371400, "Can't remove from an empty WindowFunctionMinMax", iter != _values.end()); + _values.erase(iter); + } + + void reset() final { + _values.clear(); + } + + Value getValue() const final { + if (_values.empty()) + return kDefault; + switch (sense) { + case AccumulatorMinMax::Sense::kMin: + return *_values.begin(); + case AccumulatorMinMax::Sense::kMax: + return *_values.rbegin(); + } + MONGO_UNREACHABLE_TASSERT(5371401); + } + +protected: + // Holds all the values in the window, in order, with constant-time access to both ends. + ValueMultiset _values; +}; +using WindowFunctionMin = WindowFunctionMinMax<AccumulatorMinMax::Sense::kMin>; +using WindowFunctionMax = WindowFunctionMinMax<AccumulatorMinMax::Sense::kMax>; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp b/src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp index b92b42ecd0f..62102ef20c0 100644 --- a/src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_min_max_test.cpp @@ -31,7 +31,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/window_function/window_function.h" +#include "mongo/db/pipeline/window_function/window_function_min_max.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/window_function/window_function_push.h b/src/mongo/db/pipeline/window_function/window_function_push.h new file mode 100644 index 00000000000..80b3dae49c9 --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_push.h @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/window_function/window_function.h" + +namespace mongo { + +class WindowFunctionPush final : public WindowFunctionState { +public: + using ValueListConstIterator = std::list<Value>::const_iterator; + + static inline const Value kDefault = Value{std::vector<Value>()}; + + static std::unique_ptr<WindowFunctionState> create(ExpressionContext* const expCtx) { + return std::make_unique<WindowFunctionPush>(expCtx); + } + + explicit WindowFunctionPush(ExpressionContext* const expCtx) + : WindowFunctionState(expCtx), + _values( + _expCtx->getValueComparator().makeOrderedValueMultimap<ValueListConstIterator>()) {} + + void add(Value value) override { + _list.emplace_back(std::move(value)); + auto iter = std::prev(_list.end()); + _values.insert({*iter, iter}); + } + + /** + * This should only remove the first/lowest element in the window. + */ + void remove(Value value) override { + // The order of the key-value pairs whose keys compare equivalent is the order of insertion + // and does not change in std::multimap. So find() / erase() will remove the oldest equal + // element, which is what we want, to satisfy "remove() undoes add() when called in FIFO + // order". + auto iter = _values.find(std::move(value)); + tassert(5423801, "Can't remove from an empty WindowFunctionPush", iter != _values.end()); + // Erase the element from both '_values' and '_list'. + _list.erase(iter->second); + _values.erase(iter); + } + + void reset() override { + _values.clear(); + _list.clear(); + } + + Value getValue() const override { + std::vector<Value> output; + if (_values.empty()) + return kDefault; + + return Value{std::vector<Value>(_list.begin(), _list.end())}; + } + +private: + ValueMultimap<ValueListConstIterator> _values; + // std::list makes sure that the order of the elements in the returned array is the order of + // insertion. + std::list<Value> _list; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_push_test.cpp b/src/mongo/db/pipeline/window_function/window_function_push_test.cpp index e96e62c0f26..a7790ee22e6 100644 --- a/src/mongo/db/pipeline/window_function/window_function_push_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_push_test.cpp @@ -31,7 +31,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/window_function/window_function.h" +#include "mongo/db/pipeline/window_function/window_function_push.h" #include "mongo/unittest/unittest.h" namespace mongo { diff --git a/src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp b/src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp index 42ccb284380..dbb270a4387 100644 --- a/src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_std_dev_test.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/exec/document_value/document_value_test_util.h" -#include "mongo/db/pipeline/window_function/window_function.h" +#include "mongo/db/pipeline/window_function/window_function_stddev.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/window_function/window_function_stddev.h b/src/mongo/db/pipeline/window_function/window_function_stddev.h new file mode 100644 index 00000000000..f02e1353dd8 --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_stddev.h @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/window_function/window_function.h" + +namespace mongo { + +class WindowFunctionStdDev : public WindowFunctionState { +protected: + explicit WindowFunctionStdDev(ExpressionContext* const expCtx, bool isSamp) + : WindowFunctionState(expCtx), + _sum(AccumulatorSum::create(expCtx)), + _m2(AccumulatorSum::create(expCtx)), + _isSamp(isSamp), + _count(0), + _nonfiniteValueCount(0) {} + +public: + static Value getDefault() { + return Value(BSONNULL); + } + + void add(Value value) { + update(std::move(value), +1); + } + + void remove(Value value) { + update(std::move(value), -1); + } + + Value getValue() const final { + if (_nonfiniteValueCount > 0) + return Value(std::numeric_limits<double>::quiet_NaN()); + const long long adjustedCount = _isSamp ? _count - 1 : _count; + if (adjustedCount == 0) + return getDefault(); + return Value(sqrt(_m2->getValue(false).coerceToDouble() / adjustedCount)); + } + + void reset() { + _m2->reset(); + _sum->reset(); + _count = 0; + _nonfiniteValueCount = 0; + } + +private: + void update(Value value, int quantity) { + // quantity should be 1 if adding value, -1 if removing value + if (!value.numeric()) + return; + if ((value.getType() == NumberDouble && !std::isfinite(value.getDouble())) || + (value.getType() == NumberDecimal && !value.getDecimal().isFinite())) { + _nonfiniteValueCount += quantity; + _count += quantity; + return; + } + + if (_count == 0) { // Assuming we are adding value if _count == 0. + _count++; + _sum->process(value, false); + return; + } else if (_count + quantity == 0) { // Empty the window. + reset(); + return; + } + double x = _count * value.coerceToDouble() - _sum->getValue(false).coerceToDouble(); + _count += quantity; + _sum->process(Value{value.coerceToDouble() * quantity}, false); + _m2->process(Value{x * x * quantity / (_count * (_count - quantity))}, false); + } + + // Std dev cannot make use of RemovableSum because of its specific handling of non-finite + // values. Adding a NaN or +/-inf makes the result NaN. Additionally, the consistent and + // exclusive use of doubles in std dev calculations makes the type handling in RemovableSum + // unnecessary. + boost::intrusive_ptr<AccumulatorState> _sum; + boost::intrusive_ptr<AccumulatorState> _m2; + bool _isSamp; + long long _count; + int _nonfiniteValueCount; +}; + +class WindowFunctionStdDevPop final : public WindowFunctionStdDev { +public: + explicit WindowFunctionStdDevPop(ExpressionContext* const expCtx) + : WindowFunctionStdDev(expCtx, false) {} +}; + +class WindowFunctionStdDevSamp final : public WindowFunctionStdDev { +public: + explicit WindowFunctionStdDevSamp(ExpressionContext* const expCtx) + : WindowFunctionStdDev(expCtx, true) {} +}; +} // namespace mongo |