summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/window_function
diff options
context:
space:
mode:
authorDavid Percy <david.percy@mongodb.com>2021-03-05 21:21:47 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-19 19:15:48 +0000
commitc3209ebb1db49656dcc3d1edf3635f7c7e96add5 (patch)
treeb3f920a3df53b171145371c734ba9185dc3e3bdc /src/mongo/db/pipeline/window_function
parent481bade9c40d662bf463291645bbf9fddce3a11f (diff)
downloadmongo-c3209ebb1db49656dcc3d1edf3635f7c7e96add5.tar.gz
SERVER-54294 Support range-based bounds for window functions
Diffstat (limited to 'src/mongo/db/pipeline/window_function')
-rw-r--r--src/mongo/db/pipeline/window_function/partition_iterator.cpp262
-rw-r--r--src/mongo/db/pipeline/window_function/partition_iterator.h114
-rw-r--r--src/mongo/db/pipeline/window_function/partition_iterator_test.cpp62
-rw-r--r--src/mongo/db/pipeline/window_function/window_bounds.cpp2
-rw-r--r--src/mongo/db/pipeline/window_function/window_bounds.h2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec.cpp49
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec.h59
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_derivative.h2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp4
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_first_last.h2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp2
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h117
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp14
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp15
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h11
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp125
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h88
-rw-r--r--src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp17
18 files changed, 827 insertions, 120 deletions
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.cpp b/src/mongo/db/pipeline/window_function/partition_iterator.cpp
index dad88025ebd..32967618454 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator.cpp
+++ b/src/mongo/db/pipeline/window_function/partition_iterator.cpp
@@ -32,9 +32,63 @@
#include "mongo/db/pipeline/window_function/partition_iterator.h"
#include "mongo/util/visit_helper.h"
+using boost::optional;
+
namespace mongo {
-boost::optional<Document> PartitionIterator::operator[](int index) {
+namespace {
+/**
+ * Create an Expression from a SortPattern, if the sort is simple enough.
+ *
+ * The sort must have one, ascending, non-expression field.
+ * The field may be dotted.
+ *
+ * For example: {ab.c: 1} becomes "$ab.c", but {a: -1} becomes boost::none.
+ */
+static optional<boost::intrusive_ptr<ExpressionFieldPath>> exprFromSort(
+ ExpressionContext* expCtx, const optional<SortPattern>& sortPattern) {
+ if (!sortPattern)
+ return boost::none;
+ if (sortPattern->size() != 1)
+ return boost::none;
+ const SortPattern::SortPatternPart& part = *sortPattern->begin();
+
+ bool hasFieldPath = part.fieldPath != boost::none;
+ bool hasExpression = part.expression != nullptr;
+ tassert(5429403,
+ "SortPatternPart is supposed to have exactly one: fieldPath, or expression.",
+ hasFieldPath != hasExpression);
+
+ if (hasExpression)
+ return boost::none;
+
+ // Descending sorts are not allowed with range-based bounds.
+ //
+ // We think this would be confusing.
+ // Does [x, y] mean [lower, upper] or [left, right] ?
+ //
+ // For example, suppose you sort by {time: -1} to put recent documents first.
+ // Would you write 'range: [-5, +2]', with the smaller value first?
+ // Or would you write 'range: [+2, -5]', with the more recent value first?
+ if (!part.isAscending)
+ return boost::none;
+
+ return ExpressionFieldPath::createPathFromString(
+ expCtx, part.fieldPath->fullPath(), expCtx->variablesParseState);
+}
+} // namespace
+
+PartitionIterator::PartitionIterator(ExpressionContext* expCtx,
+ DocumentSource* source,
+ optional<boost::intrusive_ptr<Expression>> partitionExpr,
+ const optional<SortPattern>& sortPattern)
+ : _expCtx(expCtx),
+ _source(source),
+ _partitionExpr(std::move(partitionExpr)),
+ _sortExpr(exprFromSort(_expCtx, sortPattern)),
+ _state(IteratorState::kNotInitialized) {}
+
+optional<Document> PartitionIterator::operator[](int index) {
auto desired = _currentCacheIndex + index;
if (_state == IteratorState::kAdvancedToEOF)
@@ -152,49 +206,209 @@ PartitionIterator::AdvanceResult PartitionIterator::advance() {
}
namespace {
-boost::optional<int> numericBound(WindowBounds::Bound<int> bound) {
+optional<int> numericBound(WindowBounds::Bound<int> bound) {
return stdx::visit(
visit_helper::Overloaded{
- [](WindowBounds::Unbounded) -> boost::optional<int> { return boost::none; },
- [](WindowBounds::Current) -> boost::optional<int> { return 0; },
- [](int i) -> boost::optional<int> { return i; },
+ [](WindowBounds::Unbounded) -> optional<int> { return boost::none; },
+ [](WindowBounds::Current) -> optional<int> { return 0; },
+ [](int i) -> optional<int> { return i; },
},
bound);
}
+
+// Assumes both arguments are numeric, and performs Decimal128 addition on them.
+Value decimalAdd(const Value& left, const Value& right) {
+ // Widening to Decimal128 is a convenient way to avoid having many cases for different numeric
+ // types. The 'threshold' values we compute are only used to choose a set of documents; the
+ // user can't observe the type.
+ return Value(left.coerceToDecimal().add(right.coerceToDecimal()));
+}
} // namespace
-boost::optional<std::pair<int, int>> PartitionIterator::getEndpoints(const WindowBounds& bounds) {
- // For range-based bounds, we will need to:
- // 1. extract the sortBy for (*this)[0]
- // 2. step backwards until we cross bounds.lower
- // 3. step forwards until we cross bounds.upper
- // This means we'll need to pass in sortBy somewhere.
- tassert(5423300,
- "TODO SERVER-54294: range-based and time-based bounds",
- stdx::holds_alternative<WindowBounds::DocumentBased>(bounds.bounds));
+optional<std::pair<int, int>> PartitionIterator::getEndpointsRangeBased(
+ const WindowBounds& bounds, const optional<std::pair<int, int>>& hint) {
+ tassert(5429404, "Missing _sortExpr with range-based bounds", _sortExpr != boost::none);
+ // TODO SERVER-54295: time-based bounds
+ uassert(5429402,
+ "Time-based bounds not supported yet",
+ stdx::holds_alternative<WindowBounds::RangeBased>(bounds.bounds));
+ auto range = stdx::get<WindowBounds::RangeBased>(bounds.bounds);
+
+ auto lessThan = _expCtx->getValueComparator().getLessThan();
+
+ Value base = (*_sortExpr)->evaluate(*(*this)[0], &_expCtx->variables);
+ uassert(5429413,
+ "Invalid range: For windows that involve date or time ranges, a unit must be provided.",
+ base.getType() != BSONType::Date);
+ uassert(5429414,
+ str::stream() << "Invalid range: Expected the sortBy field to be a number, but it was "
+ << base.getType(),
+ base.numeric());
+
+ // 'lower' is the smallest offset in the partition that's within the lower bound of the window.
+ optional<int> lower = stdx::visit(
+ visit_helper::Overloaded{
+ [&](WindowBounds::Current) -> optional<int> {
+ // 'range: ["current", _]' means the current document, which is always offset 0.
+ return 0;
+ },
+ [&](WindowBounds::Unbounded) -> optional<int> {
+ // Find the leftmost document whose sortBy field evaluates to a numeric value.
+
+ // Start from the beginning, or the hint, whichever is higher.
+ // Note that the hint may no longer be a valid offset, if some documents were
+ // released from the cache.
+ int start = getMinCachedOffset();
+ if (hint) {
+ start = std::max(hint->first, start);
+ }
+
+ for (int i = start;; ++i) {
+ auto doc = (*this)[i];
+ if (!doc) {
+ return boost::none;
+ }
+ Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables);
+ if (v.numeric()) {
+ return i;
+ }
+ }
+ },
+ [&](const Value& delta) -> optional<int> {
+ Value threshold = decimalAdd(base, delta);
+
+ // Start from the beginning, or the hint, whichever is higher.
+ // Note that the hint may no longer be a valid offset, if some documents were
+ // released from the cache.
+ int start = getMinCachedOffset();
+ if (hint) {
+ start = std::max(hint->first, start);
+ }
+
+ boost::optional<Document> doc;
+ for (int i = start; (doc = (*this)[i]); ++i) {
+ Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables);
+ if (!lessThan(v, threshold)) {
+ // This is the first doc we've scanned that crossed the threshold.
+ return i;
+ }
+ }
+ // We scanned every document in the partition, and none crossed the
+ // threshold. So the window must be shifted so far to the right that no
+ // documents fall in it.
+ return boost::none;
+ },
+ },
+ range.lower);
+
+ if (!lower)
+ return boost::none;
+
+ // 'upper' is the largest offset in the partition that's within the upper bound of the window.
+ optional<int> upper = stdx::visit(
+ visit_helper::Overloaded{
+ [&](WindowBounds::Current) -> optional<int> {
+ // 'range: [_, "current"]' means the current document, which is offset 0.
+ return 0;
+ },
+ [&](WindowBounds::Unbounded) -> optional<int> {
+ // Find the rightmost document whose sortBy field evaluates to a numeric value.
+
+ // We know that the current document, the lower bound, and the hint (if present)
+ // are all numeric, so start scanning from whichever is highest.
+ int start = std::max(0, *lower);
+ if (hint) {
+ start = std::max(hint->second, start);
+ }
+
+ boost::optional<Document> doc;
+ for (int i = start; (doc = (*this)[i]); ++i) {
+ Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables);
+ if (!v.numeric()) {
+ // The previously scanned doc is the rightmost numeric one. Since we start
+ // from '0', 'hint', or 'lower', which are all numeric, we should never hit
+ // this case on the first iteration.
+ tassert(5429412,
+ "Failed to find the rightmost numeric document, "
+ "while computing window bounds",
+ i != start);
+ return i - 1;
+ }
+ }
+ return getMaxCachedOffset();
+ },
+ [&](const Value& delta) -> optional<int> {
+ // Pull in documents until the sortBy value crosses 'base + delta'.
+ tassert(5429406, "Range-based bounds are specified as a number", delta.numeric());
+ Value threshold = decimalAdd(base, delta);
+
+ // If there's no hint, start scanning from the lower bound.
+ // If there is a hint, start from whichever is greater: lower bound or hint.
+ // Usually the hint is greater, but with bounds like [0, 0] the new lower bound
+ // will be greater than the old upper bound.
+ int start = *lower;
+ if (hint) {
+ start = std::max(hint->second, start);
+ }
+
+ for (int i = start;; ++i) {
+ auto doc = (*this)[i];
+ if (!doc) {
+ // We scanned every document in the partition, and none crossed the upper
+ // bound. So the upper bound contains everything up to the end of the
+ // partition.
+ return getMaxCachedOffset();
+ }
+ Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables);
+ if (lessThan(threshold, v)) {
+ // This doc exceeded the upper bound.
+ // The previously scanned doc (if any) is the greatest in-bounds one.
+ if (i == start) {
+ // This case can happen, for example, at the beginning of a partition
+ // when the window is 'range: [-100, -5]'. There can be documents
+ // within the lower bound of -100, but none within the upper bound of
+ // -5.
+ return boost::none;
+ } else {
+ return i - 1;
+ }
+ }
+ }
+ },
+ },
+ range.upper);
+
+ if (!upper)
+ return boost::none;
+
+ return std::pair{*lower, *upper};
+}
+
+optional<std::pair<int, int>> PartitionIterator::getEndpointsDocumentBased(
+ const WindowBounds& bounds, const optional<std::pair<int, int>>& hint = boost::none) {
tassert(5423301, "getEndpoints assumes there is a current document", (*this)[0] != boost::none);
auto docBounds = stdx::get<WindowBounds::DocumentBased>(bounds.bounds);
- boost::optional<int> lowerBound = numericBound(docBounds.lower);
- boost::optional<int> upperBound = numericBound(docBounds.upper);
+ optional<int> lowerBound = numericBound(docBounds.lower);
+ optional<int> upperBound = numericBound(docBounds.upper);
tassert(5423302,
"Bounds should never be inverted",
!lowerBound || !upperBound || lowerBound <= upperBound);
// Pull documents into the cache until it contains the whole window.
+ // We want to know whether the window reaches the end of the partition.
if (upperBound) {
// For a right-bounded window we only need to pull in documents up to the bound.
(*this)[*upperBound];
} else {
// For a right-unbounded window we need to pull in the whole partition. operator[] reports
// end of partition by returning boost::none instead of a document.
- for (int i = 0; (*this)[i]; ++i) {
- }
+ cacheWholePartition();
}
// Valid offsets into the cache are any 'i' such that '_cache[_currentCacheIndex + i]' is valid.
// We know the cache is nonempty because it contains the current document.
- int cacheOffsetMin = -_currentCacheIndex;
- int cacheOffsetMax = cacheOffsetMin + _cache.size() - 1;
+ int cacheOffsetMin = getMinCachedOffset();
+ int cacheOffsetMax = getMaxCachedOffset();
// The window can only be empty if the bounds are shifted completely out of the partition.
if (lowerBound && lowerBound > cacheOffsetMax)
@@ -214,6 +428,14 @@ boost::optional<std::pair<int, int>> PartitionIterator::getEndpoints(const Windo
return {{lowerOffset, upperOffset}};
}
+optional<std::pair<int, int>> PartitionIterator::getEndpoints(
+ const WindowBounds& bounds, const optional<std::pair<int, int>>& hint = boost::none) {
+ if (!stdx::holds_alternative<WindowBounds::DocumentBased>(bounds.bounds)) {
+ return getEndpointsRangeBased(bounds, hint);
+ }
+ return getEndpointsDocumentBased(bounds, hint);
+}
+
void PartitionIterator::getNextDocument() {
tassert(5340103,
"Invalid call to PartitionIterator::getNextDocument",
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h
index c2021dca75c..1b199685238 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator.h
+++ b/src/mongo/db/pipeline/window_function/partition_iterator.h
@@ -32,22 +32,26 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/window_function/window_bounds.h"
+#include "mongo/db/query/sort_pattern.h"
namespace mongo {
/**
* This class provides an abstraction for accessing documents in a partition via an interator-type
- * interface. There is always a "current" document with which indexed access is relative to.
+ * interface. There is always a "current" document; operator[] provides random access relative to
+ * the current document, so that iter[+2] is refers to the 2 positions ahead of the current one.
+ *
+ * The 'partionExpr' is used to determine partition boundaries, provide the illusion that only the
+ * current partition exists.
+ *
+ * The 'sortPattern' is used for resolving range-based and time-based bounds, in 'getEndpoints()'.
*/
class PartitionIterator {
public:
PartitionIterator(ExpressionContext* expCtx,
DocumentSource* source,
- boost::optional<boost::intrusive_ptr<Expression>> partitionExpr)
- : _expCtx(expCtx),
- _source(source),
- _partitionExpr(partitionExpr),
- _state(IteratorState::kNotInitialized) {}
+ boost::optional<boost::intrusive_ptr<Expression>> partitionExpr,
+ const boost::optional<SortPattern>& sortPattern);
using SlotId = unsigned int;
SlotId newSlot() {
@@ -116,7 +120,7 @@ private:
boost::optional<Document> operator[](int index);
/**
- * Resolve any type of WindowBounds to a concrete pair of indices, '[lower, upper]'.
+ * Resolves any type of WindowBounds to a concrete pair of indices, '[lower, upper]'.
*
* Both 'lower' and 'upper' are valid offsets, such that '(*this)[lower]' and '(*this)[upper]'
* returns a document. If the window contains one document, then 'lower == upper'. If the
@@ -131,8 +135,46 @@ private:
*
* This method is non-const because it may pull documents into memory up to the end of the
* window.
+ *
+ * 'hint', if specified, should be the last result of getEndpoints() for the same 'bounds'.
+ */
+ boost::optional<std::pair<int, int>> getEndpoints(
+ const WindowBounds& bounds, const boost::optional<std::pair<int, int>>& hint);
+
+ /**
+ * Returns the smallest offset 'i' such that (*this)[i] is in '_cache'.
+ *
+ * This value is negative or zero, because the current document is always in '_cache'.
*/
- boost::optional<std::pair<int, int>> getEndpoints(const WindowBounds& bounds);
+ auto getMinCachedOffset() const {
+ return -_currentCacheIndex;
+ }
+
+ /**
+ * Returns the largest offset 'i' such that (*this)[i] is in '_cache'.
+ *
+ * Note that offsets greater than 'i' might still be in the partition, even though they
+ * haven't been loaded into '_cache' yet. If you want to know where the partition ends,
+ * call 'cacheWholePartition' first.
+ *
+ * This value is positive or zero, because the current document is always in '_cache'.
+ */
+ auto getMaxCachedOffset() const {
+ return getMinCachedOffset() + _cache.size() - 1;
+ }
+
+ /**
+ * Loads documents into '_cache' until we reach a partition boundary.
+ */
+ void cacheWholePartition() {
+ // Start from one past the end of the _cache.
+ int i = getMinCachedOffset() + _cache.size();
+ // If we have already loaded everything into '_cache' then this condition will be false
+ // immediately.
+ while ((*this)[i]) {
+ ++i;
+ }
+ }
/**
* Marks the given index as expired for the slot 'id'. This does not necessarily mean that the
@@ -181,9 +223,21 @@ private:
_state = IteratorState::kIntraPartition;
}
+ // Internal helpers for 'getEndpoints()'.
+ boost::optional<std::pair<int, int>> getEndpointsRangeBased(
+ const WindowBounds& bounds, const boost::optional<std::pair<int, int>>& hint);
+ boost::optional<std::pair<int, int>> getEndpointsDocumentBased(
+ const WindowBounds& bounds, const boost::optional<std::pair<int, int>>& hint);
+
ExpressionContext* _expCtx;
DocumentSource* _source;
boost::optional<boost::intrusive_ptr<Expression>> _partitionExpr;
+
+ // '_sortExpr' tells us which field is the "time" field. When the user writes
+ // 'sortBy: {ts: 1}', any time-based or range-based window bounds are defined using
+ // the value of the "$ts" field. This _sortExpr is used in getEndpoints().
+ boost::optional<boost::intrusive_ptr<ExpressionFieldPath>> _sortExpr;
+
std::deque<Document> _cache;
// '_cache[_currentCacheIndex]' is the current document, which '(*this)[0]' returns.
int _currentCacheIndex = 0;
@@ -237,9 +291,12 @@ public:
// This policy assumes that when the caller accesses a certain index 'i', that it will no
// longer require all documents up to and including the document at index 'i'.
kDefaultSequential,
- // This policy should be used if the caller requires the endpoints of a window, potentially
- // accessing the same bound twice.
- kEndpointsOnly
+ // This policy should be used if the caller requires the endpoints of a window. Documents
+ // to the left of the left endpoint may disappear on the next call to releaseExpired().
+ kEndpoints,
+ // This policy means the caller only looks at how the right endpoint changes.
+ // The caller may look at documents between the most recent two right endpoints.
+ kRightEndpoint,
};
PartitionAccessor(PartitionIterator* iter, Policy policy)
: _iter(iter), _slot(iter->newSlot()), _policy(policy) {}
@@ -250,7 +307,8 @@ public:
case Policy::kDefaultSequential:
_iter->expireUpTo(_slot, index);
break;
- case Policy::kEndpointsOnly:
+ case Policy::kEndpoints:
+ case Policy::kRightEndpoint:
break;
}
return ret;
@@ -260,12 +318,32 @@ public:
return _iter->getCurrentPartitionIndex();
}
- boost::optional<std::pair<int, int>> getEndpoints(const WindowBounds& bounds) {
- tassert(5371201, "Invalid usage of partition accessor", _policy == Policy::kEndpointsOnly);
- auto endpoints = _iter->getEndpoints(bounds);
- // With this policy, all documents before the lower bound can be marked as expired.
- if (endpoints) {
- _iter->expireUpTo(_slot, endpoints->first - 1);
+ boost::optional<std::pair<int, int>> getEndpoints(
+ const WindowBounds& bounds,
+ const boost::optional<std::pair<int, int>>& hint = boost::none) {
+ auto endpoints = _iter->getEndpoints(bounds, hint);
+ switch (_policy) {
+ case Policy::kDefaultSequential:
+ tasserted(5371201, "Invalid usage of partition accessor");
+ break;
+ case Policy::kEndpoints:
+ // With this policy, all documents before the lower bound can be marked as expired.
+ // They will only be released on the next call to releaseExpired(), so when
+ // getEndpoints() returns, the caller may also look at documents from the previous
+ // result of getEndpoints(), until it returns control to the DocumentSource.
+ if (endpoints) {
+ _iter->expireUpTo(_slot, endpoints->first - 1);
+ }
+ break;
+ case Policy::kRightEndpoint:
+ // With this policy, all documents before the upper bound can be marked as expired.
+ // They will only be released on the next call to releaseExpired(), so when
+ // getEndpoints() returns, the caller may also look at documents from the previous
+ // result of getEndpoints(), until it returns control to the DocumentSource.
+ if (endpoints) {
+ _iter->expireUpTo(_slot, endpoints->second - 1);
+ }
+ break;
}
return endpoints;
}
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
index 068f7912676..81b34e93910 100644
--- a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
+++ b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp
@@ -47,7 +47,8 @@ public:
auto makeDefaultAccessor(
boost::intrusive_ptr<DocumentSourceMock> mock,
boost::optional<boost::intrusive_ptr<Expression>> partExpr = boost::none) {
- _iter = std::make_unique<PartitionIterator>(getExpCtx().get(), mock.get(), partExpr);
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), mock.get(), partExpr, boost::none);
return PartitionAccessor(_iter.get(), PartitionAccessor::Policy::kDefaultSequential);
}
@@ -255,7 +256,6 @@ TEST_F(PartitionIteratorTest, CurrentOffsetIsCorrectAfterDocumentsAreAccessed) {
getExpCtx().get(), "a", getExpCtx()->variablesParseState);
auto partIter =
makeDefaultAccessor(mock, boost::optional<boost::intrusive_ptr<Expression>>(key));
- ASSERT_EQ(0, partIter.getCurrentPartitionIndex());
auto doc = partIter[0];
advance();
ASSERT_EQ(1, partIter.getCurrentPartitionIndex());
@@ -273,7 +273,7 @@ TEST_F(PartitionIteratorTest, OutsideOfPartitionAccessShouldNotTassert) {
const auto docs =
std::deque<DocumentSource::GetNextResult>{Document{{"a", 1}}, Document{{"a", 2}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none);
+ auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none);
auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
// Test that an accessor that attempts to read off the end of the partition returns boost::none
@@ -288,7 +288,7 @@ DEATH_TEST_F(PartitionIteratorTest,
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none);
+ auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none);
auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
// Access the first document, which marks it as expired.
ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *accessor[0]);
@@ -304,7 +304,7 @@ DEATH_TEST_F(PartitionIteratorTest,
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none);
+ auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none);
auto laggingAccessor =
PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
auto leadingAccessor =
@@ -335,8 +335,8 @@ DEATH_TEST_F(PartitionIteratorTest,
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none);
- auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpointsOnly);
+ auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none);
+ auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints);
// Mock a window with documents [1, 2].
auto bounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(1 << 2)),
SortPattern(BSON("a" << 1), getExpCtx()),
@@ -365,15 +365,13 @@ DEATH_TEST_F(PartitionIteratorTest,
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none);
+ auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none);
// Create two endpoint accessors, one at [-1, 0] and another at [0, 1]. Since the first one may
// access the document at (current - 1), the only expiration that can happen on advance() would
// be (newCurrent - 2).
- auto lookBehindAccessor =
- PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpointsOnly);
- auto lookAheadAccessor =
- PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpointsOnly);
+ auto lookBehindAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints);
+ auto lookAheadAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints);
auto negBounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(-1 << 0)),
SortPattern(BSON("a" << 1), getExpCtx()),
getExpCtx().get());
@@ -413,12 +411,48 @@ DEATH_TEST_F(PartitionIteratorTest,
ASSERT_THROWS_CODE(lookBehindAccessor[-3], AssertionException, 5371202);
}
+DEATH_TEST_F(PartitionIteratorTest,
+ SingleConsumerRightEndpointPolicy,
+ "Invalid access of expired document") {
+ const auto docs = std::deque<DocumentSource::GetNextResult>{
+ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
+ const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
+ auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none);
+ auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kRightEndpoint);
+ // Use a window of 'documents: [-2, -1]'.
+ auto bounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(-2 << -1)),
+ SortPattern(BSON("a" << 1), getExpCtx()),
+ getExpCtx().get());
+
+ // Advance until {a: 3} is the current document.
+ partIter.advance();
+ partIter.advance();
+ ASSERT_DOCUMENT_EQ(docs[2].getDocument(), *accessor[0]);
+
+ // Retrieving the endpoints triggers the expiration: everything below the right endpoint
+ // is marked as no longer needed.
+ auto endpoints = accessor.getEndpoints(bounds);
+ // The endpoints are {a: 1} and {a: 2}. So we will expect {a: 1} to be released.
+ ASSERT(endpoints != boost::none);
+ ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *accessor[endpoints->first]);
+ ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *accessor[endpoints->second]);
+
+ // The no-longer-needed documents are released on the next advance().
+ partIter.advance();
+
+ // The current document is now {a: 4}, and {a: 1} has been released.
+ ASSERT_DOCUMENT_EQ(docs[3].getDocument(), *accessor[0]);
+ ASSERT_DOCUMENT_EQ(docs[2].getDocument(), *accessor[-1]);
+ ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *accessor[-2]);
+ ASSERT_THROWS_CODE(accessor[-3], AssertionException, 5371202);
+}
+
DEATH_TEST_F(PartitionIteratorTest, MixedPolicy, "Invalid access of expired document") {
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}};
const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx());
- auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none);
- auto endpointAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpointsOnly);
+ auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none);
+ auto endpointAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints);
auto defaultAccessor =
PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential);
// Mock a window with documents [1, 2].
diff --git a/src/mongo/db/pipeline/window_function/window_bounds.cpp b/src/mongo/db/pipeline/window_function/window_bounds.cpp
index 20be8c30856..babb0394673 100644
--- a/src/mongo/db/pipeline/window_function/window_bounds.cpp
+++ b/src/mongo/db/pipeline/window_function/window_bounds.cpp
@@ -213,7 +213,7 @@ WindowBounds WindowBounds::parse(BSONObj args,
}
uassert(5339902,
"Range-based bounds require sortBy a single field",
- bounds.isUnbounded() || (sortBy && sortBy->size() == 1));
+ sortBy && sortBy->size() == 1);
return bounds;
}
}
diff --git a/src/mongo/db/pipeline/window_function/window_bounds.h b/src/mongo/db/pipeline/window_function/window_bounds.h
index f7b38a2e787..523ac095975 100644
--- a/src/mongo/db/pipeline/window_function/window_bounds.h
+++ b/src/mongo/db/pipeline/window_function/window_bounds.h
@@ -94,7 +94,7 @@ struct WindowBounds {
}
/**
- * Check if these bounds are unbounded on both ends.
+ * Checks whether these bounds are unbounded on both ends.
* This case is special because it means you don't need a sortBy to interpret the bounds:
* the bounds include every document (in the current partition).
*/
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.cpp b/src/mongo/db/pipeline/window_function/window_function_exec.cpp
index 6bec87bf8b8..ab154b720d4 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec.cpp
@@ -30,7 +30,9 @@
#include "mongo/db/pipeline/window_function/window_function_exec.h"
#include "mongo/db/pipeline/window_function/window_function_exec_derivative.h"
#include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h"
+#include "mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h"
#include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h"
+#include "mongo/db/pipeline/window_function/window_function_exec_removable_range.h"
namespace mongo {
@@ -75,6 +77,7 @@ std::unique_ptr<WindowFunctionExec> translateDerivative(
} // namespace
std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create(
+ ExpressionContext* expCtx,
PartitionIterator* iter,
const WindowFunctionStatement& functionStmt,
const boost::optional<SortPattern>& sortBy) {
@@ -84,22 +87,46 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create(
return translateDerivative(iter, *deriv, sortBy);
}
- // Use a sentinel variable to avoid a compilation error when some cases of std::visit don't
- // return a value.
- std::unique_ptr<WindowFunctionExec> exec;
- stdx::visit(
+ WindowBounds bounds = functionStmt.expr->bounds();
+
+ return stdx::visit(
visit_helper::Overloaded{
- [&](const WindowBounds::DocumentBased& docBase) {
- exec = translateDocumentWindow(iter, functionStmt.expr, docBase);
+ [&](const WindowBounds::DocumentBased& docBounds) {
+ return translateDocumentWindow(iter, functionStmt.expr, docBounds);
},
- [&](const WindowBounds::RangeBased& rangeBase) {
- uasserted(5397901, "Ranged based windows not currently supported");
+ [&](const WindowBounds::RangeBased& rangeBounds)
+ -> std::unique_ptr<WindowFunctionExec> {
+ // These checks should be enforced already during parsing.
+ tassert(5429401,
+ "Range-based window needs a non-compound sortBy",
+ sortBy != boost::none && sortBy->size() == 1);
+ SortPattern::SortPatternPart part = *sortBy->begin();
+ tassert(5429410,
+ "Range-based window doesn't work on expression-sortBy",
+ part.fieldPath != boost::none && !part.expression);
+ auto sortByExpr = ExpressionFieldPath::createPathFromString(
+ expCtx, part.fieldPath->fullPath(), expCtx->variablesParseState);
+
+ if (stdx::holds_alternative<WindowBounds::Unbounded>(rangeBounds.lower)) {
+ return std::make_unique<WindowFunctionExecNonRemovableRange>(
+ iter,
+ functionStmt.expr->input(),
+ std::move(sortByExpr),
+ functionStmt.expr->buildAccumulatorOnly(),
+ bounds);
+ } else {
+ return std::make_unique<WindowFunctionExecRemovableRange>(
+ iter,
+ functionStmt.expr->input(),
+ std::move(sortByExpr),
+ functionStmt.expr->buildRemovable(),
+ bounds);
+ }
},
- [&](const WindowBounds::TimeBased& timeBase) {
+ [&](const WindowBounds::TimeBased& timeBounds) -> std::unique_ptr<WindowFunctionExec> {
uasserted(5397902, "Time based windows are not currently supported");
}},
- functionStmt.expr->bounds().bounds);
- return exec;
+ bounds.bounds);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.h b/src/mongo/db/pipeline/window_function/window_function_exec.h
index b46de96d36f..e98563f8eca 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec.h
@@ -56,7 +56,8 @@ public:
* Creates an appropriate WindowFunctionExec that is capable of evaluating the window function
* over the given bounds, both found within the WindowFunctionStatement.
*/
- static std::unique_ptr<WindowFunctionExec> create(PartitionIterator* iter,
+ static std::unique_ptr<WindowFunctionExec> create(ExpressionContext* expCtx,
+ PartitionIterator* iter,
const WindowFunctionStatement& functionStmt,
const boost::optional<SortPattern>& sortBy);
@@ -91,22 +92,8 @@ protected:
*/
class WindowFunctionExecRemovable : public WindowFunctionExec {
public:
- WindowFunctionExecRemovable(PartitionIterator* iter,
- boost::intrusive_ptr<Expression> input,
- std::unique_ptr<WindowFunctionState> function)
- : WindowFunctionExec(
- PartitionAccessor(iter, PartitionAccessor::Policy::kDefaultSequential)),
- _input(std::move(input)),
- _function(std::move(function)) {}
-
- Value getNext() {
- if (!_initialized) {
- this->initialize();
- _initialized = true;
- return _function->getValue();
- }
- processDocumentsToUpperBound();
- removeDocumentsUnderLowerBound();
+ Value getNext() override {
+ update();
return _function->getValue();
}
@@ -118,15 +105,14 @@ public:
return _function->getApproximateSize() + _memUsageBytes;
}
-
protected:
- boost::intrusive_ptr<Expression> _input;
- std::unique_ptr<WindowFunctionState> _function;
- // Keep track of values in the window function that will need to be removed later.
- std::queue<Value> _values;
- // In one of two states: either the initial window has not been populated or we are sliding and
- // accumulating/removing values.
- bool _initialized = false;
+ WindowFunctionExecRemovable(PartitionIterator* iter,
+ PartitionAccessor::Policy policy,
+ boost::intrusive_ptr<Expression> input,
+ std::unique_ptr<WindowFunctionState> function)
+ : WindowFunctionExec(PartitionAccessor(iter, policy)),
+ _input(std::move(input)),
+ _function(std::move(function)) {}
void addValue(Value v) {
_function->add(v);
@@ -134,14 +120,31 @@ protected:
_memUsageBytes += v.getApproximateSize();
}
+ void removeValue() {
+ tassert(5429400, "Tried to remove more values than we added", !_values.empty());
+ auto v = _values.front();
+ _function->remove(v);
+ _values.pop();
+ _memUsageBytes -= v.getApproximateSize();
+ }
+
+ boost::intrusive_ptr<Expression> _input;
+ std::unique_ptr<WindowFunctionState> _function;
+ // Keep track of values in the window function that will need to be removed later.
+ std::queue<Value> _values;
+
// Track the byte size of the values being stored by this class. Does not include the constant
// size objects being held or the overhead of the data structures.
size_t _memUsageBytes = 0;
private:
- virtual void processDocumentsToUpperBound() = 0;
- virtual void removeDocumentsUnderLowerBound() = 0;
- virtual void initialize() = 0;
+ /**
+ * This method notifies the executor that the underlying PartitionIterator
+ * '_iter' has been advanced one time since the last call to initialize() or
+ * update(). It should determine how the window has changed (which documents have
+ * entered it? which have left it?) and call addValue(), removeValue() as needed.
+ */
+ virtual void update() = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h b/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h
index 2b02ed0db14..1bac1417b99 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h
@@ -57,7 +57,7 @@ public:
boost::intrusive_ptr<Expression> time,
WindowBounds bounds,
boost::optional<TimeUnit> outputUnit)
- : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpointsOnly)),
+ : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpoints)),
_position(std::move(position)),
_time(std::move(time)),
_bounds(std::move(bounds)),
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp
index 548dda10a46..da22a7762c3 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp
@@ -52,8 +52,8 @@ public:
WindowBounds bounds,
boost::optional<TimeUnit> timeUnit = boost::none) {
_docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- _iter =
- std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none);
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), _docSource.get(), boost::none, boost::none);
auto position = ExpressionFieldPath::parse(
getExpCtx().get(), positionPath, getExpCtx()->variablesParseState);
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h b/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h
index ef4c4654fd7..e5ab3b4065c 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h
@@ -48,7 +48,7 @@ protected:
boost::intrusive_ptr<Expression> input,
WindowBounds bounds,
boost::optional<boost::intrusive_ptr<Expression>> defaultValue)
- : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpointsOnly)),
+ : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpoints)),
_input(std::move(input)),
_bounds(std::move(bounds)) {
if (!defaultValue) {
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp
index 3f86e395522..f92f32b24db 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp
@@ -57,7 +57,7 @@ public:
auto vps = expCtx->variablesParseState;
auto optKey =
keyPath ? optExp(ExpressionFieldPath::parse(expCtx, *keyPath, vps)) : boost::none;
- _iter = std::make_unique<PartitionIterator>(expCtx, _docSource.get(), optKey);
+ _iter = std::make_unique<PartitionIterator>(expCtx, _docSource.get(), optKey, boost::none);
auto inputField = ExpressionFieldPath::parse(expCtx, "$val", vps);
auto defaultExp = defaultValue
? optExp(ExpressionConstant::parse(expCtx, *defaultValue, vps))
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h
new file mode 100644
index 00000000000..104e5af4d0b
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h
@@ -0,0 +1,117 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/window_function/partition_iterator.h"
+#include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h"
+
+namespace mongo {
+
+/**
+ * An executor that handles left-unbounded, range-based windows.
+ */
+class WindowFunctionExecNonRemovableRange final : public WindowFunctionExec {
+public:
+ WindowFunctionExecNonRemovableRange(PartitionIterator* iter,
+ boost::intrusive_ptr<Expression> input,
+ boost::intrusive_ptr<ExpressionFieldPath> sortExpr,
+ boost::intrusive_ptr<AccumulatorState> function,
+ WindowBounds bounds)
+ : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kRightEndpoint)),
+ _input(std::move(input)),
+ _sortExpr(std::move(sortExpr)),
+ _function(std::move(function)),
+ _bounds(bounds) {}
+
+ Value getNext() final {
+ update();
+ return _function->getValue(false);
+ }
+
+ size_t getApproximateSize() const final {
+ return _function->getMemUsage();
+ }
+
+ void reset() final {
+ _function->reset();
+ _lastEndpoints = boost::none;
+ }
+
+private:
+ void update() {
+ auto endpoints = _iter.getEndpoints(_bounds, _lastEndpoints);
+ // There are 4 different transitions we can make:
+ if (_lastEndpoints) {
+ if (endpoints) {
+ // Transition from nonempty to nonempty: add new values based on how the window
+ // changed.
+ for (int i = _lastEndpoints->second + 1; i <= endpoints->second; ++i) {
+ addValueAt(i);
+ }
+ } else {
+ // Transition from nonempty to empty: discard the accumulator state.
+ _function->reset();
+ }
+ } else {
+ if (endpoints) {
+ // Transition from empty to nonempty: add the new values.
+ for (int i = endpoints->first; i <= endpoints->second; ++i) {
+ addValueAt(i);
+ }
+ } else {
+ // Transition from empty to empty: nothing to do!
+ }
+ }
+
+ if (endpoints) {
+ // Shift endpoints by 1 because we will have advanced by 1 document on the next call
+ // to update().
+ _lastEndpoints = std::pair(endpoints->first - 1, endpoints->second - 1);
+ } else {
+ _lastEndpoints = boost::none;
+ }
+ }
+ void addValueAt(int offset) {
+ auto doc = _iter[offset];
+ tassert(5429411, "endpoints must fall in the partition", doc);
+ Value v = _input->evaluate(*doc, &_input->getExpressionContext()->variables);
+ _function->process(v, false);
+ }
+
+ boost::intrusive_ptr<Expression> _input;
+ boost::intrusive_ptr<ExpressionFieldPath> _sortExpr;
+ boost::intrusive_ptr<AccumulatorState> _function;
+ WindowBounds _bounds;
+
+ boost::optional<std::pair<int, int>> _lastEndpoints;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
index efa7c88cccd..d8042421f95 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp
@@ -55,8 +55,8 @@ public:
const std::string& inputPath,
WindowBounds::Bound<int> upper) {
_docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- _iter =
- std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none);
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), _docSource.get(), boost::none, boost::none);
auto input = ExpressionFieldPath::parse(
getExpCtx().get(), inputPath, getExpCtx()->variablesParseState);
return WindowFunctionExecNonRemovable<AccumulatorState>(
@@ -123,8 +123,10 @@ TEST_F(WindowFunctionExecNonRemovableTest, AccumulateOnlyWithMultiplePartitions)
auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
auto key = ExpressionFieldPath::createPathFromString(
getExpCtx().get(), "key", getExpCtx()->variablesParseState);
- auto iter = PartitionIterator(
- getExpCtx().get(), mock.get(), boost::optional<boost::intrusive_ptr<Expression>>(key));
+ auto iter = PartitionIterator(getExpCtx().get(),
+ mock.get(),
+ boost::optional<boost::intrusive_ptr<Expression>>(key),
+ boost::none);
auto input =
ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
auto mgr = WindowFunctionExecNonRemovable<AccumulatorState>(
@@ -155,8 +157,8 @@ TEST_F(WindowFunctionExecNonRemovableTest, InputExpressionAllowedToCreateVariabl
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}};
auto docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- auto iter =
- std::make_unique<PartitionIterator>(getExpCtx().get(), docSource.get(), boost::none);
+ auto iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), docSource.get(), boost::none, boost::none);
auto filterBSON =
fromjson("{$filter: {input: [1, 2, 3], as: 'num', cond: {$gte: ['$$num', 2]}}}");
auto input = ExpressionFilter::parse(
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
index a459d295a8e..1d3ef26485f 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp
@@ -38,7 +38,10 @@ WindowFunctionExecRemovableDocument::WindowFunctionExecRemovableDocument(
boost::intrusive_ptr<Expression> input,
std::unique_ptr<WindowFunctionState> function,
WindowBounds::DocumentBased bounds)
- : WindowFunctionExecRemovable(iter, std::move(input), std::move(function)) {
+ : WindowFunctionExecRemovable(iter,
+ PartitionAccessor::Policy::kDefaultSequential,
+ std::move(input),
+ std::move(function)) {
stdx::visit(
visit_helper::Overloaded{
[](const WindowBounds::Unbounded&) {
@@ -73,9 +76,15 @@ void WindowFunctionExecRemovableDocument::initialize() {
break;
}
}
+ _initialized = true;
}
-void WindowFunctionExecRemovableDocument::processDocumentsToUpperBound() {
+void WindowFunctionExecRemovableDocument::update() {
+ if (!_initialized) {
+ initialize();
+ return;
+ }
+
// If there is no upper bound, the whole partition is loaded by initialize.
if (_upperBound) {
// If this is false, we're over the end of the partition.
@@ -83,9 +92,7 @@ void WindowFunctionExecRemovableDocument::processDocumentsToUpperBound() {
addValue(_input->evaluate(*doc, &_input->getExpressionContext()->variables));
}
}
-}
-void WindowFunctionExecRemovableDocument::removeDocumentsUnderLowerBound() {
// For a positive lower bound the first pass loads the correct window, so subsequent passes
// must always remove a document if there is a document left to remove.
// For a negative lower bound we can start removing every time only after we have seen
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
index dc8327368ab..55a1ed4a76a 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h
@@ -61,11 +61,8 @@ public:
}
private:
- void processDocumentsToUpperBound() final;
-
- void removeDocumentsUnderLowerBound() final;
-
- void initialize() final;
+ void update() final;
+ void initialize();
void removeFirstValueIfExists() {
if (_values.size() == 0) {
@@ -76,6 +73,10 @@ private:
_values.pop();
}
+ // In one of two states: either the initial window has not been populated or we are sliding and
+ // accumulating/removing values.
+ bool _initialized = false;
+
int _lowerBound;
// Will stay boost::none if right unbounded.
boost::optional<int> _upperBound = boost::none;
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp
new file mode 100644
index 00000000000..5783e2da1ec
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/window_function/window_function_exec_removable_range.h"
+
+using boost::optional;
+using std::pair;
+
+namespace mongo {
+
+WindowFunctionExecRemovableRange::WindowFunctionExecRemovableRange(
+ PartitionIterator* iter,
+ boost::intrusive_ptr<Expression> input,
+ boost::intrusive_ptr<ExpressionFieldPath> sortBy,
+ std::unique_ptr<WindowFunctionState> function,
+ WindowBounds bounds)
+ : WindowFunctionExecRemovable(
+ iter, PartitionAccessor::Policy::kEndpoints, std::move(input), std::move(function)),
+ _sortBy(std::move(sortBy)),
+ _bounds(std::move(bounds)) {}
+
+namespace {
+struct EndpointsChange {
+ optional<pair<int, int>> added;
+ optional<pair<int, int>> removed;
+};
+
+/**
+ * Diffs two intervals: the result is expressed as two new intervals, for the added and removed
+ * elements. The intervals are all represented as inclusive [lower, upper], so an empty interval
+ * is represented as boost::none.
+ *
+ * For example, in 'diff([2, 10], [5, 14])' the lower bound changed from 2 to 5, so 'removed' is
+ * [2, 4], and the upper bound changed from 10 to 14, so 'added' is [11, 14].
+ */
+EndpointsChange diffEndpoints(optional<pair<int, int>> old, optional<pair<int, int>> current) {
+ EndpointsChange result;
+
+ if (!old && !current) {
+ return result;
+ }
+ if (!old) {
+ result.added = current;
+ return result;
+ }
+ if (!current) {
+ result.removed = old;
+ return result;
+ }
+
+ auto [oldLower, oldUpper] = *old;
+ auto [lower, upper] = *current;
+ tassert(5429407, "Endpoints should never decrease.", oldLower <= lower && oldUpper <= upper);
+ if (oldLower < lower) {
+ result.removed = std::pair(oldLower, lower - 1);
+ }
+ if (oldUpper < upper) {
+ result.added = std::pair(oldUpper + 1, upper);
+ }
+ return result;
+}
+} // namespace
+
+void WindowFunctionExecRemovableRange::update() {
+ // Calling getEndpoints here also informs the PartitionAccessor that we won't need documents
+ // to the left of endpoints->first. However, we need to access those documents here in
+ // update(), to remove them from the WindowFunctionState. This is ok, because the documents
+ // expire later, on the next call to releaseExpired(). We can still use the documents between
+ // _lastEndpoints during this update().
+ auto endpoints = _iter.getEndpoints(_bounds, _lastEndpoints);
+ auto [added, removed] = diffEndpoints(_lastEndpoints, endpoints);
+
+ if (added) {
+ auto [lower, upper] = *added;
+ for (auto i = lower; i <= upper; ++i) {
+ addValue(_input->evaluate(*_iter[i], nullptr));
+ }
+ }
+ if (removed) {
+ auto [lower, upper] = *removed;
+ for (auto i = lower; i <= upper; ++i) {
+ removeValue();
+ }
+ }
+
+ // Update _lastEndpoints.
+ if (endpoints) {
+ auto [lower, upper] = *endpoints;
+ // On the next call to update(), we will have advanced by 1 document.
+ // The document we call '0' now, will be called '-1' on that next update().
+ _lastEndpoints = std::pair(lower - 1, upper - 1);
+ } else {
+ _lastEndpoints = boost::none;
+ }
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h
new file mode 100644
index 00000000000..b58416fac7f
--- /dev/null
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/window_function/partition_iterator.h"
+#include "mongo/db/pipeline/window_function/window_bounds.h"
+#include "mongo/db/pipeline/window_function/window_function_exec.h"
+
+namespace mongo {
+
+/**
+ * An executor that handles left-bounded, range-based windows.
+ *
+ * Left-bounded windows require a window-function state that supports removing elements.
+ */
+class WindowFunctionExecRemovableRange final : public WindowFunctionExecRemovable {
+public:
+ /**
+ * Constructs a removable window function executor with the given input expression to be
+ * evaluated and passed to the corresponding WindowFunc for each document in the window.
+ *
+ * For example, in
+ * {$setWindowFields: {
+ * sortBy: {ts: 1},
+ * output: {
+ * v: {$max: "$x", window: {range: [-5, 'unbounded']}}
+ * }
+ * }}
+ *
+ * 'input' is "$x"
+ * 'sortBy' is "$ts" (translated from the sort spec, {ts: 1})
+ * 'function' is a WindowFunctionMax
+ * 'bounds' is WindowBounds::RangeBased{Value{-5}, WindowBounds::Unbounded{}}
+ */
+ WindowFunctionExecRemovableRange(PartitionIterator* iter,
+ boost::intrusive_ptr<Expression> input,
+ boost::intrusive_ptr<ExpressionFieldPath> sortBy,
+ std::unique_ptr<WindowFunctionState> function,
+ WindowBounds bounds);
+
+ Value getNext() override {
+ update();
+ return _function->getValue();
+ }
+
+ void reset() final {
+ _function->reset();
+ _lastEndpoints = boost::none;
+ _memUsageBytes = 0;
+ }
+
+private:
+ void update() final;
+
+ boost::intrusive_ptr<ExpressionFieldPath> _sortBy;
+ WindowBounds _bounds;
+ boost::optional<std::pair<int, int>> _lastEndpoints;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
index 7cf9746f00c..cf8deb2c3bd 100644
--- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
+++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp
@@ -54,8 +54,8 @@ public:
const std::string& inputPath,
WindowBounds::DocumentBased bounds) {
_docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- _iter =
- std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none);
+ _iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), _docSource.get(), boost::none, boost::none);
auto input = ExpressionFieldPath::parse(
getExpCtx().get(), inputPath, getExpCtx()->variablesParseState);
std::unique_ptr<WindowFunctionState> maxFunc =
@@ -256,8 +256,10 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) {
auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
auto key = ExpressionFieldPath::createPathFromString(
getExpCtx().get(), "key", getExpCtx()->variablesParseState);
- auto iter = PartitionIterator{
- getExpCtx().get(), mock.get(), boost::optional<boost::intrusive_ptr<Expression>>(key)};
+ auto iter = PartitionIterator{getExpCtx().get(),
+ mock.get(),
+ boost::optional<boost::intrusive_ptr<Expression>>(key),
+ boost::none};
auto input =
ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
CollatorInterfaceMock collator = CollatorInterfaceMock::MockType::kToLowerString;
@@ -289,7 +291,8 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) {
getExpCtx().get(), "key", getExpCtx()->variablesParseState);
auto iter = PartitionIterator{getExpCtx().get(),
mockTwo.get(),
- boost::optional<boost::intrusive_ptr<Expression>>(keyTwo)};
+ boost::optional<boost::intrusive_ptr<Expression>>(keyTwo),
+ boost::none};
auto input =
ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState);
auto maxFunc = std::make_unique<WindowFunctionMax>(getExpCtx().get());
@@ -310,8 +313,8 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, InputExpressionAllowedToCreateVa
const auto docs = std::deque<DocumentSource::GetNextResult>{
Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}};
auto docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx());
- auto iter =
- std::make_unique<PartitionIterator>(getExpCtx().get(), docSource.get(), boost::none);
+ auto iter = std::make_unique<PartitionIterator>(
+ getExpCtx().get(), docSource.get(), boost::none, boost::none);
auto filterBSON =
fromjson("{$filter: {input: [1, 2, 3], as: 'num', cond: {$gte: ['$$num', 2]}}}");
auto input = ExpressionFilter::parse(