diff options
author | David Percy <david.percy@mongodb.com> | 2021-03-05 21:21:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-19 19:15:48 +0000 |
commit | c3209ebb1db49656dcc3d1edf3635f7c7e96add5 (patch) | |
tree | b3f920a3df53b171145371c734ba9185dc3e3bdc /src/mongo/db/pipeline/window_function | |
parent | 481bade9c40d662bf463291645bbf9fddce3a11f (diff) | |
download | mongo-c3209ebb1db49656dcc3d1edf3635f7c7e96add5.tar.gz |
SERVER-54294 Support range-based bounds for window functions
Diffstat (limited to 'src/mongo/db/pipeline/window_function')
18 files changed, 827 insertions, 120 deletions
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.cpp b/src/mongo/db/pipeline/window_function/partition_iterator.cpp index dad88025ebd..32967618454 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.cpp +++ b/src/mongo/db/pipeline/window_function/partition_iterator.cpp @@ -32,9 +32,63 @@ #include "mongo/db/pipeline/window_function/partition_iterator.h" #include "mongo/util/visit_helper.h" +using boost::optional; + namespace mongo { -boost::optional<Document> PartitionIterator::operator[](int index) { +namespace { +/** + * Create an Expression from a SortPattern, if the sort is simple enough. + * + * The sort must have one, ascending, non-expression field. + * The field may be dotted. + * + * For example: {ab.c: 1} becomes "$ab.c", but {a: -1} becomes boost::none. + */ +static optional<boost::intrusive_ptr<ExpressionFieldPath>> exprFromSort( + ExpressionContext* expCtx, const optional<SortPattern>& sortPattern) { + if (!sortPattern) + return boost::none; + if (sortPattern->size() != 1) + return boost::none; + const SortPattern::SortPatternPart& part = *sortPattern->begin(); + + bool hasFieldPath = part.fieldPath != boost::none; + bool hasExpression = part.expression != nullptr; + tassert(5429403, + "SortPatternPart is supposed to have exactly one: fieldPath, or expression.", + hasFieldPath != hasExpression); + + if (hasExpression) + return boost::none; + + // Descending sorts are not allowed with range-based bounds. + // + // We think this would be confusing. + // Does [x, y] mean [lower, upper] or [left, right] ? + // + // For example, suppose you sort by {time: -1} to put recent documents first. + // Would you write 'range: [-5, +2]', with the smaller value first? + // Or would you write 'range: [+2, -5]', with the more recent value first? + if (!part.isAscending) + return boost::none; + + return ExpressionFieldPath::createPathFromString( + expCtx, part.fieldPath->fullPath(), expCtx->variablesParseState); +} +} // namespace + +PartitionIterator::PartitionIterator(ExpressionContext* expCtx, + DocumentSource* source, + optional<boost::intrusive_ptr<Expression>> partitionExpr, + const optional<SortPattern>& sortPattern) + : _expCtx(expCtx), + _source(source), + _partitionExpr(std::move(partitionExpr)), + _sortExpr(exprFromSort(_expCtx, sortPattern)), + _state(IteratorState::kNotInitialized) {} + +optional<Document> PartitionIterator::operator[](int index) { auto desired = _currentCacheIndex + index; if (_state == IteratorState::kAdvancedToEOF) @@ -152,49 +206,209 @@ PartitionIterator::AdvanceResult PartitionIterator::advance() { } namespace { -boost::optional<int> numericBound(WindowBounds::Bound<int> bound) { +optional<int> numericBound(WindowBounds::Bound<int> bound) { return stdx::visit( visit_helper::Overloaded{ - [](WindowBounds::Unbounded) -> boost::optional<int> { return boost::none; }, - [](WindowBounds::Current) -> boost::optional<int> { return 0; }, - [](int i) -> boost::optional<int> { return i; }, + [](WindowBounds::Unbounded) -> optional<int> { return boost::none; }, + [](WindowBounds::Current) -> optional<int> { return 0; }, + [](int i) -> optional<int> { return i; }, }, bound); } + +// Assumes both arguments are numeric, and performs Decimal128 addition on them. +Value decimalAdd(const Value& left, const Value& right) { + // Widening to Decimal128 is a convenient way to avoid having many cases for different numeric + // types. The 'threshold' values we compute are only used to choose a set of documents; the + // user can't observe the type. + return Value(left.coerceToDecimal().add(right.coerceToDecimal())); +} } // namespace -boost::optional<std::pair<int, int>> PartitionIterator::getEndpoints(const WindowBounds& bounds) { - // For range-based bounds, we will need to: - // 1. extract the sortBy for (*this)[0] - // 2. step backwards until we cross bounds.lower - // 3. step forwards until we cross bounds.upper - // This means we'll need to pass in sortBy somewhere. - tassert(5423300, - "TODO SERVER-54294: range-based and time-based bounds", - stdx::holds_alternative<WindowBounds::DocumentBased>(bounds.bounds)); +optional<std::pair<int, int>> PartitionIterator::getEndpointsRangeBased( + const WindowBounds& bounds, const optional<std::pair<int, int>>& hint) { + tassert(5429404, "Missing _sortExpr with range-based bounds", _sortExpr != boost::none); + // TODO SERVER-54295: time-based bounds + uassert(5429402, + "Time-based bounds not supported yet", + stdx::holds_alternative<WindowBounds::RangeBased>(bounds.bounds)); + auto range = stdx::get<WindowBounds::RangeBased>(bounds.bounds); + + auto lessThan = _expCtx->getValueComparator().getLessThan(); + + Value base = (*_sortExpr)->evaluate(*(*this)[0], &_expCtx->variables); + uassert(5429413, + "Invalid range: For windows that involve date or time ranges, a unit must be provided.", + base.getType() != BSONType::Date); + uassert(5429414, + str::stream() << "Invalid range: Expected the sortBy field to be a number, but it was " + << base.getType(), + base.numeric()); + + // 'lower' is the smallest offset in the partition that's within the lower bound of the window. + optional<int> lower = stdx::visit( + visit_helper::Overloaded{ + [&](WindowBounds::Current) -> optional<int> { + // 'range: ["current", _]' means the current document, which is always offset 0. + return 0; + }, + [&](WindowBounds::Unbounded) -> optional<int> { + // Find the leftmost document whose sortBy field evaluates to a numeric value. + + // Start from the beginning, or the hint, whichever is higher. + // Note that the hint may no longer be a valid offset, if some documents were + // released from the cache. + int start = getMinCachedOffset(); + if (hint) { + start = std::max(hint->first, start); + } + + for (int i = start;; ++i) { + auto doc = (*this)[i]; + if (!doc) { + return boost::none; + } + Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables); + if (v.numeric()) { + return i; + } + } + }, + [&](const Value& delta) -> optional<int> { + Value threshold = decimalAdd(base, delta); + + // Start from the beginning, or the hint, whichever is higher. + // Note that the hint may no longer be a valid offset, if some documents were + // released from the cache. + int start = getMinCachedOffset(); + if (hint) { + start = std::max(hint->first, start); + } + + boost::optional<Document> doc; + for (int i = start; (doc = (*this)[i]); ++i) { + Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables); + if (!lessThan(v, threshold)) { + // This is the first doc we've scanned that crossed the threshold. + return i; + } + } + // We scanned every document in the partition, and none crossed the + // threshold. So the window must be shifted so far to the right that no + // documents fall in it. + return boost::none; + }, + }, + range.lower); + + if (!lower) + return boost::none; + + // 'upper' is the largest offset in the partition that's within the upper bound of the window. + optional<int> upper = stdx::visit( + visit_helper::Overloaded{ + [&](WindowBounds::Current) -> optional<int> { + // 'range: [_, "current"]' means the current document, which is offset 0. + return 0; + }, + [&](WindowBounds::Unbounded) -> optional<int> { + // Find the rightmost document whose sortBy field evaluates to a numeric value. + + // We know that the current document, the lower bound, and the hint (if present) + // are all numeric, so start scanning from whichever is highest. + int start = std::max(0, *lower); + if (hint) { + start = std::max(hint->second, start); + } + + boost::optional<Document> doc; + for (int i = start; (doc = (*this)[i]); ++i) { + Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables); + if (!v.numeric()) { + // The previously scanned doc is the rightmost numeric one. Since we start + // from '0', 'hint', or 'lower', which are all numeric, we should never hit + // this case on the first iteration. + tassert(5429412, + "Failed to find the rightmost numeric document, " + "while computing window bounds", + i != start); + return i - 1; + } + } + return getMaxCachedOffset(); + }, + [&](const Value& delta) -> optional<int> { + // Pull in documents until the sortBy value crosses 'base + delta'. + tassert(5429406, "Range-based bounds are specified as a number", delta.numeric()); + Value threshold = decimalAdd(base, delta); + + // If there's no hint, start scanning from the lower bound. + // If there is a hint, start from whichever is greater: lower bound or hint. + // Usually the hint is greater, but with bounds like [0, 0] the new lower bound + // will be greater than the old upper bound. + int start = *lower; + if (hint) { + start = std::max(hint->second, start); + } + + for (int i = start;; ++i) { + auto doc = (*this)[i]; + if (!doc) { + // We scanned every document in the partition, and none crossed the upper + // bound. So the upper bound contains everything up to the end of the + // partition. + return getMaxCachedOffset(); + } + Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables); + if (lessThan(threshold, v)) { + // This doc exceeded the upper bound. + // The previously scanned doc (if any) is the greatest in-bounds one. + if (i == start) { + // This case can happen, for example, at the beginning of a partition + // when the window is 'range: [-100, -5]'. There can be documents + // within the lower bound of -100, but none within the upper bound of + // -5. + return boost::none; + } else { + return i - 1; + } + } + } + }, + }, + range.upper); + + if (!upper) + return boost::none; + + return std::pair{*lower, *upper}; +} + +optional<std::pair<int, int>> PartitionIterator::getEndpointsDocumentBased( + const WindowBounds& bounds, const optional<std::pair<int, int>>& hint = boost::none) { tassert(5423301, "getEndpoints assumes there is a current document", (*this)[0] != boost::none); auto docBounds = stdx::get<WindowBounds::DocumentBased>(bounds.bounds); - boost::optional<int> lowerBound = numericBound(docBounds.lower); - boost::optional<int> upperBound = numericBound(docBounds.upper); + optional<int> lowerBound = numericBound(docBounds.lower); + optional<int> upperBound = numericBound(docBounds.upper); tassert(5423302, "Bounds should never be inverted", !lowerBound || !upperBound || lowerBound <= upperBound); // Pull documents into the cache until it contains the whole window. + // We want to know whether the window reaches the end of the partition. if (upperBound) { // For a right-bounded window we only need to pull in documents up to the bound. (*this)[*upperBound]; } else { // For a right-unbounded window we need to pull in the whole partition. operator[] reports // end of partition by returning boost::none instead of a document. - for (int i = 0; (*this)[i]; ++i) { - } + cacheWholePartition(); } // Valid offsets into the cache are any 'i' such that '_cache[_currentCacheIndex + i]' is valid. // We know the cache is nonempty because it contains the current document. - int cacheOffsetMin = -_currentCacheIndex; - int cacheOffsetMax = cacheOffsetMin + _cache.size() - 1; + int cacheOffsetMin = getMinCachedOffset(); + int cacheOffsetMax = getMaxCachedOffset(); // The window can only be empty if the bounds are shifted completely out of the partition. if (lowerBound && lowerBound > cacheOffsetMax) @@ -214,6 +428,14 @@ boost::optional<std::pair<int, int>> PartitionIterator::getEndpoints(const Windo return {{lowerOffset, upperOffset}}; } +optional<std::pair<int, int>> PartitionIterator::getEndpoints( + const WindowBounds& bounds, const optional<std::pair<int, int>>& hint = boost::none) { + if (!stdx::holds_alternative<WindowBounds::DocumentBased>(bounds.bounds)) { + return getEndpointsRangeBased(bounds, hint); + } + return getEndpointsDocumentBased(bounds, hint); +} + void PartitionIterator::getNextDocument() { tassert(5340103, "Invalid call to PartitionIterator::getNextDocument", diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h index c2021dca75c..1b199685238 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.h +++ b/src/mongo/db/pipeline/window_function/partition_iterator.h @@ -32,22 +32,26 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/window_function/window_bounds.h" +#include "mongo/db/query/sort_pattern.h" namespace mongo { /** * This class provides an abstraction for accessing documents in a partition via an interator-type - * interface. There is always a "current" document with which indexed access is relative to. + * interface. There is always a "current" document; operator[] provides random access relative to + * the current document, so that iter[+2] is refers to the 2 positions ahead of the current one. + * + * The 'partionExpr' is used to determine partition boundaries, provide the illusion that only the + * current partition exists. + * + * The 'sortPattern' is used for resolving range-based and time-based bounds, in 'getEndpoints()'. */ class PartitionIterator { public: PartitionIterator(ExpressionContext* expCtx, DocumentSource* source, - boost::optional<boost::intrusive_ptr<Expression>> partitionExpr) - : _expCtx(expCtx), - _source(source), - _partitionExpr(partitionExpr), - _state(IteratorState::kNotInitialized) {} + boost::optional<boost::intrusive_ptr<Expression>> partitionExpr, + const boost::optional<SortPattern>& sortPattern); using SlotId = unsigned int; SlotId newSlot() { @@ -116,7 +120,7 @@ private: boost::optional<Document> operator[](int index); /** - * Resolve any type of WindowBounds to a concrete pair of indices, '[lower, upper]'. + * Resolves any type of WindowBounds to a concrete pair of indices, '[lower, upper]'. * * Both 'lower' and 'upper' are valid offsets, such that '(*this)[lower]' and '(*this)[upper]' * returns a document. If the window contains one document, then 'lower == upper'. If the @@ -131,8 +135,46 @@ private: * * This method is non-const because it may pull documents into memory up to the end of the * window. + * + * 'hint', if specified, should be the last result of getEndpoints() for the same 'bounds'. + */ + boost::optional<std::pair<int, int>> getEndpoints( + const WindowBounds& bounds, const boost::optional<std::pair<int, int>>& hint); + + /** + * Returns the smallest offset 'i' such that (*this)[i] is in '_cache'. + * + * This value is negative or zero, because the current document is always in '_cache'. */ - boost::optional<std::pair<int, int>> getEndpoints(const WindowBounds& bounds); + auto getMinCachedOffset() const { + return -_currentCacheIndex; + } + + /** + * Returns the largest offset 'i' such that (*this)[i] is in '_cache'. + * + * Note that offsets greater than 'i' might still be in the partition, even though they + * haven't been loaded into '_cache' yet. If you want to know where the partition ends, + * call 'cacheWholePartition' first. + * + * This value is positive or zero, because the current document is always in '_cache'. + */ + auto getMaxCachedOffset() const { + return getMinCachedOffset() + _cache.size() - 1; + } + + /** + * Loads documents into '_cache' until we reach a partition boundary. + */ + void cacheWholePartition() { + // Start from one past the end of the _cache. + int i = getMinCachedOffset() + _cache.size(); + // If we have already loaded everything into '_cache' then this condition will be false + // immediately. + while ((*this)[i]) { + ++i; + } + } /** * Marks the given index as expired for the slot 'id'. This does not necessarily mean that the @@ -181,9 +223,21 @@ private: _state = IteratorState::kIntraPartition; } + // Internal helpers for 'getEndpoints()'. + boost::optional<std::pair<int, int>> getEndpointsRangeBased( + const WindowBounds& bounds, const boost::optional<std::pair<int, int>>& hint); + boost::optional<std::pair<int, int>> getEndpointsDocumentBased( + const WindowBounds& bounds, const boost::optional<std::pair<int, int>>& hint); + ExpressionContext* _expCtx; DocumentSource* _source; boost::optional<boost::intrusive_ptr<Expression>> _partitionExpr; + + // '_sortExpr' tells us which field is the "time" field. When the user writes + // 'sortBy: {ts: 1}', any time-based or range-based window bounds are defined using + // the value of the "$ts" field. This _sortExpr is used in getEndpoints(). + boost::optional<boost::intrusive_ptr<ExpressionFieldPath>> _sortExpr; + std::deque<Document> _cache; // '_cache[_currentCacheIndex]' is the current document, which '(*this)[0]' returns. int _currentCacheIndex = 0; @@ -237,9 +291,12 @@ public: // This policy assumes that when the caller accesses a certain index 'i', that it will no // longer require all documents up to and including the document at index 'i'. kDefaultSequential, - // This policy should be used if the caller requires the endpoints of a window, potentially - // accessing the same bound twice. - kEndpointsOnly + // This policy should be used if the caller requires the endpoints of a window. Documents + // to the left of the left endpoint may disappear on the next call to releaseExpired(). + kEndpoints, + // This policy means the caller only looks at how the right endpoint changes. + // The caller may look at documents between the most recent two right endpoints. + kRightEndpoint, }; PartitionAccessor(PartitionIterator* iter, Policy policy) : _iter(iter), _slot(iter->newSlot()), _policy(policy) {} @@ -250,7 +307,8 @@ public: case Policy::kDefaultSequential: _iter->expireUpTo(_slot, index); break; - case Policy::kEndpointsOnly: + case Policy::kEndpoints: + case Policy::kRightEndpoint: break; } return ret; @@ -260,12 +318,32 @@ public: return _iter->getCurrentPartitionIndex(); } - boost::optional<std::pair<int, int>> getEndpoints(const WindowBounds& bounds) { - tassert(5371201, "Invalid usage of partition accessor", _policy == Policy::kEndpointsOnly); - auto endpoints = _iter->getEndpoints(bounds); - // With this policy, all documents before the lower bound can be marked as expired. - if (endpoints) { - _iter->expireUpTo(_slot, endpoints->first - 1); + boost::optional<std::pair<int, int>> getEndpoints( + const WindowBounds& bounds, + const boost::optional<std::pair<int, int>>& hint = boost::none) { + auto endpoints = _iter->getEndpoints(bounds, hint); + switch (_policy) { + case Policy::kDefaultSequential: + tasserted(5371201, "Invalid usage of partition accessor"); + break; + case Policy::kEndpoints: + // With this policy, all documents before the lower bound can be marked as expired. + // They will only be released on the next call to releaseExpired(), so when + // getEndpoints() returns, the caller may also look at documents from the previous + // result of getEndpoints(), until it returns control to the DocumentSource. + if (endpoints) { + _iter->expireUpTo(_slot, endpoints->first - 1); + } + break; + case Policy::kRightEndpoint: + // With this policy, all documents before the upper bound can be marked as expired. + // They will only be released on the next call to releaseExpired(), so when + // getEndpoints() returns, the caller may also look at documents from the previous + // result of getEndpoints(), until it returns control to the DocumentSource. + if (endpoints) { + _iter->expireUpTo(_slot, endpoints->second - 1); + } + break; } return endpoints; } diff --git a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp index 068f7912676..81b34e93910 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp +++ b/src/mongo/db/pipeline/window_function/partition_iterator_test.cpp @@ -47,7 +47,8 @@ public: auto makeDefaultAccessor( boost::intrusive_ptr<DocumentSourceMock> mock, boost::optional<boost::intrusive_ptr<Expression>> partExpr = boost::none) { - _iter = std::make_unique<PartitionIterator>(getExpCtx().get(), mock.get(), partExpr); + _iter = std::make_unique<PartitionIterator>( + getExpCtx().get(), mock.get(), partExpr, boost::none); return PartitionAccessor(_iter.get(), PartitionAccessor::Policy::kDefaultSequential); } @@ -255,7 +256,6 @@ TEST_F(PartitionIteratorTest, CurrentOffsetIsCorrectAfterDocumentsAreAccessed) { getExpCtx().get(), "a", getExpCtx()->variablesParseState); auto partIter = makeDefaultAccessor(mock, boost::optional<boost::intrusive_ptr<Expression>>(key)); - ASSERT_EQ(0, partIter.getCurrentPartitionIndex()); auto doc = partIter[0]; advance(); ASSERT_EQ(1, partIter.getCurrentPartitionIndex()); @@ -273,7 +273,7 @@ TEST_F(PartitionIteratorTest, OutsideOfPartitionAccessShouldNotTassert) { const auto docs = std::deque<DocumentSource::GetNextResult>{Document{{"a", 1}}, Document{{"a", 2}}}; const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx()); - auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none); + auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none); auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential); // Test that an accessor that attempts to read off the end of the partition returns boost::none @@ -288,7 +288,7 @@ DEATH_TEST_F(PartitionIteratorTest, const auto docs = std::deque<DocumentSource::GetNextResult>{ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}}; const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx()); - auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none); + auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none); auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential); // Access the first document, which marks it as expired. ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *accessor[0]); @@ -304,7 +304,7 @@ DEATH_TEST_F(PartitionIteratorTest, const auto docs = std::deque<DocumentSource::GetNextResult>{ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}}; const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx()); - auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none); + auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none); auto laggingAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential); auto leadingAccessor = @@ -335,8 +335,8 @@ DEATH_TEST_F(PartitionIteratorTest, const auto docs = std::deque<DocumentSource::GetNextResult>{ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}}; const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx()); - auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none); - auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpointsOnly); + auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none); + auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints); // Mock a window with documents [1, 2]. auto bounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(1 << 2)), SortPattern(BSON("a" << 1), getExpCtx()), @@ -365,15 +365,13 @@ DEATH_TEST_F(PartitionIteratorTest, const auto docs = std::deque<DocumentSource::GetNextResult>{ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}}; const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx()); - auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none); + auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none); // Create two endpoint accessors, one at [-1, 0] and another at [0, 1]. Since the first one may // access the document at (current - 1), the only expiration that can happen on advance() would // be (newCurrent - 2). - auto lookBehindAccessor = - PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpointsOnly); - auto lookAheadAccessor = - PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpointsOnly); + auto lookBehindAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints); + auto lookAheadAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints); auto negBounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(-1 << 0)), SortPattern(BSON("a" << 1), getExpCtx()), getExpCtx().get()); @@ -413,12 +411,48 @@ DEATH_TEST_F(PartitionIteratorTest, ASSERT_THROWS_CODE(lookBehindAccessor[-3], AssertionException, 5371202); } +DEATH_TEST_F(PartitionIteratorTest, + SingleConsumerRightEndpointPolicy, + "Invalid access of expired document") { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}}; + const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx()); + auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none); + auto accessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kRightEndpoint); + // Use a window of 'documents: [-2, -1]'. + auto bounds = WindowBounds::parse(BSON("documents" << BSON_ARRAY(-2 << -1)), + SortPattern(BSON("a" << 1), getExpCtx()), + getExpCtx().get()); + + // Advance until {a: 3} is the current document. + partIter.advance(); + partIter.advance(); + ASSERT_DOCUMENT_EQ(docs[2].getDocument(), *accessor[0]); + + // Retrieving the endpoints triggers the expiration: everything below the right endpoint + // is marked as no longer needed. + auto endpoints = accessor.getEndpoints(bounds); + // The endpoints are {a: 1} and {a: 2}. So we will expect {a: 1} to be released. + ASSERT(endpoints != boost::none); + ASSERT_DOCUMENT_EQ(docs[0].getDocument(), *accessor[endpoints->first]); + ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *accessor[endpoints->second]); + + // The no-longer-needed documents are released on the next advance(). + partIter.advance(); + + // The current document is now {a: 4}, and {a: 1} has been released. + ASSERT_DOCUMENT_EQ(docs[3].getDocument(), *accessor[0]); + ASSERT_DOCUMENT_EQ(docs[2].getDocument(), *accessor[-1]); + ASSERT_DOCUMENT_EQ(docs[1].getDocument(), *accessor[-2]); + ASSERT_THROWS_CODE(accessor[-3], AssertionException, 5371202); +} + DEATH_TEST_F(PartitionIteratorTest, MixedPolicy, "Invalid access of expired document") { const auto docs = std::deque<DocumentSource::GetNextResult>{ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}, Document{{"a", 4}}}; const auto mock = DocumentSourceMock::createForTest(docs, getExpCtx()); - auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none); - auto endpointAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpointsOnly); + auto partIter = PartitionIterator(getExpCtx().get(), mock.get(), boost::none, boost::none); + auto endpointAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kEndpoints); auto defaultAccessor = PartitionAccessor(&partIter, PartitionAccessor::Policy::kDefaultSequential); // Mock a window with documents [1, 2]. diff --git a/src/mongo/db/pipeline/window_function/window_bounds.cpp b/src/mongo/db/pipeline/window_function/window_bounds.cpp index 20be8c30856..babb0394673 100644 --- a/src/mongo/db/pipeline/window_function/window_bounds.cpp +++ b/src/mongo/db/pipeline/window_function/window_bounds.cpp @@ -213,7 +213,7 @@ WindowBounds WindowBounds::parse(BSONObj args, } uassert(5339902, "Range-based bounds require sortBy a single field", - bounds.isUnbounded() || (sortBy && sortBy->size() == 1)); + sortBy && sortBy->size() == 1); return bounds; } } diff --git a/src/mongo/db/pipeline/window_function/window_bounds.h b/src/mongo/db/pipeline/window_function/window_bounds.h index f7b38a2e787..523ac095975 100644 --- a/src/mongo/db/pipeline/window_function/window_bounds.h +++ b/src/mongo/db/pipeline/window_function/window_bounds.h @@ -94,7 +94,7 @@ struct WindowBounds { } /** - * Check if these bounds are unbounded on both ends. + * Checks whether these bounds are unbounded on both ends. * This case is special because it means you don't need a sortBy to interpret the bounds: * the bounds include every document (in the current partition). */ diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.cpp b/src/mongo/db/pipeline/window_function/window_function_exec.cpp index 6bec87bf8b8..ab154b720d4 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec.cpp @@ -30,7 +30,9 @@ #include "mongo/db/pipeline/window_function/window_function_exec.h" #include "mongo/db/pipeline/window_function/window_function_exec_derivative.h" #include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h" +#include "mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h" #include "mongo/db/pipeline/window_function/window_function_exec_removable_document.h" +#include "mongo/db/pipeline/window_function/window_function_exec_removable_range.h" namespace mongo { @@ -75,6 +77,7 @@ std::unique_ptr<WindowFunctionExec> translateDerivative( } // namespace std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create( + ExpressionContext* expCtx, PartitionIterator* iter, const WindowFunctionStatement& functionStmt, const boost::optional<SortPattern>& sortBy) { @@ -84,22 +87,46 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create( return translateDerivative(iter, *deriv, sortBy); } - // Use a sentinel variable to avoid a compilation error when some cases of std::visit don't - // return a value. - std::unique_ptr<WindowFunctionExec> exec; - stdx::visit( + WindowBounds bounds = functionStmt.expr->bounds(); + + return stdx::visit( visit_helper::Overloaded{ - [&](const WindowBounds::DocumentBased& docBase) { - exec = translateDocumentWindow(iter, functionStmt.expr, docBase); + [&](const WindowBounds::DocumentBased& docBounds) { + return translateDocumentWindow(iter, functionStmt.expr, docBounds); }, - [&](const WindowBounds::RangeBased& rangeBase) { - uasserted(5397901, "Ranged based windows not currently supported"); + [&](const WindowBounds::RangeBased& rangeBounds) + -> std::unique_ptr<WindowFunctionExec> { + // These checks should be enforced already during parsing. + tassert(5429401, + "Range-based window needs a non-compound sortBy", + sortBy != boost::none && sortBy->size() == 1); + SortPattern::SortPatternPart part = *sortBy->begin(); + tassert(5429410, + "Range-based window doesn't work on expression-sortBy", + part.fieldPath != boost::none && !part.expression); + auto sortByExpr = ExpressionFieldPath::createPathFromString( + expCtx, part.fieldPath->fullPath(), expCtx->variablesParseState); + + if (stdx::holds_alternative<WindowBounds::Unbounded>(rangeBounds.lower)) { + return std::make_unique<WindowFunctionExecNonRemovableRange>( + iter, + functionStmt.expr->input(), + std::move(sortByExpr), + functionStmt.expr->buildAccumulatorOnly(), + bounds); + } else { + return std::make_unique<WindowFunctionExecRemovableRange>( + iter, + functionStmt.expr->input(), + std::move(sortByExpr), + functionStmt.expr->buildRemovable(), + bounds); + } }, - [&](const WindowBounds::TimeBased& timeBase) { + [&](const WindowBounds::TimeBased& timeBounds) -> std::unique_ptr<WindowFunctionExec> { uasserted(5397902, "Time based windows are not currently supported"); }}, - functionStmt.expr->bounds().bounds); - return exec; + bounds.bounds); } } // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.h b/src/mongo/db/pipeline/window_function/window_function_exec.h index b46de96d36f..e98563f8eca 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec.h @@ -56,7 +56,8 @@ public: * Creates an appropriate WindowFunctionExec that is capable of evaluating the window function * over the given bounds, both found within the WindowFunctionStatement. */ - static std::unique_ptr<WindowFunctionExec> create(PartitionIterator* iter, + static std::unique_ptr<WindowFunctionExec> create(ExpressionContext* expCtx, + PartitionIterator* iter, const WindowFunctionStatement& functionStmt, const boost::optional<SortPattern>& sortBy); @@ -91,22 +92,8 @@ protected: */ class WindowFunctionExecRemovable : public WindowFunctionExec { public: - WindowFunctionExecRemovable(PartitionIterator* iter, - boost::intrusive_ptr<Expression> input, - std::unique_ptr<WindowFunctionState> function) - : WindowFunctionExec( - PartitionAccessor(iter, PartitionAccessor::Policy::kDefaultSequential)), - _input(std::move(input)), - _function(std::move(function)) {} - - Value getNext() { - if (!_initialized) { - this->initialize(); - _initialized = true; - return _function->getValue(); - } - processDocumentsToUpperBound(); - removeDocumentsUnderLowerBound(); + Value getNext() override { + update(); return _function->getValue(); } @@ -118,15 +105,14 @@ public: return _function->getApproximateSize() + _memUsageBytes; } - protected: - boost::intrusive_ptr<Expression> _input; - std::unique_ptr<WindowFunctionState> _function; - // Keep track of values in the window function that will need to be removed later. - std::queue<Value> _values; - // In one of two states: either the initial window has not been populated or we are sliding and - // accumulating/removing values. - bool _initialized = false; + WindowFunctionExecRemovable(PartitionIterator* iter, + PartitionAccessor::Policy policy, + boost::intrusive_ptr<Expression> input, + std::unique_ptr<WindowFunctionState> function) + : WindowFunctionExec(PartitionAccessor(iter, policy)), + _input(std::move(input)), + _function(std::move(function)) {} void addValue(Value v) { _function->add(v); @@ -134,14 +120,31 @@ protected: _memUsageBytes += v.getApproximateSize(); } + void removeValue() { + tassert(5429400, "Tried to remove more values than we added", !_values.empty()); + auto v = _values.front(); + _function->remove(v); + _values.pop(); + _memUsageBytes -= v.getApproximateSize(); + } + + boost::intrusive_ptr<Expression> _input; + std::unique_ptr<WindowFunctionState> _function; + // Keep track of values in the window function that will need to be removed later. + std::queue<Value> _values; + // Track the byte size of the values being stored by this class. Does not include the constant // size objects being held or the overhead of the data structures. size_t _memUsageBytes = 0; private: - virtual void processDocumentsToUpperBound() = 0; - virtual void removeDocumentsUnderLowerBound() = 0; - virtual void initialize() = 0; + /** + * This method notifies the executor that the underlying PartitionIterator + * '_iter' has been advanced one time since the last call to initialize() or + * update(). It should determine how the window has changed (which documents have + * entered it? which have left it?) and call addValue(), removeValue() as needed. + */ + virtual void update() = 0; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h b/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h index 2b02ed0db14..1bac1417b99 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec_derivative.h @@ -57,7 +57,7 @@ public: boost::intrusive_ptr<Expression> time, WindowBounds bounds, boost::optional<TimeUnit> outputUnit) - : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpointsOnly)), + : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpoints)), _position(std::move(position)), _time(std::move(time)), _bounds(std::move(bounds)), diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp index 548dda10a46..da22a7762c3 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_derivative_test.cpp @@ -52,8 +52,8 @@ public: WindowBounds bounds, boost::optional<TimeUnit> timeUnit = boost::none) { _docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); - _iter = - std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none); + _iter = std::make_unique<PartitionIterator>( + getExpCtx().get(), _docSource.get(), boost::none, boost::none); auto position = ExpressionFieldPath::parse( getExpCtx().get(), positionPath, getExpCtx()->variablesParseState); diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h b/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h index ef4c4654fd7..e5ab3b4065c 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec_first_last.h @@ -48,7 +48,7 @@ protected: boost::intrusive_ptr<Expression> input, WindowBounds bounds, boost::optional<boost::intrusive_ptr<Expression>> defaultValue) - : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpointsOnly)), + : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kEndpoints)), _input(std::move(input)), _bounds(std::move(bounds)) { if (!defaultValue) { diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp index 3f86e395522..f92f32b24db 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_first_last_test.cpp @@ -57,7 +57,7 @@ public: auto vps = expCtx->variablesParseState; auto optKey = keyPath ? optExp(ExpressionFieldPath::parse(expCtx, *keyPath, vps)) : boost::none; - _iter = std::make_unique<PartitionIterator>(expCtx, _docSource.get(), optKey); + _iter = std::make_unique<PartitionIterator>(expCtx, _docSource.get(), optKey, boost::none); auto inputField = ExpressionFieldPath::parse(expCtx, "$val", vps); auto defaultExp = defaultValue ? optExp(ExpressionConstant::parse(expCtx, *defaultValue, vps)) diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h new file mode 100644 index 00000000000..104e5af4d0b --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_range.h @@ -0,0 +1,117 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/window_function/partition_iterator.h" +#include "mongo/db/pipeline/window_function/window_function_exec_non_removable.h" + +namespace mongo { + +/** + * An executor that handles left-unbounded, range-based windows. + */ +class WindowFunctionExecNonRemovableRange final : public WindowFunctionExec { +public: + WindowFunctionExecNonRemovableRange(PartitionIterator* iter, + boost::intrusive_ptr<Expression> input, + boost::intrusive_ptr<ExpressionFieldPath> sortExpr, + boost::intrusive_ptr<AccumulatorState> function, + WindowBounds bounds) + : WindowFunctionExec(PartitionAccessor(iter, PartitionAccessor::Policy::kRightEndpoint)), + _input(std::move(input)), + _sortExpr(std::move(sortExpr)), + _function(std::move(function)), + _bounds(bounds) {} + + Value getNext() final { + update(); + return _function->getValue(false); + } + + size_t getApproximateSize() const final { + return _function->getMemUsage(); + } + + void reset() final { + _function->reset(); + _lastEndpoints = boost::none; + } + +private: + void update() { + auto endpoints = _iter.getEndpoints(_bounds, _lastEndpoints); + // There are 4 different transitions we can make: + if (_lastEndpoints) { + if (endpoints) { + // Transition from nonempty to nonempty: add new values based on how the window + // changed. + for (int i = _lastEndpoints->second + 1; i <= endpoints->second; ++i) { + addValueAt(i); + } + } else { + // Transition from nonempty to empty: discard the accumulator state. + _function->reset(); + } + } else { + if (endpoints) { + // Transition from empty to nonempty: add the new values. + for (int i = endpoints->first; i <= endpoints->second; ++i) { + addValueAt(i); + } + } else { + // Transition from empty to empty: nothing to do! + } + } + + if (endpoints) { + // Shift endpoints by 1 because we will have advanced by 1 document on the next call + // to update(). + _lastEndpoints = std::pair(endpoints->first - 1, endpoints->second - 1); + } else { + _lastEndpoints = boost::none; + } + } + void addValueAt(int offset) { + auto doc = _iter[offset]; + tassert(5429411, "endpoints must fall in the partition", doc); + Value v = _input->evaluate(*doc, &_input->getExpressionContext()->variables); + _function->process(v, false); + } + + boost::intrusive_ptr<Expression> _input; + boost::intrusive_ptr<ExpressionFieldPath> _sortExpr; + boost::intrusive_ptr<AccumulatorState> _function; + WindowBounds _bounds; + + boost::optional<std::pair<int, int>> _lastEndpoints; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp index efa7c88cccd..d8042421f95 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp @@ -55,8 +55,8 @@ public: const std::string& inputPath, WindowBounds::Bound<int> upper) { _docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); - _iter = - std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none); + _iter = std::make_unique<PartitionIterator>( + getExpCtx().get(), _docSource.get(), boost::none, boost::none); auto input = ExpressionFieldPath::parse( getExpCtx().get(), inputPath, getExpCtx()->variablesParseState); return WindowFunctionExecNonRemovable<AccumulatorState>( @@ -123,8 +123,10 @@ TEST_F(WindowFunctionExecNonRemovableTest, AccumulateOnlyWithMultiplePartitions) auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); auto key = ExpressionFieldPath::createPathFromString( getExpCtx().get(), "key", getExpCtx()->variablesParseState); - auto iter = PartitionIterator( - getExpCtx().get(), mock.get(), boost::optional<boost::intrusive_ptr<Expression>>(key)); + auto iter = PartitionIterator(getExpCtx().get(), + mock.get(), + boost::optional<boost::intrusive_ptr<Expression>>(key), + boost::none); auto input = ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState); auto mgr = WindowFunctionExecNonRemovable<AccumulatorState>( @@ -155,8 +157,8 @@ TEST_F(WindowFunctionExecNonRemovableTest, InputExpressionAllowedToCreateVariabl const auto docs = std::deque<DocumentSource::GetNextResult>{ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}}; auto docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); - auto iter = - std::make_unique<PartitionIterator>(getExpCtx().get(), docSource.get(), boost::none); + auto iter = std::make_unique<PartitionIterator>( + getExpCtx().get(), docSource.get(), boost::none, boost::none); auto filterBSON = fromjson("{$filter: {input: [1, 2, 3], as: 'num', cond: {$gte: ['$$num', 2]}}}"); auto input = ExpressionFilter::parse( diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp index a459d295a8e..1d3ef26485f 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp @@ -38,7 +38,10 @@ WindowFunctionExecRemovableDocument::WindowFunctionExecRemovableDocument( boost::intrusive_ptr<Expression> input, std::unique_ptr<WindowFunctionState> function, WindowBounds::DocumentBased bounds) - : WindowFunctionExecRemovable(iter, std::move(input), std::move(function)) { + : WindowFunctionExecRemovable(iter, + PartitionAccessor::Policy::kDefaultSequential, + std::move(input), + std::move(function)) { stdx::visit( visit_helper::Overloaded{ [](const WindowBounds::Unbounded&) { @@ -73,9 +76,15 @@ void WindowFunctionExecRemovableDocument::initialize() { break; } } + _initialized = true; } -void WindowFunctionExecRemovableDocument::processDocumentsToUpperBound() { +void WindowFunctionExecRemovableDocument::update() { + if (!_initialized) { + initialize(); + return; + } + // If there is no upper bound, the whole partition is loaded by initialize. if (_upperBound) { // If this is false, we're over the end of the partition. @@ -83,9 +92,7 @@ void WindowFunctionExecRemovableDocument::processDocumentsToUpperBound() { addValue(_input->evaluate(*doc, &_input->getExpressionContext()->variables)); } } -} -void WindowFunctionExecRemovableDocument::removeDocumentsUnderLowerBound() { // For a positive lower bound the first pass loads the correct window, so subsequent passes // must always remove a document if there is a document left to remove. // For a negative lower bound we can start removing every time only after we have seen diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h index dc8327368ab..55a1ed4a76a 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h @@ -61,11 +61,8 @@ public: } private: - void processDocumentsToUpperBound() final; - - void removeDocumentsUnderLowerBound() final; - - void initialize() final; + void update() final; + void initialize(); void removeFirstValueIfExists() { if (_values.size() == 0) { @@ -76,6 +73,10 @@ private: _values.pop(); } + // In one of two states: either the initial window has not been populated or we are sliding and + // accumulating/removing values. + bool _initialized = false; + int _lowerBound; // Will stay boost::none if right unbounded. boost::optional<int> _upperBound = boost::none; diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp new file mode 100644 index 00000000000..5783e2da1ec --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.cpp @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/window_function/window_function_exec_removable_range.h" + +using boost::optional; +using std::pair; + +namespace mongo { + +WindowFunctionExecRemovableRange::WindowFunctionExecRemovableRange( + PartitionIterator* iter, + boost::intrusive_ptr<Expression> input, + boost::intrusive_ptr<ExpressionFieldPath> sortBy, + std::unique_ptr<WindowFunctionState> function, + WindowBounds bounds) + : WindowFunctionExecRemovable( + iter, PartitionAccessor::Policy::kEndpoints, std::move(input), std::move(function)), + _sortBy(std::move(sortBy)), + _bounds(std::move(bounds)) {} + +namespace { +struct EndpointsChange { + optional<pair<int, int>> added; + optional<pair<int, int>> removed; +}; + +/** + * Diffs two intervals: the result is expressed as two new intervals, for the added and removed + * elements. The intervals are all represented as inclusive [lower, upper], so an empty interval + * is represented as boost::none. + * + * For example, in 'diff([2, 10], [5, 14])' the lower bound changed from 2 to 5, so 'removed' is + * [2, 4], and the upper bound changed from 10 to 14, so 'added' is [11, 14]. + */ +EndpointsChange diffEndpoints(optional<pair<int, int>> old, optional<pair<int, int>> current) { + EndpointsChange result; + + if (!old && !current) { + return result; + } + if (!old) { + result.added = current; + return result; + } + if (!current) { + result.removed = old; + return result; + } + + auto [oldLower, oldUpper] = *old; + auto [lower, upper] = *current; + tassert(5429407, "Endpoints should never decrease.", oldLower <= lower && oldUpper <= upper); + if (oldLower < lower) { + result.removed = std::pair(oldLower, lower - 1); + } + if (oldUpper < upper) { + result.added = std::pair(oldUpper + 1, upper); + } + return result; +} +} // namespace + +void WindowFunctionExecRemovableRange::update() { + // Calling getEndpoints here also informs the PartitionAccessor that we won't need documents + // to the left of endpoints->first. However, we need to access those documents here in + // update(), to remove them from the WindowFunctionState. This is ok, because the documents + // expire later, on the next call to releaseExpired(). We can still use the documents between + // _lastEndpoints during this update(). + auto endpoints = _iter.getEndpoints(_bounds, _lastEndpoints); + auto [added, removed] = diffEndpoints(_lastEndpoints, endpoints); + + if (added) { + auto [lower, upper] = *added; + for (auto i = lower; i <= upper; ++i) { + addValue(_input->evaluate(*_iter[i], nullptr)); + } + } + if (removed) { + auto [lower, upper] = *removed; + for (auto i = lower; i <= upper; ++i) { + removeValue(); + } + } + + // Update _lastEndpoints. + if (endpoints) { + auto [lower, upper] = *endpoints; + // On the next call to update(), we will have advanced by 1 document. + // The document we call '0' now, will be called '-1' on that next update(). + _lastEndpoints = std::pair(lower - 1, upper - 1); + } else { + _lastEndpoints = boost::none; + } +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h new file mode 100644 index 00000000000..b58416fac7f --- /dev/null +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_range.h @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/window_function/partition_iterator.h" +#include "mongo/db/pipeline/window_function/window_bounds.h" +#include "mongo/db/pipeline/window_function/window_function_exec.h" + +namespace mongo { + +/** + * An executor that handles left-bounded, range-based windows. + * + * Left-bounded windows require a window-function state that supports removing elements. + */ +class WindowFunctionExecRemovableRange final : public WindowFunctionExecRemovable { +public: + /** + * Constructs a removable window function executor with the given input expression to be + * evaluated and passed to the corresponding WindowFunc for each document in the window. + * + * For example, in + * {$setWindowFields: { + * sortBy: {ts: 1}, + * output: { + * v: {$max: "$x", window: {range: [-5, 'unbounded']}} + * } + * }} + * + * 'input' is "$x" + * 'sortBy' is "$ts" (translated from the sort spec, {ts: 1}) + * 'function' is a WindowFunctionMax + * 'bounds' is WindowBounds::RangeBased{Value{-5}, WindowBounds::Unbounded{}} + */ + WindowFunctionExecRemovableRange(PartitionIterator* iter, + boost::intrusive_ptr<Expression> input, + boost::intrusive_ptr<ExpressionFieldPath> sortBy, + std::unique_ptr<WindowFunctionState> function, + WindowBounds bounds); + + Value getNext() override { + update(); + return _function->getValue(); + } + + void reset() final { + _function->reset(); + _lastEndpoints = boost::none; + _memUsageBytes = 0; + } + +private: + void update() final; + + boost::intrusive_ptr<ExpressionFieldPath> _sortBy; + WindowBounds _bounds; + boost::optional<std::pair<int, int>> _lastEndpoints; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp index 7cf9746f00c..cf8deb2c3bd 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp @@ -54,8 +54,8 @@ public: const std::string& inputPath, WindowBounds::DocumentBased bounds) { _docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); - _iter = - std::make_unique<PartitionIterator>(getExpCtx().get(), _docSource.get(), boost::none); + _iter = std::make_unique<PartitionIterator>( + getExpCtx().get(), _docSource.get(), boost::none, boost::none); auto input = ExpressionFieldPath::parse( getExpCtx().get(), inputPath, getExpCtx()->variablesParseState); std::unique_ptr<WindowFunctionState> maxFunc = @@ -256,8 +256,10 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) { auto mock = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); auto key = ExpressionFieldPath::createPathFromString( getExpCtx().get(), "key", getExpCtx()->variablesParseState); - auto iter = PartitionIterator{ - getExpCtx().get(), mock.get(), boost::optional<boost::intrusive_ptr<Expression>>(key)}; + auto iter = PartitionIterator{getExpCtx().get(), + mock.get(), + boost::optional<boost::intrusive_ptr<Expression>>(key), + boost::none}; auto input = ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState); CollatorInterfaceMock collator = CollatorInterfaceMock::MockType::kToLowerString; @@ -289,7 +291,8 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, CanResetFunction) { getExpCtx().get(), "key", getExpCtx()->variablesParseState); auto iter = PartitionIterator{getExpCtx().get(), mockTwo.get(), - boost::optional<boost::intrusive_ptr<Expression>>(keyTwo)}; + boost::optional<boost::intrusive_ptr<Expression>>(keyTwo), + boost::none}; auto input = ExpressionFieldPath::parse(getExpCtx().get(), "$a", getExpCtx()->variablesParseState); auto maxFunc = std::make_unique<WindowFunctionMax>(getExpCtx().get()); @@ -310,8 +313,8 @@ TEST_F(WindowFunctionExecRemovableDocumentTest, InputExpressionAllowedToCreateVa const auto docs = std::deque<DocumentSource::GetNextResult>{ Document{{"a", 1}}, Document{{"a", 2}}, Document{{"a", 3}}}; auto docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); - auto iter = - std::make_unique<PartitionIterator>(getExpCtx().get(), docSource.get(), boost::none); + auto iter = std::make_unique<PartitionIterator>( + getExpCtx().get(), docSource.get(), boost::none, boost::none); auto filterBSON = fromjson("{$filter: {input: [1, 2, 3], as: 'num', cond: {$gte: ['$$num', 2]}}}"); auto input = ExpressionFilter::parse( |