diff options
Diffstat (limited to 'src/mongo')
13 files changed, 354 insertions, 107 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 1333b056f75..1554ab17ab3 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -121,6 +121,7 @@ env.Library( 'accumulator_covariance.cpp', 'accumulator_exp_moving_avg.cpp', 'accumulator_first.cpp', + 'accumulator_integral.cpp', 'accumulator_js_reduce.cpp', 'accumulator_last.cpp', 'accumulator_merge_objects.cpp', diff --git a/src/mongo/db/pipeline/accumulator_for_window_functions.h b/src/mongo/db/pipeline/accumulator_for_window_functions.h index e162b36f629..058d45f62f2 100644 --- a/src/mongo/db/pipeline/accumulator_for_window_functions.h +++ b/src/mongo/db/pipeline/accumulator_for_window_functions.h @@ -31,6 +31,7 @@ #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/window_function/window_function_covariance.h" +#include "mongo/db/pipeline/window_function/window_function_integral.h" namespace mongo { @@ -128,4 +129,22 @@ public: static boost::intrusive_ptr<AccumulatorState> create(ExpressionContext* const expCtx); }; +class AccumulatorIntegral : public AccumulatorForWindowFunctions { +public: + explicit AccumulatorIntegral(ExpressionContext* const expCtx, + boost::optional<long long> outputUnitMillis = boost::none); + + void processInternal(const Value& input, bool merging) final; + Value getValue(bool toBeMerged) final; + void reset() final; + + const char* getOpName() const final; + + static boost::intrusive_ptr<AccumulatorState> create( + ExpressionContext* const expCtx, boost::optional<long long> outputUnitMillis = boost::none); + +private: + WindowFunctionIntegral _integralWF; +}; + } // namespace mongo diff --git a/src/mongo/db/pipeline/accumulator_integral.cpp b/src/mongo/db/pipeline/accumulator_integral.cpp new file mode 100644 index 00000000000..83ba68c40d1 --- /dev/null +++ b/src/mongo/db/pipeline/accumulator_integral.cpp @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/accumulator_for_window_functions.h" + +#include "mongo/db/exec/document_value/value.h" +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/window_function/window_function_expression.h" + +namespace mongo { + +REGISTER_WINDOW_FUNCTION(integral, mongo::window_function::ExpressionIntegral::parse); + +AccumulatorIntegral::AccumulatorIntegral(ExpressionContext* const expCtx, + boost::optional<long long> outputUnitMillis) + : AccumulatorForWindowFunctions(expCtx), + _integralWF(expCtx, outputUnitMillis, true /* isNonremovable */) { + _memUsageBytes = sizeof(*this); +} + +void AccumulatorIntegral::processInternal(const Value& input, bool merging) { + tassert(5558800, "$integral can't be merged", !merging); + + _integralWF.add(input); + _memUsageBytes = sizeof(*this) + _integralWF.getApproximateSize() - sizeof(_integralWF); +} + +Value AccumulatorIntegral::getValue(bool toBeMerged) { + return _integralWF.getValue(); +} + +void AccumulatorIntegral::reset() { + _integralWF.reset(); + _memUsageBytes = sizeof(*this); +} + +boost::intrusive_ptr<AccumulatorState> AccumulatorIntegral::create( + ExpressionContext* const expCtx, boost::optional<long long> outputUnitMillis) { + return new AccumulatorIntegral(expCtx, outputUnitMillis); +} + +const char* AccumulatorIntegral::getOpName() const { + return "$integral"; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec.cpp b/src/mongo/db/pipeline/window_function/window_function_exec.cpp index 0ce32211238..a3df28b50ea 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec.cpp @@ -38,21 +38,52 @@ namespace mongo { namespace { +/** + * Translates the input Expression to suit certain window function's need. For example, $integral + * window function requires the input value to be a 2-sized Value vector containing the evaluating + * value of 'sortBy' and 'input' field. So we create an 'ExpressionArray' as the input Expression + * for Executors. + * + * Returns the 'input' in 'expr' if no extra translation is needed. + */ +boost::intrusive_ptr<Expression> translateInputExpression( + boost::intrusive_ptr<window_function::Expression> expr, + const boost::optional<SortPattern>& sortBy) { + if (!expr) + return nullptr; + if (auto integral = dynamic_cast<window_function::ExpressionIntegral*>(expr.get())) { + auto expCtx = integral->expCtx(); + tassert(5558802, + "$integral requires a 1-field ascending sortBy", + sortBy && sortBy->size() == 1 && !sortBy->begin()->expression && + sortBy->begin()->isAscending); + auto sortByExpr = ExpressionFieldPath::createPathFromString( + expCtx, sortBy->begin()->fieldPath->fullPath(), expCtx->variablesParseState); + return ExpressionArray::create( + expCtx, std::vector<boost::intrusive_ptr<Expression>>{sortByExpr, integral->input()}); + } + + return expr->input(); +} + std::unique_ptr<WindowFunctionExec> translateDocumentWindow( PartitionIterator* iter, boost::intrusive_ptr<window_function::Expression> expr, + const boost::optional<SortPattern>& sortBy, const WindowBounds::DocumentBased& bounds) { + auto inputExpr = translateInputExpression(expr, sortBy); + return stdx::visit( visit_helper::Overloaded{ [&](const WindowBounds::Unbounded&) -> std::unique_ptr<WindowFunctionExec> { // A left unbounded window will always be non-removable regardless of the upper // bound. return std::make_unique<WindowFunctionExecNonRemovable<AccumulatorState>>( - iter, expr->input(), expr->buildAccumulatorOnly(), bounds.upper); + iter, inputExpr, expr->buildAccumulatorOnly(), bounds.upper); }, [&](const auto&) -> std::unique_ptr<WindowFunctionExec> { return std::make_unique<WindowFunctionExecRemovableDocument>( - iter, expr->input(), expr->buildRemovable(), bounds); + iter, inputExpr, expr->buildRemovable(), bounds); }}, bounds.lower); } @@ -74,7 +105,6 @@ std::unique_ptr<WindowFunctionExec> translateDerivative( iter, deriv.input(), sortExpr, deriv.bounds(), deriv.outputUnit()); } - } // namespace std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create( @@ -93,7 +123,7 @@ std::unique_ptr<WindowFunctionExec> WindowFunctionExec::create( return stdx::visit( visit_helper::Overloaded{ [&](const WindowBounds::DocumentBased& docBounds) { - return translateDocumentWindow(iter, functionStmt.expr, docBounds); + return translateDocumentWindow(iter, functionStmt.expr, sortBy, docBounds); }, [&](const WindowBounds::RangeBased& rangeBounds) -> std::unique_ptr<WindowFunctionExec> { diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h index bb7418d66c8..5fda7ceedfd 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable.h @@ -78,10 +78,10 @@ public: return stdx::get<int>(_upperDocumentBound); }(); - if (auto doc = (this->_iter)[upperIndex]) + if (auto doc = (this->_iter)[upperIndex]) { _function->process( _input->evaluate(*doc, &_input->getExpressionContext()->variables), false); - else { + } else { // Upper bound is out of range, but may be because it's off of the end of the // partition. For instance, for bounds [unbounded, -1] we won't be able to // access the upper bound until the second call to getNext(). diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp index d8042421f95..2263d96ff1c 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_non_removable_test.cpp @@ -31,6 +31,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/accumulator_for_window_functions.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -53,14 +54,27 @@ public: WindowFunctionExecNonRemovable<AccumulatorState> createForFieldPath( std::deque<DocumentSource::GetNextResult> docs, const std::string& inputPath, - WindowBounds::Bound<int> upper) { + WindowBounds::Bound<int> upper, + boost::optional<std::string> sortByPath = boost::none) { _docSource = DocumentSourceMock::createForTest(std::move(docs), getExpCtx()); _iter = std::make_unique<PartitionIterator>( getExpCtx().get(), _docSource.get(), boost::none, boost::none); auto input = ExpressionFieldPath::parse( getExpCtx().get(), inputPath, getExpCtx()->variablesParseState); - return WindowFunctionExecNonRemovable<AccumulatorState>( - _iter.get(), std::move(input), AccumulatorType::create(getExpCtx().get()), upper); + if (sortByPath) { + auto sortBy = ExpressionFieldPath::parse( + getExpCtx().get(), *sortByPath, getExpCtx()->variablesParseState); + return WindowFunctionExecNonRemovable<AccumulatorState>( + _iter.get(), + ExpressionArray::create( + getExpCtx().get(), + std::vector<boost::intrusive_ptr<Expression>>{sortBy, input}), + AccumulatorType::create(getExpCtx().get()), + upper); + } else { + return WindowFunctionExecNonRemovable<AccumulatorState>( + _iter.get(), std::move(input), AccumulatorType::create(getExpCtx().get()), upper); + } } auto advanceIterator() { @@ -173,5 +187,20 @@ TEST_F(WindowFunctionExecNonRemovableTest, InputExpressionAllowedToCreateVariabl ASSERT_VALUE_EQ(Value(std::vector<Value>{Value(2), Value(3)}), exec.getNext()); } +TEST_F(WindowFunctionExecNonRemovableTest, CanReceiveSortByExpression) { + const auto docs = std::deque<DocumentSource::GetNextResult>{ + Document{{"x", 1}, {"y", 0}}, Document{{"x", 3}, {"y", 2}}, Document{{"x", 5}, {"y", 4}}}; + auto mgr = createForFieldPath<AccumulatorIntegral>( + docs, "$y" /* input */, 0, std::string("$x") /* sortBy */); + double expectedIntegral = 0; + ASSERT_VALUE_EQ(Value(expectedIntegral), mgr.getNext()); + advanceIterator(); + expectedIntegral += 2.0; // (2 + 0) * (3 - 1) / 2.0 = 2.0 + ASSERT_VALUE_EQ(Value(expectedIntegral), mgr.getNext()); + advanceIterator(); + expectedIntegral += 6.0; // (4 + 2) * (5 - 3) / 2.0 = 6.0 + ASSERT_VALUE_EQ(Value(expectedIntegral), mgr.getNext()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp index 38a61a000bd..74cd8fd584e 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.cpp @@ -73,12 +73,7 @@ void WindowFunctionExecRemovableDocument::initialize() { for (int i = lowerBoundForInit; !_upperBound || i <= _upperBound.get(); ++i) { // If this is false, we're over the end of the partition. if (auto doc = (this->_iter)[i]) { - Value valToAdd = _sortBy - ? Value(std::vector<Value>{ - _sortBy->evaluate(*doc, &_sortBy->getExpressionContext()->variables), - _input->evaluate(*doc, &_input->getExpressionContext()->variables)}) - : _input->evaluate(*doc, &_input->getExpressionContext()->variables); - addValue(valToAdd); + addValue(_input->evaluate(*doc, &_input->getExpressionContext()->variables)); } else { break; } @@ -96,12 +91,7 @@ void WindowFunctionExecRemovableDocument::update() { if (_upperBound) { // If this is false, we're over the end of the partition. if (auto doc = (this->_iter)[_upperBound.get()]) { - Value valToAdd = _sortBy - ? Value(std::vector<Value>{ - _sortBy->evaluate(*doc, &_sortBy->getExpressionContext()->variables), - _input->evaluate(*doc, &_input->getExpressionContext()->variables)}) - : _input->evaluate(*doc, &_input->getExpressionContext()->variables); - addValue(valToAdd); + addValue(_input->evaluate(*doc, &_input->getExpressionContext()->variables)); } } diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h index b8564eda950..2519f8f22a8 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_document.h @@ -54,24 +54,6 @@ public: std::unique_ptr<WindowFunctionState> function, WindowBounds::DocumentBased bounds); - /** - * Constructs a removable window function executor with the given input expression and sortBy - * expression to be evaluated and passed the evaluation of both "input" and "sortBy" as a single - * input Value of a 2-sized vector (Value{sortByValue, inputValue}) to the corresponding - * WindowFunc for each document in the window. - * - * The "bounds" parameter is the user supplied bounds for the window. - */ - WindowFunctionExecRemovableDocument(PartitionIterator* iter, - boost::intrusive_ptr<Expression> input, - boost::intrusive_ptr<Expression> sortBy, - std::unique_ptr<WindowFunctionState> function, - WindowBounds::DocumentBased bounds) - : WindowFunctionExecRemovableDocument(iter, std::move(input), std::move(function), bounds) { - _sortBy = std::move(sortBy); - _memUsageBytes = sizeof(*this); - } - void reset() final { _function->reset(); _values = std::queue<Value>(); @@ -96,8 +78,6 @@ private: // accumulating/removing values. bool _initialized = false; - boost::intrusive_ptr<Expression> _sortBy = nullptr; - int _lowerBound; // Will stay boost::none if right unbounded. boost::optional<int> _upperBound = boost::none; diff --git a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp index 4e91a9b21f3..3de88f920f1 100644 --- a/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_exec_removable_test.cpp @@ -80,7 +80,11 @@ public: std::unique_ptr<WindowFunctionState> integralFunc = std::make_unique<WindowFunctionIntegral>(getExpCtx().get()); return WindowFunctionExecRemovableDocument( - _iter.get(), std::move(input), std::move(sortBy), std::move(integralFunc), bounds); + _iter.get(), + ExpressionArray::create(getExpCtx().get(), + std::vector<boost::intrusive_ptr<Expression>>{sortBy, input}), + std::move(integralFunc), + bounds); } auto advanceIterator() { diff --git a/src/mongo/db/pipeline/window_function/window_function_expression.h b/src/mongo/db/pipeline/window_function/window_function_expression.h index bb1ef7d7704..a34bc68c0d5 100644 --- a/src/mongo/db/pipeline/window_function/window_function_expression.h +++ b/src/mongo/db/pipeline/window_function/window_function_expression.h @@ -31,6 +31,7 @@ #include "mongo/base/initializer.h" #include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/accumulator_for_window_functions.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_set_window_fields_gen.h" #include "mongo/db/pipeline/window_function/window_bounds.h" @@ -397,17 +398,101 @@ protected: boost::optional<Decimal128> _alpha; }; -class ExpressionDerivative : public Expression { +class ExpressionWithOutputUnit : public Expression { public: static constexpr StringData kArgInput = "input"_sd; static constexpr StringData kArgOutputUnit = "outputUnit"_sd; + ExpressionWithOutputUnit(ExpressionContext* expCtx, + std::string accumulatorName, + boost::intrusive_ptr<::mongo::Expression> input, + WindowBounds bounds, + boost::optional<TimeUnit> outputUnit) + : Expression(expCtx, accumulatorName, std::move(input), std::move(bounds)), + _outputUnit(outputUnit) {} + + boost::optional<TimeUnit> outputUnit() const { + return _outputUnit; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { + MutableDocument result; + result[_accumulatorName][kArgInput] = _input->serialize(static_cast<bool>(explain)); + if (_outputUnit) { + result[_accumulatorName][kArgOutputUnit] = Value(serializeTimeUnit(*_outputUnit)); + } + + MutableDocument windowField; + _bounds.serialize(windowField); + result[kWindowArg] = windowField.freezeToValue(); + return result.freezeToValue(); + } + +protected: + static boost::optional<TimeUnit> parseOutputUnit(const BSONElement& arg) { + boost::optional<TimeUnit> outputUnit; + { + uassert(ErrorCodes::FailedToParse, + str::stream() << kArgOutputUnit << "' must be a string, but got " << arg.type(), + arg.type() == String); + outputUnit = parseTimeUnit(arg.valueStringData()); + switch (*outputUnit) { + // These larger time units vary so much, it doesn't make sense to define a + // fixed conversion from milliseconds. (See 'timeUnitTypicalMilliseconds'.) + case TimeUnit::year: + case TimeUnit::quarter: + case TimeUnit::month: + uasserted(5490704, "outputUnit must be 'week' or smaller"); + // Only these time units are allowed. + case TimeUnit::week: + case TimeUnit::day: + case TimeUnit::hour: + case TimeUnit::minute: + case TimeUnit::second: + case TimeUnit::millisecond: + break; + } + } + return outputUnit; + } + + static void validateSortBy(const boost::optional<SortPattern>& sortBy, + const std::string& accumulatorName) { + uassert(ErrorCodes::FailedToParse, + str::stream() << accumulatorName << " requires a sortBy", + sortBy); + uassert(ErrorCodes::FailedToParse, + str::stream() << accumulatorName << " requires a non-compound sortBy", + sortBy->size() == 1); + uassert(ErrorCodes::FailedToParse, + str::stream() << accumulatorName << " requires a non-expression sortBy", + !sortBy->begin()->expression); + uassert(ErrorCodes::FailedToParse, + str::stream() << accumulatorName << " requires an ascending sortBy", + sortBy->begin()->isAscending); + } + + boost::optional<long long> convertTimeUnitToMillis(boost::optional<TimeUnit> outputUnit) const { + if (!outputUnit) + return boost::none; + + auto status = timeUnitTypicalMilliseconds(*outputUnit); + tassert(status); + + return status.getValue(); + } + + boost::optional<TimeUnit> _outputUnit; +}; + +class ExpressionDerivative : public ExpressionWithOutputUnit { +public: ExpressionDerivative(ExpressionContext* expCtx, boost::intrusive_ptr<::mongo::Expression> input, WindowBounds bounds, boost::optional<TimeUnit> outputUnit) - : Expression(expCtx, "$derivative", std::move(input), std::move(bounds)), - _outputUnit(outputUnit) {} + : ExpressionWithOutputUnit( + expCtx, "$derivative", std::move(input), std::move(bounds), outputUnit) {} static boost::intrusive_ptr<Expression> parse(BSONObj obj, const boost::optional<SortPattern>& sortBy, @@ -419,17 +504,7 @@ public: // } // window: {...} // optional // } - - uassert(ErrorCodes::FailedToParse, "$derivative requires a sortBy", sortBy); - uassert(ErrorCodes::FailedToParse, - "$derivative requires a non-compound sortBy", - sortBy->size() == 1); - uassert(ErrorCodes::FailedToParse, - "$derivative requires a non-expression sortBy", - !sortBy->begin()->expression); - uassert(ErrorCodes::FailedToParse, - "$derivative requires an ascending sortBy", - sortBy->begin()->isAscending); + validateSortBy(sortBy, "$derivative"); boost::optional<WindowBounds> bounds; BSONElement derivativeArgs; @@ -462,27 +537,7 @@ public: if (argName == kArgInput) { input = ::mongo::Expression::parseOperand(expCtx, arg, expCtx->variablesParseState); } else if (argName == kArgOutputUnit) { - uassert(ErrorCodes::FailedToParse, - str::stream() << "$derivative '" << kArgOutputUnit - << "' must be a string, but got " << arg.type(), - arg.type() == String); - outputUnit = parseTimeUnit(arg.valueStringData()); - switch (*outputUnit) { - // These larger time units vary so much, it doesn't make sense to define a - // fixed conversion from milliseconds. (See 'timeUnitTypicalMilliseconds'.) - case TimeUnit::year: - case TimeUnit::quarter: - case TimeUnit::month: - uasserted(5490704, "$derivative outputUnit must be 'week' or smaller"); - // Only these time units are allowed. - case TimeUnit::week: - case TimeUnit::day: - case TimeUnit::hour: - case TimeUnit::minute: - case TimeUnit::second: - case TimeUnit::millisecond: - break; - } + outputUnit = parseOutputUnit(arg); } else { uasserted(ErrorCodes::FailedToParse, str::stream() << "$derivative got unexpected argument: " << argName); @@ -500,19 +555,6 @@ public: expCtx, std::move(input), std::move(*bounds), outputUnit); } - Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final { - MutableDocument result; - result[_accumulatorName][kArgInput] = _input->serialize(static_cast<bool>(explain)); - if (_outputUnit) { - result[_accumulatorName][kArgOutputUnit] = Value(serializeTimeUnit(*_outputUnit)); - } - - MutableDocument windowField; - _bounds.serialize(windowField); - result[kWindowArg] = windowField.freezeToValue(); - return result.freezeToValue(); - } - boost::intrusive_ptr<AccumulatorState> buildAccumulatorOnly() const final { MONGO_UNREACHABLE_TASSERT(5490701); } @@ -520,13 +562,85 @@ public: std::unique_ptr<WindowFunctionState> buildRemovable() const final { MONGO_UNREACHABLE_TASSERT(5490702); } +}; - auto outputUnit() const { - return _outputUnit; +class ExpressionIntegral : public ExpressionWithOutputUnit { +public: + ExpressionIntegral(ExpressionContext* expCtx, + boost::intrusive_ptr<::mongo::Expression> input, + WindowBounds bounds, + boost::optional<TimeUnit> outputUnit) + : ExpressionWithOutputUnit( + expCtx, "$integral", std::move(input), std::move(bounds), outputUnit) {} + + static boost::intrusive_ptr<Expression> parse(BSONObj obj, + const boost::optional<SortPattern>& sortBy, + ExpressionContext* expCtx) { + // { + // $integral: { + // input: <expr>, + // outputUnit: <string>, // optional + // } + // window: {...} // optional + // } + // + validateSortBy(sortBy, "$integral"); + + boost::optional<WindowBounds> bounds = boost::none; + BSONElement integralArgs; + for (const auto& arg : obj) { + auto argName = arg.fieldNameStringData(); + if (argName == kWindowArg) { + uassert(ErrorCodes::FailedToParse, + "'window' field must be an object", + obj[kWindowArg].type() == BSONType::Object); + uassert(ErrorCodes::FailedToParse, + "There can be only one 'window' field for $integral", + bounds == boost::none); + bounds = WindowBounds::parse(arg.embeddedObject(), sortBy, expCtx); + } else if (argName == "$integral"_sd) { + integralArgs = arg; + } else { + uasserted(ErrorCodes::FailedToParse, + str::stream() << "$integral got unexpected argument: " << argName); + } + } + tassert( + 5558801, "$integral parser called on object with no $integral key", integralArgs.ok()); + uassert(ErrorCodes::FailedToParse, + str::stream() << "$integral expects an object, but got a " << integralArgs.type() + << ": " << integralArgs, + integralArgs.type() == BSONType::Object); + + boost::intrusive_ptr<::mongo::Expression> input; + boost::optional<TimeUnit> outputUnit = boost::none; + for (const auto& arg : integralArgs.Obj()) { + auto argName = arg.fieldNameStringData(); + if (argName == kArgInput) { + input = ::mongo::Expression::parseOperand(expCtx, arg, expCtx->variablesParseState); + } else if (argName == kArgOutputUnit) { + uassert(ErrorCodes::FailedToParse, + "There can be only one 'outputUnit' field for $integral", + outputUnit == boost::none); + outputUnit = parseOutputUnit(arg); + } else { + uasserted(ErrorCodes::FailedToParse, + str::stream() << "$integral got unexpected argument: " << argName); + } + } + uassert(ErrorCodes::FailedToParse, "$integral requires an 'input' expression", input); + + return make_intrusive<ExpressionIntegral>( + expCtx, std::move(input), bounds ? *bounds : WindowBounds(), outputUnit); } -private: - boost::optional<TimeUnit> _outputUnit; + boost::intrusive_ptr<AccumulatorState> buildAccumulatorOnly() const final { + return AccumulatorIntegral::create(_expCtx, convertTimeUnitToMillis(_outputUnit)); + } + + std::unique_ptr<WindowFunctionState> buildRemovable() const final { + return WindowFunctionIntegral::create(_expCtx, convertTimeUnitToMillis(_outputUnit)); + } }; } // namespace mongo::window_function diff --git a/src/mongo/db/pipeline/window_function/window_function_integral.cpp b/src/mongo/db/pipeline/window_function/window_function_integral.cpp index 24eea559f9c..484cb9341e6 100644 --- a/src/mongo/db/pipeline/window_function/window_function_integral.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_integral.cpp @@ -64,11 +64,11 @@ void WindowFunctionIntegral::assertValueType(const Value& value) { auto arr = value.getArray(); if (_outputUnitMillis) { - tassert(5423901, + uassert(5423901, "$integral with 'outputUnit' expects the sortBy field to be a Date", arr[0].getType() == BSONType::Date); } else { - tassert(5423902, + uassert(5423902, "$integral (with no 'outputUnit') expects the sortBy field to be numeric", arr[0].numeric()); } @@ -86,6 +86,13 @@ void WindowFunctionIntegral::add(Value value) { _integral.add(integralOfTwoPointsByTrapezoidalRule(_values.back(), value)); } + // "WindowFunctionIntegral" could be used as a non-removable accumulator which does not need to + // track the values in the window because no removal will be made. 'pop_front()' whenever a new + // value is added to the queue so as to save memory. + if (!_values.empty() && isNonremovable) { + _memUsageBytes -= _values.front().getApproximateSize(); + _values.pop_front(); + } _memUsageBytes += value.getApproximateSize(); _values.emplace_back(std::move(value)); } diff --git a/src/mongo/db/pipeline/window_function/window_function_integral.h b/src/mongo/db/pipeline/window_function/window_function_integral.h index 6516ba6ca16..10b970a4c5f 100644 --- a/src/mongo/db/pipeline/window_function/window_function_integral.h +++ b/src/mongo/db/pipeline/window_function/window_function_integral.h @@ -45,7 +45,8 @@ public: } explicit WindowFunctionIntegral(ExpressionContext* const expCtx, - boost::optional<long long> outputUnitMillis = boost::none) + boost::optional<long long> outputUnitMillis = boost::none, + bool isNonremovable = false) : WindowFunctionState(expCtx), _integral(expCtx), _outputUnitMillis(outputUnitMillis) { _memUsageBytes = sizeof(*this); } @@ -61,6 +62,9 @@ public: _values.clear(); _nanCount = 0; _integral.reset(); + // AccumulatorIntegral's reset() depends on the fact that WindowFunctionIntegral's reset() + // will set '_memUsageBytes' to sizeof(*this). If you want to reset '_memUsageBytes' to + // other value, please update AccumulatorIntegral's reset() as well. _memUsageBytes = sizeof(*this); } @@ -91,6 +95,7 @@ private: std::deque<Value> _values; boost::optional<long long> _outputUnitMillis; int _nanCount = 0; + bool isNonremovable = false; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/window_function/window_function_integral_test.cpp b/src/mongo/db/pipeline/window_function/window_function_integral_test.cpp index f670a438e6a..4f57b1564f1 100644 --- a/src/mongo/db/pipeline/window_function/window_function_integral_test.cpp +++ b/src/mongo/db/pipeline/window_function/window_function_integral_test.cpp @@ -32,7 +32,6 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/window_function/window_function_integral.h" -#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" #include "mongo/util/time_support.h" @@ -228,9 +227,7 @@ TEST_F(WindowFunctionIntegralTest, CanHandleDateTypeWithOutputUnit) { ASSERT_VALUE_EQ(integral.getValue(), Value(expectedIntegral)); } -DEATH_TEST_F(WindowFunctionIntegralTest, - DatesWithoutOutputUnitShouldFail, - "expects the sortBy field to be numeric") { +TEST_F(WindowFunctionIntegralTest, DatesWithoutOutputUnitShouldFail) { const std::vector<Value> values = { Value(std::vector<Value>({Value(Date_t::fromMillisSinceEpoch(1000)), Value(2)})), Value(std::vector<Value>({Value(Date_t::fromMillisSinceEpoch(1002)), Value(4)})), @@ -240,9 +237,7 @@ DEATH_TEST_F(WindowFunctionIntegralTest, ASSERT_THROWS_CODE(addValuesToWindow(values), AssertionException, 5423902); } -DEATH_TEST_F(WindowFunctionIntegralTest, - NumbersWithOutputUnitShouldFail, - "expects the sortBy field to be a Date") { +TEST_F(WindowFunctionIntegralTest, NumbersWithOutputUnitShouldFail) { const std::vector<Value> values = { Value(std::vector<Value>({Value(3), Value(2)})), Value(std::vector<Value>({Value(5), Value(4)})), @@ -256,9 +251,7 @@ DEATH_TEST_F(WindowFunctionIntegralTest, ASSERT_THROWS_CODE(addValuesToWindow(values), AssertionException, 5423901); } -DEATH_TEST_F(WindowFunctionIntegralTest, - ResetShouldNotResetOutputUnit, - "expects the sortBy field to be a Date") { +TEST_F(WindowFunctionIntegralTest, ResetShouldNotResetOutputUnit) { const std::vector<Value> dateValues = { Value(std::vector<Value>({Value(Date_t::fromMillisSinceEpoch(1000)), Value(0)})), Value(std::vector<Value>({Value(Date_t::fromMillisSinceEpoch(1002)), Value(2)})), |