diff options
author | David Percy <david.percy@mongodb.com> | 2021-04-05 17:16:16 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-21 23:00:01 +0000 |
commit | 78c15ec274d0a4162aa5fcb29b367a8e1c0809d3 (patch) | |
tree | 1b3b0acb56b1f24e683b3daee0fbaf5e5e2c4605 /src/mongo | |
parent | 2f0aae1ce89a3493fec0bedfbed4788f070105c2 (diff) | |
download | mongo-78c15ec274d0a4162aa5fcb29b367a8e1c0809d3.tar.gz |
SERVER-54295 Support time-based window bounds in $setWindowFields
Diffstat (limited to 'src/mongo')
5 files changed, 67 insertions, 47 deletions
diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.cpp b/src/mongo/db/pipeline/window_function/partition_iterator.cpp index 32967618454..59d82a10636 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.cpp +++ b/src/mongo/db/pipeline/window_function/partition_iterator.cpp @@ -223,27 +223,51 @@ Value decimalAdd(const Value& left, const Value& right) { // user can't observe the type. return Value(left.coerceToDecimal().add(right.coerceToDecimal())); } + + } // namespace optional<std::pair<int, int>> PartitionIterator::getEndpointsRangeBased( - const WindowBounds& bounds, const optional<std::pair<int, int>>& hint) { + const WindowBounds::RangeBased& range, const optional<std::pair<int, int>>& hint) { + tassert(5429404, "Missing _sortExpr with range-based bounds", _sortExpr != boost::none); - // TODO SERVER-54295: time-based bounds - uassert(5429402, - "Time-based bounds not supported yet", - stdx::holds_alternative<WindowBounds::RangeBased>(bounds.bounds)); - auto range = stdx::get<WindowBounds::RangeBased>(bounds.bounds); auto lessThan = _expCtx->getValueComparator().getLessThan(); Value base = (*_sortExpr)->evaluate(*(*this)[0], &_expCtx->variables); - uassert(5429413, + if (range.unit) { + uassert( + 5429513, + str::stream() << "Invalid range: Expected the sortBy field to be a Date, but it was " + << base.getType(), + base.getType() == BSONType::Date); + } else { + uassert( + 5429413, "Invalid range: For windows that involve date or time ranges, a unit must be provided.", base.getType() != BSONType::Date); - uassert(5429414, + uassert( + 5429414, str::stream() << "Invalid range: Expected the sortBy field to be a number, but it was " << base.getType(), base.numeric()); + } + auto add = [&](const Value& base, const Value& delta) -> Value { + if (range.unit) { + return Value{ + dateAdd(base.coerceToDate(), *range.unit, delta.coerceToInt(), TimeZone())}; + } else { + tassert(5429406, "Range-based bounds are specified as a number", delta.numeric()); + return decimalAdd(base, delta); + } + }; + auto hasExpectedType = [&](const Value& v) -> bool { + if (range.unit) { + return v.getType() == BSONType::Date; + } else { + return v.numeric(); + } + }; // 'lower' is the smallest offset in the partition that's within the lower bound of the window. optional<int> lower = stdx::visit( @@ -269,13 +293,13 @@ optional<std::pair<int, int>> PartitionIterator::getEndpointsRangeBased( return boost::none; } Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables); - if (v.numeric()) { + if (hasExpectedType(v)) { return i; } } }, [&](const Value& delta) -> optional<int> { - Value threshold = decimalAdd(base, delta); + Value threshold = add(base, delta); // Start from the beginning, or the hint, whichever is higher. // Note that the hint may no longer be a valid offset, if some documents were @@ -324,7 +348,7 @@ optional<std::pair<int, int>> PartitionIterator::getEndpointsRangeBased( boost::optional<Document> doc; for (int i = start; (doc = (*this)[i]); ++i) { Value v = (*_sortExpr)->evaluate(*doc, &_expCtx->variables); - if (!v.numeric()) { + if (!hasExpectedType(v)) { // The previously scanned doc is the rightmost numeric one. Since we start // from '0', 'hint', or 'lower', which are all numeric, we should never hit // this case on the first iteration. @@ -339,8 +363,7 @@ optional<std::pair<int, int>> PartitionIterator::getEndpointsRangeBased( }, [&](const Value& delta) -> optional<int> { // Pull in documents until the sortBy value crosses 'base + delta'. - tassert(5429406, "Range-based bounds are specified as a number", delta.numeric()); - Value threshold = decimalAdd(base, delta); + Value threshold = add(base, delta); // If there's no hint, start scanning from the lower bound. // If there is a hint, start from whichever is greater: lower bound or hint. @@ -385,9 +408,8 @@ optional<std::pair<int, int>> PartitionIterator::getEndpointsRangeBased( } optional<std::pair<int, int>> PartitionIterator::getEndpointsDocumentBased( - const WindowBounds& bounds, const optional<std::pair<int, int>>& hint = boost::none) { - tassert(5423301, "getEndpoints assumes there is a current document", (*this)[0] != boost::none); - auto docBounds = stdx::get<WindowBounds::DocumentBased>(bounds.bounds); + const WindowBounds::DocumentBased& docBounds, + const optional<std::pair<int, int>>& hint = boost::none) { optional<int> lowerBound = numericBound(docBounds.lower); optional<int> upperBound = numericBound(docBounds.upper); tassert(5423302, @@ -430,10 +452,19 @@ optional<std::pair<int, int>> PartitionIterator::getEndpointsDocumentBased( optional<std::pair<int, int>> PartitionIterator::getEndpoints( const WindowBounds& bounds, const optional<std::pair<int, int>>& hint = boost::none) { - if (!stdx::holds_alternative<WindowBounds::DocumentBased>(bounds.bounds)) { - return getEndpointsRangeBased(bounds, hint); - } - return getEndpointsDocumentBased(bounds, hint); + + tassert(5423301, "getEndpoints assumes there is a current document", (*this)[0] != boost::none); + + return stdx::visit( + visit_helper::Overloaded{ + [&](const WindowBounds::DocumentBased docBounds) { + return getEndpointsDocumentBased(docBounds, hint); + }, + [&](const WindowBounds::RangeBased rangeBounds) { + return getEndpointsRangeBased(rangeBounds, hint); + }, + }, + bounds.bounds); } void PartitionIterator::getNextDocument() { diff --git a/src/mongo/db/pipeline/window_function/partition_iterator.h b/src/mongo/db/pipeline/window_function/partition_iterator.h index 1b199685238..4e14458c4b5 100644 --- a/src/mongo/db/pipeline/window_function/partition_iterator.h +++ b/src/mongo/db/pipeline/window_function/partition_iterator.h @@ -225,9 +225,10 @@ private: // Internal helpers for 'getEndpoints()'. boost::optional<std::pair<int, int>> getEndpointsRangeBased( - const WindowBounds& bounds, const boost::optional<std::pair<int, int>>& hint); + const WindowBounds::RangeBased& bounds, const boost::optional<std::pair<int, int>>& hint); boost::optional<std::pair<int, int>> getEndpointsDocumentBased( - const WindowBounds& bounds, const boost::optional<std::pair<int, int>>& hint); + const WindowBounds::DocumentBased& bounds, + const boost::optional<std::pair<int, int>>& hint); ExpressionContext* _expCtx; DocumentSource* _source; diff --git a/src/mongo/db/pipeline/window_function/window_bounds.cpp b/src/mongo/db/pipeline/window_function/window_bounds.cpp index babb0394673..dd082135e02 100644 --- a/src/mongo/db/pipeline/window_function/window_bounds.cpp +++ b/src/mongo/db/pipeline/window_function/window_bounds.cpp @@ -183,17 +183,18 @@ WindowBounds WindowBounds::parse(BSONObj args, str::stream() << "'" << kArgUnit << "' must be a string", unit.type() == BSONType::String); - auto parseInt = [](Value v) -> int { + auto parseInt = [](Value v) -> Value { uassert(ErrorCodes::FailedToParse, str::stream() << "With '" << kArgUnit << "', range-based bounds must be an integer", v.integral()); - return v.coerceToInt(); + return v; }; - auto lower = parseBound<int>(expCtx, lowerElem, parseInt); - auto upper = parseBound<int>(expCtx, upperElem, parseInt); + // Syntactically, time-based bounds can't be fractional. So parse as int. + auto lower = parseBound<Value>(expCtx, lowerElem, parseInt); + auto upper = parseBound<Value>(expCtx, upperElem, parseInt); checkBoundsForward(lower, upper); - bounds = WindowBounds{TimeBased{lower, upper, parseTimeUnit(unit.str())}}; + bounds = WindowBounds{RangeBased{lower, upper, parseTimeUnit(unit.str())}}; } else { // Parse range-based bounds. uassert(ErrorCodes::FailedToParse, @@ -231,13 +232,9 @@ void WindowBounds::serialize(MutableDocument& args) const { serializeBound(rangeBounds.lower), serializeBound(rangeBounds.upper), }}; - }, - [&](const TimeBased& timeBounds) { - args[kArgRange] = Value{std::vector<Value>{ - serializeBound(timeBounds.lower), - serializeBound(timeBounds.upper), - }}; - args[kArgUnit] = Value{serializeTimeUnit(timeBounds.unit)}; + if (rangeBounds.unit) { + args[kArgUnit] = Value{serializeTimeUnit(*rangeBounds.unit)}; + } }, }, bounds); diff --git a/src/mongo/db/pipeline/window_function/window_bounds.h b/src/mongo/db/pipeline/window_function/window_bounds.h index 523ac095975..ad6a966ecfa 100644 --- a/src/mongo/db/pipeline/window_function/window_bounds.h +++ b/src/mongo/db/pipeline/window_function/window_bounds.h @@ -75,19 +75,12 @@ struct WindowBounds { Bound<int> upper; }; struct RangeBased { - // Range-based bounds can be any numeric type: int, double, Decimal, etc. Bound<Value> lower; Bound<Value> upper; - }; - struct TimeBased { - // Although time-based bounds look similar to range-based, they are more restricted: - // the numbers must be integers, like $dateAdd / $dateDiff. - Bound<int> lower; - Bound<int> upper; - TimeUnit unit; + boost::optional<TimeUnit> unit; }; - stdx::variant<DocumentBased, RangeBased, TimeBased> bounds; + stdx::variant<DocumentBased, RangeBased> bounds; static WindowBounds defaultBounds() { return WindowBounds{DocumentBased{Unbounded{}, Unbounded{}}}; diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.cpp b/src/mongo/db/pipeline/window_function/window_function_exec.cpp index ab154b720d4..0ce32211238 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec.cpp @@ -74,6 +74,7 @@ std::unique_ptr<WindowFunctionExec> translateDerivative( iter, deriv.input(), sortExpr, deriv.bounds(), deriv.outputUnit()); } + } // namespace std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create( @@ -106,7 +107,6 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create( part.fieldPath != boost::none && !part.expression); auto sortByExpr = ExpressionFieldPath::createPathFromString( expCtx, part.fieldPath->fullPath(), expCtx->variablesParseState); - if (stdx::holds_alternative<WindowBounds::Unbounded>(rangeBounds.lower)) { return std::make_unique<WindowFunctionExecNonRemovableRange>( iter, @@ -123,9 +123,7 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create( bounds); } }, - [&](const WindowBounds::TimeBased& timeBounds) -> std::unique_ptr<WindowFunctionExec> { - uasserted(5397902, "Time based windows are not currently supported"); - }}, + }, bounds.bounds); } |