diff options
author | Ted Tuckman <ted.tuckman@mongodb.com> | 2021-02-22 09:18:45 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-22 14:46:33 +0000 |
commit | ef416583d74e5ed6adb7c43d0a1cae786fd90bee (patch) | |
tree | 9b8acc650e1c3f3d245c7162050c92fe6ba5be33 /src/mongo | |
parent | 7ac3e9f1eb151aced254814767c965ea468a7f74 (diff) | |
download | mongo-ef416583d74e5ed6adb7c43d0a1cae786fd90bee.tar.gz |
SERVER-53398 Add removable window function executor
Diffstat (limited to 'src/mongo')
8 files changed, 477 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index fde2cb61f00..310479ecb3b 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -262,6 +262,7 @@ pipelineEnv.Library( 'skip_and_limit.cpp', 'tee_buffer.cpp', 'window_function/partition_iterator.cpp', + 'window_function/window_function_exec_removable_document.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/client/clientdriver_minimal', diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h index 0a0711ac3bf..bf7f0bd1097 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.h +++ b/src/mongo/db/pipeline/window_function/partition_iterator.h @@ -68,6 +68,13 @@ public: */ AdvanceResult advance(); + /** + * Returns the offset of the iterator for the current partition. + */ + auto getCurrentOffset() const { + return _currentIndex; + } + private: /** * Retrieves the next document from the prior stage and updates the state accordingly. diff --git a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp index 97787282b10..f0666f12f39 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp +++ b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp @@ -177,5 +177,27 @@ TEST_F(PartitionIteratorTest, PartitionByArrayErrs) { ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *partIter[0]); ASSERT_THROWS_CODE(*partIter[1], AssertionException, ErrorCodes::TypeMismatch); } + +TEST_F(PartitionIteratorTest, CurrentOffsetIsCorrectAfterDocumentsAreAccessed) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"key", 1}}, Document{{"key", 2}}, Document{{"key", 3}}, Document{{"key", 4}}}; + const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx()); + auto key = ExpressionFieldPath::createPathFromString( + getExpCtx().get(), "a", getExpCtx()->variablesParseState); + auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), *key); + ASSERT_EQ(0, partIter.getCurrentOffset()); + auto doc = partIter[0]; + partIter.advance(); + ASSERT_EQ(1, partIter.getCurrentOffset()); + doc = partIter[0]; + partIter.advance(); + ASSERT_EQ(2, partIter.getCurrentOffset()); + doc = partIter[0]; + partIter.advance(); + ASSERT_EQ(3, partIter.getCurrentOffset()); + doc = partIter[0]; + ASSERT_EQ(3, partIter.getCurrentOffset()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function.h b/src/mongo/db/pipeline/window_function/window_function.h index fedfbce1f4a..eb38616c95d 100644 --- a/src/mongo/db/pipeline/window_function/window_function.h +++ b/src/mongo/db/pipeline/window_function/window_function.h @@ -45,9 +45,11 @@ namespace mongo { */ class WindowFunctionState { public: + virtual ~WindowFunctionState() = default; virtual void add(Value) = 0; virtual void remove(Value) = 0; virtual Value getValue() const = 0; + virtual void reset() = 0; }; @@ -77,6 +79,10 @@ public: _values.erase(iter); } + void reset() final { + _values.clear(); + } + Value getValue() const final { if (_values.empty()) return getDefault(); diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.h b/src/mongo/db/pipeline/window_function/window_function_exec.h index 9a8cce9bb7d..4dfcbdd8356 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec.h @@ -29,10 +29,13 @@ #pragma once +#include <queue> + #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/window_function/partition_iterator.h" #include "mongo/db/pipeline/window_function/window_bounds.h" +#include "mongo/db/pipeline/window_function/window_function.h" namespace mongo { @@ -62,4 +65,48 @@ protected: PartitionIterator* _iter; }; +/** + * Base class for executors that need to remove documents from their held functions. The + * 'WindowFunctionState' parameter must expose an 'add()' and corresponding + * 'getValue()' method to get the accumulation result. It must also expose a 'remove()' method to + * remove a specific document from the calculation. + */ +class WindowFunctionExecRemovable : public WindowFunctionExec { +public: + WindowFunctionExecRemovable(PartitionIterator* iter, + boost::intrusive_ptr<Expression> input, + std::unique_ptr<WindowFunctionState> function) + : WindowFunctionExec(iter), _input(std::move(input)), _function(std::move(function)) {} + + Value getNext() { + if (!_initialized) { + this->initialize(); + _initialized = true; + return _function->getValue(); + } + processDocumentsToUpperBound(); + removeDocumentsUnderLowerBound(); + return _function->getValue(); + } + +protected: + boost::intrusive_ptr<Expression> _input; + std::unique_ptr<WindowFunctionState> _function; + // Keep track of values in the window function that will need to be removed later. + std::queue<Value> _values; + // In one of two states: either the initial window has not been populated or we are sliding and + // accumulating/removing values. + bool _initialized = false; + + void addValue(Value v) { + _function->add(v); + _values.push(v); + } + +private: + virtual void processDocumentsToUpperBound() = 0; + virtual void removeDocumentsUnderLowerBound() = 0; + virtual void initialize() = 0; +}; + } // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp new file mode 100644 index 00000000000..571c06eac30 --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h" + +namespace mongo { + +WindowFunctionExecRemovableDocument::WindowFunctionExecRemovableDocument( + PartitionIterator* iter, + boost::intrusive_ptr<Expression> input, + std::unique_ptr<WindowFunctionState> function, + WindowBounds::DocumentBased bounds) + : WindowFunctionExecRemovable(iter, std::move(input), std::move(function)) { + stdx::visit( + visit_helper::Overloaded{ + [](const WindowBounds::Unbounded&) { + // If the window is left unbounded we should use the non-removable executor. + MONGO_UNREACHABLE_TASSERT(5339802); + }, + [&](const WindowBounds::Current&) { _lowerBound = 0; }, + [&](const int& lowerIndex) { _lowerBound = lowerIndex; }, + }, + bounds.lower); + + stdx::visit( + visit_helper::Overloaded{ + [](const WindowBounds::Unbounded&) { + uasserted(5339801, "Right unbounded windows are not yet supported"); + }, + [&](const WindowBounds::Current&) { _upperBound = 0; }, + [&](const int& upperIndex) { _upperBound = upperIndex; }, + }, + bounds.upper); +} + +void WindowFunctionExecRemovableDocument::initialize() { + int lowerBoundForInit = _lowerBound > 0 ? _lowerBound : 0; + for (int i = lowerBoundForInit; i <= _upperBound; ++i) { + // If this is false, we're over the end of the partition. + if (auto doc = (*this->_iter)[i]) { + addValue(_input->evaluate(*doc, nullptr)); + } else { + break; + } + } +} + +void WindowFunctionExecRemovableDocument::processDocumentsToUpperBound() { + // If there is no upper bound, the whole partition is loaded by initialize. + if (_upperBound) { + // If this is false, we're over the end of the partition. + if (auto doc = (*this->_iter)[_upperBound.get()]) { + addValue(_input->evaluate(*doc, nullptr)); + } + } +} + +void WindowFunctionExecRemovableDocument::removeDocumentsUnderLowerBound() { + // For a positive lower bound the first pass loads the correct window, so subsequent passes + // must always remove a document if there is a document left to remove. + // For a negative lower bound we can start removing every time only after we have seen + // documents to fill the left side of the window. + if (_lowerBound >= 0 || _iter->getCurrentOffset() > abs(_lowerBound)) { + removeFirstValueIfExists(); + } +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h new file mode 100644 index 00000000000..2637a23a925 --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/window_function/partition_iterator.h" +#include "mongo/db/pipeline/window_function/window_bounds.h" +#include "mongo/db/pipeline/window_function/window_function_exec.h" + +namespace mongo { + +/** + * An executor that specifically handles document-based window types which accumulate values while + * removing old ones. + */ +class WindowFunctionExecRemovableDocument final : public WindowFunctionExecRemovable { +public: + /** + * Constructs a removable window function executor with the given input expression to be + * evaluated and passed to the corresponding WindowFunc for each document in the window. + * + * The 'bounds' parameter is the user supplied bounds for the window. + */ + WindowFunctionExecRemovableDocument(PartitionIterator* iter, + boost::intrusive_ptr<Expression> input, + std::unique_ptr<WindowFunctionState> function, + WindowBounds::DocumentBased bounds); + + void reset() final { + _function->reset(); + _initialized = false; + } + +private: + void processDocumentsToUpperBound() final; + + void removeDocumentsUnderLowerBound() final; + + void initialize() final; + + void removeFirstValueIfExists() { + if (_values.size() == 0) { + return; + } + _function->remove(_values.front()); + _values.pop(); + } + + int _lowerBound; + // Will stay boost::none if right unbounded. + boost::optional<int> _upperBound = boost::none; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_test.cpp index 3d4e7234b01..bdc773ee131 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_test.cpp @@ -37,7 +37,10 @@ #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/window_function/partition_iterator.h" #include "mongo/db/pipeline/window_function/window_bounds.h" +#include "mongo/db/pipeline/window_function/window_function.h" #include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h" +#include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h" +#include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -68,6 +71,36 @@ private: std::unique_ptr<PartitionIterator> _iter; }; +class WindowFunctionExecRemovableDocumentTest : public AggregationContextFixture { +public: + WindowFunctionExecRemovableDocumentTest() + : collator(CollatorInterfaceMock::MockType::kToLowerString), cmp(&collator) {} + + WindowFunctionExecRemovableDocument createForFieldPath( + std::deque<DocumentSource::GetNextResult> docs, + const std::string& inputPath, + WindowBounds::DocumentBased bounds) { + _docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); + _iter = + std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none); + auto input = ExpressionFieldPath::parse( + getExpCtx().get(), inputPath, getExpCtx()->variablesParseState); + std::unique_ptr<WindowFunctionState> maxFunc = std::make_unique<WindowFunctionMax>(cmp); + return WindowFunctionExecRemovableDocument( + _iter.get(), std::move(input), std::move(maxFunc), bounds); + } + + auto advanceIterator() { + return _iter->advance(); + } + +private: + CollatorInterfaceMock collator; + ValueComparator cmp; + boost::intrusive_ptr<DocumentSourceMock> _docSource; + std::unique_ptr<PartitionIterator> _iter; +}; + TEST_F(WindowFunctionExecNonRemovableTest, AccumulateOnlyWithoutLookahead) { const auto docs = std::deque<DocumentSource::GetNextResult>{ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}}; @@ -153,5 +186,189 @@ TEST_F(WindowFunctionExecNonRemovableTest, UnboundedNotYetSupported) { 5374100); } +TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateCurrentToInteger) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}}; + auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{0, 2}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + + mgr = createForFieldPath( + std::move(docs), "$a", WindowBounds::DocumentBased{WindowBounds::Current{}, 2}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateIntegerToCurrent) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}}; + auto mgr = createForFieldPath( + std::move(docs), "$a", WindowBounds::DocumentBased{-1, WindowBounds::Current{}}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateCurrentToCurrent) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}}; + auto mgr = createForFieldPath( + std::move(docs), + "$a", + WindowBounds::DocumentBased{WindowBounds::Current{}, WindowBounds::Current{}}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateIntegerToInteger) { + const auto docsOne = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}}; + auto mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{-1, 1}); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + + const auto docsTwo = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}}; + mgr = createForFieldPath(std::move(docsTwo), "$a", WindowBounds::DocumentBased{-3, -1}); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + + const auto docsThree = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 5}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}}; + mgr = createForFieldPath(std::move(docsTwo), "$a", WindowBounds::DocumentBased{1, 3}); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, DefaultValueWorksAsExpected) { + const auto docsOne = + std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}}, Document{{"a", 4}}}; + auto mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{-3, -2}); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value + mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{2, 3}); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, FunctionalityNotSupported) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}}; + ASSERT_THROWS_CODE( + createForFieldPath( + std::move(docs), "$a", WindowBounds::DocumentBased{1, WindowBounds::Unbounded{}}), + AssertionException, + 5339801); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, EnsureFirstDocumentIsNotRemovedEarly) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 4}}, Document{{"a", 1}}, Document{{"a", 2}}}; + auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{0, 2}); + ASSERT_VALUE_EQ(Value(4), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, EnsureEarlyDocumentsAreNotIncludedIncorrectly) { + const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 6}}, + Document{{"a", 7}}, + Document{{"a", 2}}, + Document{{"a", 1}}, + Document{{"a", 0}}}; + auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{2, 5}); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + advanceIterator(); + ASSERT_VALUE_EQ(Value(0), mgr.getNext()); + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); + ASSERT(mgr.getNext().nullish()); // Default value + advanceIterator(); +} + +TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) { + const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}, {"key", 1}}, + Document{{"a", 2}, {"key", 1}}, + Document{{"a", 2}, {"key", 2}}, + Document{{"a", 1}, {"key", 2}}, + Document{{"a", 1}, {"key", 3}}}; + auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); + auto key = ExpressionFieldPath::createPathFromString( + getExpCtx().get(), "key", getExpCtx()->variablesParseState); + auto iter = PartitionIterator{getExpCtx().get(), mock.get(), *key}; + auto input = + ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState); + CollatorInterfaceMock collator = CollatorInterfaceMock::MockType::kToLowerString; + ValueComparator cmp(&collator); + std::unique_ptr<WindowFunctionState> maxFunc = std::make_unique<WindowFunctionMax>(cmp); + auto mgr = WindowFunctionExecRemovableDocument( + &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{0, 0}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + iter.advance(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + iter.advance(); + mgr.reset(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + iter.advance(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + iter.advance(); + mgr.reset(); + ASSERT_VALUE_EQ(Value(1), mgr.getNext()); + + const auto docsTwo = std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}, {"key", 1}}, + Document{{"a", 2}, {"key", 1}}, + Document{{"a", 2}, {"key", 2}}, + Document{{"a", 1}, {"key", 2}}}; + auto mockTwo = DocumentSourceMock::createForTest(std::move(docsTwo), getExpCtx()); + auto keyTwo = ExpressionFieldPath::createPathFromString( + getExpCtx().get(), "key", getExpCtx()->variablesParseState); + iter = PartitionIterator{getExpCtx().get(), mockTwo.get(), *key}; + input = ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState); + maxFunc = std::make_unique<WindowFunctionMax>(cmp); + mgr = WindowFunctionExecRemovableDocument( + &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{-1, 0}); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + iter.advance(); + ASSERT_VALUE_EQ(Value(3), mgr.getNext()); + iter.advance(); + mgr.reset(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); + iter.advance(); + ASSERT_VALUE_EQ(Value(2), mgr.getNext()); +} + } // namespace } // namespace mongo |