From 97e7a659d01f8f5ceef69a4f738cdf76396d99db Mon Sep 17 00:00:00 2001 From: Mihai Andrei Date: Wed, 21 Jul 2021 22:49:44 +0000 Subject: SERVER-57879 Implement $minN and $maxN accumulators --- src/mongo/db/pipeline/accumulator_multi.cpp | 211 ++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 src/mongo/db/pipeline/accumulator_multi.cpp (limited to 'src/mongo/db/pipeline/accumulator_multi.cpp') diff --git a/src/mongo/db/pipeline/accumulator_multi.cpp b/src/mongo/db/pipeline/accumulator_multi.cpp new file mode 100644 index 00000000000..b3cbf26c924 --- /dev/null +++ b/src/mongo/db/pipeline/accumulator_multi.cpp @@ -0,0 +1,211 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/accumulator_multi.h" + +namespace mongo { +// TODO SERVER-58379 Update these macros once FCV constants are upgraded. +REGISTER_ACCUMULATOR(maxN, AccumulatorMinMaxN::parseMinMaxN); +REGISTER_ACCUMULATOR(minN, AccumulatorMinMaxN::parseMinMaxN); +// TODO SERVER-57882 Add $minN/$maxN as expressions. +// TODO SERVER-57885 Add $minN/$maxN as window functions. + +AccumulatorN::AccumulatorN(ExpressionContext* const expCtx) + : AccumulatorState(expCtx), _maxMemUsageBytes(internalQueryMaxNAccumulatorBytes.load()) {} + +void AccumulatorN::startNewGroup(const Value& input) { + // Obtain the value for 'n' and error if it's not a positive integral. + uassert(5787902, + str::stream() << "Value for 'n' must be of integral type, but found " + << input.toString(), + isNumericBSONType(input.getType())); + auto n = input.coerceToLong(); + uassert(5787903, + str::stream() << "Value for 'n' must be of integral type, but found " + << input.toString(), + n == input.coerceToDouble()); + uassert(5787908, str::stream() << "'n' must be greater than 0, found " << n, n > 0); + _n = n; +} + +AccumulatorMinMaxN::AccumulatorMinMaxN(ExpressionContext* const expCtx, Sense sense) + : AccumulatorN(expCtx), + _set(expCtx->getValueComparator().makeOrderedValueMultiset()), + _sense(sense) { + _memUsageBytes = sizeof(*this); +} + +const char* AccumulatorMinMaxN::getOpName() const { + if (_sense == Sense::kMin) { + return AccumulatorMinN::getName(); + } else { + return AccumulatorMaxN::getName(); + } +} + +Document AccumulatorMinMaxN::serialize(boost::intrusive_ptr initializer, + boost::intrusive_ptr argument, + bool explain) const { + MutableDocument args; + AccumulatorN::serializeHelper(initializer, argument, explain, args); + return DOC(getOpName() << args.freeze()); +} + +std::tuple, boost::intrusive_ptr> +AccumulatorN::parseArgs(ExpressionContext* const expCtx, + const BSONObj& args, + VariablesParseState vps) { + boost::intrusive_ptr n; + boost::intrusive_ptr output; + for (auto&& element : args) { + auto fieldName = element.fieldNameStringData(); + if (fieldName == kFieldNameOutput) { + output = Expression::parseOperand(expCtx, element, vps); + } else if (fieldName == kFieldNameN) { + n = Expression::parseOperand(expCtx, element, vps); + } else { + uasserted(5787901, str::stream() << "Unknown argument to minN/maxN: " << fieldName); + } + } + uassert(5787906, "Missing value for 'n'", n); + uassert(5787907, "Missing value for 'output'", output); + return std::make_tuple(n, output); +} + +void AccumulatorN::serializeHelper(const boost::intrusive_ptr& initializer, + const boost::intrusive_ptr& argument, + bool explain, + MutableDocument& md) { + md.addField(kFieldNameN, Value(initializer->serialize(explain))); + md.addField(kFieldNameOutput, Value(argument->serialize(explain))); +} + +template +AccumulationExpression AccumulatorMinMaxN::parseMinMaxN(ExpressionContext* const expCtx, + BSONElement elem, + VariablesParseState vps) { + auto name = [] { + if constexpr (s == Sense::kMin) { + return AccumulatorMinN::getName(); + } else { + return AccumulatorMaxN::getName(); + } + }(); + + // TODO SERVER-58379 Remove this uassert once the FCV constants are upgraded and the REGISTER + // macros above are updated accordingly. + uassert(5787909, + str::stream() << "Cannot create " << name << " accumulator if feature flag is disabled", + feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV()); + uassert(5787900, + str::stream() << "specification must be an object; found " << elem, + elem.type() == BSONType::Object); + BSONObj obj = elem.embeddedObject(); + + auto [n, output] = AccumulatorN::parseArgs(expCtx, obj, vps); + + auto factory = [expCtx] { + if constexpr (s == Sense::kMin) { + return AccumulatorMinN::create(expCtx); + } else { + return AccumulatorMaxN::create(expCtx); + } + }; + + return {std::move(n), std::move(output), std::move(factory), name}; +} + +void AccumulatorMinMaxN::processValue(const Value& val) { + // Ignore nullish values. + if (val.nullish()) + return; + + // Only compare if we have 'n' elements. + if (static_cast(_set.size()) == *_n) { + // Get an iterator to the element we want to compare against. + auto cmpElem = _sense == Sense::kMin ? std::prev(_set.end()) : _set.begin(); + + auto cmp = getExpressionContext()->getValueComparator().compare(*cmpElem, val) * _sense; + if (cmp > 0) { + _memUsageBytes -= cmpElem->getApproximateSize(); + _set.erase(cmpElem); + } else { + return; + } + } + _memUsageBytes += val.getApproximateSize(); + uassert(ErrorCodes::ExceededMemoryLimit, + str::stream() << getOpName() + << " used too much memory and cannot spill to disk. Memory limit: " + << _maxMemUsageBytes << " bytes", + _memUsageBytes < _maxMemUsageBytes); + _set.emplace(val); +} + +void AccumulatorMinMaxN::processInternal(const Value& input, bool merging) { + tassert(5787904, "'n' must be initialized", _n); + + if (merging) { + tassert(5787905, "input must be an array when 'merging' is true", input.isArray()); + auto array = input.getArray(); + for (auto&& val : array) { + processValue(val); + } + } else { + processValue(input); + } +} + +Value AccumulatorMinMaxN::getValue(bool toBeMerged) { + // Return the values in ascending order for 'kMin' and descending order for 'kMax'. + return Value(_sense == Sense::kMin ? std::vector(_set.begin(), _set.end()) + : std::vector(_set.rbegin(), _set.rend())); +} + +void AccumulatorMinMaxN::reset() { + _set = getExpressionContext()->getValueComparator().makeOrderedValueMultiset(); + _memUsageBytes = sizeof(*this); +} + +const char* AccumulatorMinN::getName() { + return kName.rawData(); +} + +boost::intrusive_ptr AccumulatorMinN::create(ExpressionContext* const expCtx) { + return make_intrusive(expCtx); +} + +const char* AccumulatorMaxN::getName() { + return kName.rawData(); +} + +boost::intrusive_ptr AccumulatorMaxN::create(ExpressionContext* const expCtx) { + return make_intrusive(expCtx); +} +} // namespace mongo -- cgit v1.2.1