summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2021-02-22 09:18:45 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-22 14:46:33 +0000
commitef416583d74e5ed6adb7c43d0a1cae786fd90bee (patch)
tree9b8acc650e1c3f3d245c7162050c92fe6ba5be33 /src/mongo/db
parent7ac3e9f1eb151aced254814767c965ea468a7f74 (diff)
downloadmongo-ef416583d74e5ed6adb7c43d0a1cae786fd90bee.tar.gz
SERVER-53398 Add removable window function executor
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/window_function/partition_iterator.h7
-rw-r--r--src/mongo/db/pipeline/window_function/partition_iterator_test.cpp22
-rw-r--r--src/mongo/db/pipeline/window_function/window_function.h6
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec.h47
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp96
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h81
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_test.cpp217
8 files changed, 477 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index fde2cb61f00..310479ecb3b 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -262,6 +262,7 @@ pipelineEnv.Library(
'skip_and_limit.cpp',
'tee_buffer.cpp',
'window_function/partition_iterator.cpp',
+ 'window_function/window_function_exec_removable_document.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/clientdriver_minimal',
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h
index 0a0711ac3bf..bf7f0bd1097 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator.h
+++ b/src/mongo/db/pipeline/window_function/partition_iterator.h
@@ -68,6 +68,13 @@ public:
*/
AdvanceResult advance();
+ /**
+ * Returns the offset of the iterator for the current partition.
+ */
+ auto getCurrentOffset() const {
+ return _currentIndex;
+ }
+
private:
/**
* Retrieves the next document from the prior stage and updates the state accordingly.
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
index 97787282b10..f0666f12f39 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
+++ b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
@@ -177,5 +177,27 @@ TEST_F(PartitionIteratorTest, PartitionByArrayErrs) {
ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *partIter[0]);
ASSERT_THROWS_CODE(*partIter[1], AssertionException, ErrorCodes::TypeMismatch);
}
+
+TEST_F(PartitionIteratorTest, CurrentOffsetIsCorrectAfterDocumentsAreAccessed) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"key", 1}}, Document{{"key", 2}}, Document{{"key", 3}}, Document{{"key", 4}}};
+ const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
+ auto key = ExpressionFieldPath::createPathFromString(
+ getExpCtx().get(), "a", getExpCtx()->variablesParseState);
+ auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), *key);
+ ASSERT_EQ(0, partIter.getCurrentOffset());
+ auto doc = partIter[0];
+ partIter.advance();
+ ASSERT_EQ(1, partIter.getCurrentOffset());
+ doc = partIter[0];
+ partIter.advance();
+ ASSERT_EQ(2, partIter.getCurrentOffset());
+ doc = partIter[0];
+ partIter.advance();
+ ASSERT_EQ(3, partIter.getCurrentOffset());
+ doc = partIter[0];
+ ASSERT_EQ(3, partIter.getCurrentOffset());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function.h b/src/mongo/db/pipeline/window_function/window_function.h
index fedfbce1f4a..eb38616c95d 100644
--- a/src/mongo/db/pipeline/window_function/window_function.h
+++ b/src/mongo/db/pipeline/window_function/window_function.h
@@ -45,9 +45,11 @@ namespace mongo {
*/
class WindowFunctionState {
public:
+ virtual ~WindowFunctionState() = default;
virtual void add(Value) = 0;
virtual void remove(Value) = 0;
virtual Value getValue() const = 0;
+ virtual void reset() = 0;
};
@@ -77,6 +79,10 @@ public:
_values.erase(iter);
}
+ void reset() final {
+ _values.clear();
+ }
+
Value getValue() const final {
if (_values.empty())
return getDefault();
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.h b/src/mongo/db/pipeline/window_function/window_function_exec.h
index 9a8cce9bb7d..4dfcbdd8356 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec.h
@@ -29,10 +29,13 @@
#pragma once
+#include <queue>
+
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/window_function/partition_iterator.h"
#include "mongo/db/pipeline/window_function/window_bounds.h"
+#include "mongo/db/pipeline/window_function/window_function.h"
namespace mongo {
@@ -62,4 +65,48 @@ protected:
PartitionIterator* _iter;
};
+/**
+ * Base class for executors that need to remove documents from their held functions. The
+ * 'WindowFunctionState' parameter must expose an 'add()' and corresponding
+ * 'getValue()' method to get the accumulation result. It must also expose a 'remove()' method to
+ * remove a specific document from the calculation.
+ */
+class WindowFunctionExecRemovable : public WindowFunctionExec {
+public:
+ WindowFunctionExecRemovable(PartitionIterator* iter,
+ boost::intrusive_ptr<Expression> input,
+ std::unique_ptr<WindowFunctionState> function)
+ : WindowFunctionExec(iter), _input(std::move(input)), _function(std::move(function)) {}
+
+ Value getNext() {
+ if (!_initialized) {
+ this->initialize();
+ _initialized = true;
+ return _function->getValue();
+ }
+ processDocumentsToUpperBound();
+ removeDocumentsUnderLowerBound();
+ return _function->getValue();
+ }
+
+protected:
+ boost::intrusive_ptr<Expression> _input;
+ std::unique_ptr<WindowFunctionState> _function;
+ // Keep track of values in the window function that will need to be removed later.
+ std::queue<Value> _values;
+ // In one of two states: either the initial window has not been populated or we are sliding and
+ // accumulating/removing values.
+ bool _initialized = false;
+
+ void addValue(Value v) {
+ _function->add(v);
+ _values.push(v);
+ }
+
+private:
+ virtual void processDocumentsToUpperBound() = 0;
+ virtual void removeDocumentsUnderLowerBound() = 0;
+ virtual void initialize() = 0;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
new file mode 100644
index 00000000000..571c06eac30
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h"
+
+namespace mongo {
+
+WindowFunctionExecRemovableDocument::WindowFunctionExecRemovableDocument(
+ PartitionIterator* iter,
+ boost::intrusive_ptr<Expression> input,
+ std::unique_ptr<WindowFunctionState> function,
+ WindowBounds::DocumentBased bounds)
+ : WindowFunctionExecRemovable(iter, std::move(input), std::move(function)) {
+ stdx::visit(
+ visit_helper::Overloaded{
+ [](const WindowBounds::Unbounded&) {
+ // If the window is left unbounded we should use the non-removable executor.
+ MONGO_UNREACHABLE_TASSERT(5339802);
+ },
+ [&](const WindowBounds::Current&) { _lowerBound = 0; },
+ [&](const int& lowerIndex) { _lowerBound = lowerIndex; },
+ },
+ bounds.lower);
+
+ stdx::visit(
+ visit_helper::Overloaded{
+ [](const WindowBounds::Unbounded&) {
+ uasserted(5339801, "Right unbounded windows are not yet supported");
+ },
+ [&](const WindowBounds::Current&) { _upperBound = 0; },
+ [&](const int& upperIndex) { _upperBound = upperIndex; },
+ },
+ bounds.upper);
+}
+
+void WindowFunctionExecRemovableDocument::initialize() {
+ int lowerBoundForInit = _lowerBound > 0 ? _lowerBound : 0;
+ for (int i = lowerBoundForInit; i <= _upperBound; ++i) {
+ // If this is false, we're over the end of the partition.
+ if (auto doc = (*this->_iter)[i]) {
+ addValue(_input->evaluate(*doc, nullptr));
+ } else {
+ break;
+ }
+ }
+}
+
+void WindowFunctionExecRemovableDocument::processDocumentsToUpperBound() {
+ // If there is no upper bound, the whole partition is loaded by initialize.
+ if (_upperBound) {
+ // If this is false, we're over the end of the partition.
+ if (auto doc = (*this->_iter)[_upperBound.get()]) {
+ addValue(_input->evaluate(*doc, nullptr));
+ }
+ }
+}
+
+void WindowFunctionExecRemovableDocument::removeDocumentsUnderLowerBound() {
+ // For a positive lower bound the first pass loads the correct window, so subsequent passes
+ // must always remove a document if there is a document left to remove.
+ // For a negative lower bound we can start removing every time only after we have seen
+ // documents to fill the left side of the window.
+ if (_lowerBound >= 0 || _iter->getCurrentOffset() > abs(_lowerBound)) {
+ removeFirstValueIfExists();
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
new file mode 100644
index 00000000000..2637a23a925
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/window_function/partition_iterator.h"
+#include "mongo/db/pipeline/window_function/window_bounds.h"
+#include "mongo/db/pipeline/window_function/window_function_exec.h"
+
+namespace mongo {
+
+/**
+ * An executor that specifically handles document-based window types which accumulate values while
+ * removing old ones.
+ */
+class WindowFunctionExecRemovableDocument final : public WindowFunctionExecRemovable {
+public:
+ /**
+ * Constructs a removable window function executor with the given input expression to be
+ * evaluated and passed to the corresponding WindowFunc for each document in the window.
+ *
+ * The 'bounds' parameter is the user supplied bounds for the window.
+ */
+ WindowFunctionExecRemovableDocument(PartitionIterator* iter,
+ boost::intrusive_ptr<Expression> input,
+ std::unique_ptr<WindowFunctionState> function,
+ WindowBounds::DocumentBased bounds);
+
+ void reset() final {
+ _function->reset();
+ _initialized = false;
+ }
+
+private:
+ void processDocumentsToUpperBound() final;
+
+ void removeDocumentsUnderLowerBound() final;
+
+ void initialize() final;
+
+ void removeFirstValueIfExists() {
+ if (_values.size() == 0) {
+ return;
+ }
+ _function->remove(_values.front());
+ _values.pop();
+ }
+
+ int _lowerBound;
+ // Will stay boost::none if right unbounded.
+ boost::optional<int> _upperBound = boost::none;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_test.cpp
index 3d4e7234b01..bdc773ee131 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_test.cpp
@@ -37,7 +37,10 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/window_function/partition_iterator.h"
#include "mongo/db/pipeline/window_function/window_bounds.h"
+#include "mongo/db/pipeline/window_function/window_function.h"
#include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h"
+#include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h"
+#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -68,6 +71,36 @@ private:
std::unique_ptr<PartitionIterator> _iter;
};
+class WindowFunctionExecRemovableDocumentTest : public AggregationContextFixture {
+public:
+ WindowFunctionExecRemovableDocumentTest()
+ : collator(CollatorInterfaceMock::MockType::kToLowerString), cmp(&collator) {}
+
+ WindowFunctionExecRemovableDocument createForFieldPath(
+ std::deque<DocumentSource::GetNextResult> docs,
+ const std::string& inputPath,
+ WindowBounds::DocumentBased bounds) {
+ _docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
+ _iter =
+ std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none);
+ auto input = ExpressionFieldPath::parse(
+ getExpCtx().get(), inputPath, getExpCtx()->variablesParseState);
+ std::unique_ptr<WindowFunctionState> maxFunc = std::make_unique<WindowFunctionMax>(cmp);
+ return WindowFunctionExecRemovableDocument(
+ _iter.get(), std::move(input), std::move(maxFunc), bounds);
+ }
+
+ auto advanceIterator() {
+ return _iter->advance();
+ }
+
+private:
+ CollatorInterfaceMock collator;
+ ValueComparator cmp;
+ boost::intrusive_ptr<DocumentSourceMock> _docSource;
+ std::unique_ptr<PartitionIterator> _iter;
+};
+
TEST_F(WindowFunctionExecNonRemovableTest, AccumulateOnlyWithoutLookahead) {
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}};
@@ -153,5 +186,189 @@ TEST_F(WindowFunctionExecNonRemovableTest, UnboundedNotYetSupported) {
5374100);
}
+TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateCurrentToInteger) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{0, 2});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+
+ mgr = createForFieldPath(
+ std::move(docs), "$a", WindowBounds::DocumentBased{WindowBounds::Current{}, 2});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateIntegerToCurrent) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ auto mgr = createForFieldPath(
+ std::move(docs), "$a", WindowBounds::DocumentBased{-1, WindowBounds::Current{}});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateCurrentToCurrent) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ auto mgr = createForFieldPath(
+ std::move(docs),
+ "$a",
+ WindowBounds::DocumentBased{WindowBounds::Current{}, WindowBounds::Current{}});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, AccumulateIntegerToInteger) {
+ const auto docsOne = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ auto mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{-1, 1});
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+
+ const auto docsTwo = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ mgr = createForFieldPath(std::move(docsTwo), "$a", WindowBounds::DocumentBased{-3, -1});
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+
+ const auto docsThree = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 5}}, Document{{"a", 4}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ mgr = createForFieldPath(std::move(docsTwo), "$a", WindowBounds::DocumentBased{1, 3});
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, DefaultValueWorksAsExpected) {
+ const auto docsOne =
+ std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}}, Document{{"a", 4}}};
+ auto mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{-3, -2});
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+ mgr = createForFieldPath(std::move(docsOne), "$a", WindowBounds::DocumentBased{2, 3});
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, FunctionalityNotSupported) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 3}}, Document{{"a", 2}}, Document{{"a", 1}}};
+ ASSERT_THROWS_CODE(
+ createForFieldPath(
+ std::move(docs), "$a", WindowBounds::DocumentBased{1, WindowBounds::Unbounded{}}),
+ AssertionException,
+ 5339801);
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, EnsureFirstDocumentIsNotRemovedEarly) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 4}}, Document{{"a", 1}}, Document{{"a", 2}}};
+ auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{0, 2});
+ ASSERT_VALUE_EQ(Value(4), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, EnsureEarlyDocumentsAreNotIncludedIncorrectly) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 6}},
+ Document{{"a", 7}},
+ Document{{"a", 2}},
+ Document{{"a", 1}},
+ Document{{"a", 0}}};
+ auto mgr = createForFieldPath(std::move(docs), "$a", WindowBounds::DocumentBased{2, 5});
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+ advanceIterator();
+ ASSERT_VALUE_EQ(Value(0), mgr.getNext());
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+ ASSERT(mgr.getNext().nullish()); // Default value
+ advanceIterator();
+}
+
+TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}, {"key", 1}},
+ Document{{"a", 2}, {"key", 1}},
+ Document{{"a", 2}, {"key", 2}},
+ Document{{"a", 1}, {"key", 2}},
+ Document{{"a", 1}, {"key", 3}}};
+ auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
+ auto key = ExpressionFieldPath::createPathFromString(
+ getExpCtx().get(), "key", getExpCtx()->variablesParseState);
+ auto iter = PartitionIterator{getExpCtx().get(), mock.get(), *key};
+ auto input =
+ ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
+ CollatorInterfaceMock collator = CollatorInterfaceMock::MockType::kToLowerString;
+ ValueComparator cmp(&collator);
+ std::unique_ptr<WindowFunctionState> maxFunc = std::make_unique<WindowFunctionMax>(cmp);
+ auto mgr = WindowFunctionExecRemovableDocument(
+ &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{0, 0});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ iter.advance();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ iter.advance();
+ mgr.reset();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ iter.advance();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+ iter.advance();
+ mgr.reset();
+ ASSERT_VALUE_EQ(Value(1), mgr.getNext());
+
+ const auto docsTwo = std::deque<DocumentSource::GetNextResult>{Document{{"a", 3}, {"key", 1}},
+ Document{{"a", 2}, {"key", 1}},
+ Document{{"a", 2}, {"key", 2}},
+ Document{{"a", 1}, {"key", 2}}};
+ auto mockTwo = DocumentSourceMock::createForTest(std::move(docsTwo), getExpCtx());
+ auto keyTwo = ExpressionFieldPath::createPathFromString(
+ getExpCtx().get(), "key", getExpCtx()->variablesParseState);
+ iter = PartitionIterator{getExpCtx().get(), mockTwo.get(), *key};
+ input = ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
+ maxFunc = std::make_unique<WindowFunctionMax>(cmp);
+ mgr = WindowFunctionExecRemovableDocument(
+ &iter, std::move(input), std::move(maxFunc), WindowBounds::DocumentBased{-1, 0});
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ iter.advance();
+ ASSERT_VALUE_EQ(Value(3), mgr.getNext());
+ iter.advance();
+ mgr.reset();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+ iter.advance();
+ ASSERT_VALUE_EQ(Value(2), mgr.getNext());
+}
+
} // namespace
} // namespace mongo