/**
 *    Copyright (C) 2023-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #include "mongo/db/pipeline/accumulator_percentile.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/idl/idl_parser.h" namespace mongo { using boost::intrusive_ptr; REGISTER_ACCUMULATOR_WITH_FEATURE_FLAG(percentile, AccumulatorPercentile::parseArgs, feature_flags::gFeatureFlagApproxPercentiles); REGISTER_EXPRESSION_WITH_FEATURE_FLAG(percentile, AccumulatorPercentile::parseExpression, AllowedWithApiStrict::kNeverInVersion1, AllowedWithClientType::kAny, feature_flags::gFeatureFlagApproxPercentiles); REGISTER_ACCUMULATOR_WITH_FEATURE_FLAG(median, AccumulatorMedian::parseArgs, feature_flags::gFeatureFlagApproxPercentiles); REGISTER_EXPRESSION_WITH_FEATURE_FLAG(median, AccumulatorMedian::parseExpression, AllowedWithApiStrict::kNeverInVersion1, AllowedWithClientType::kAny, feature_flags::gFeatureFlagApproxPercentiles); Status AccumulatorPercentile::validatePercentileArg(const std::vector& pv) { if (pv.empty()) { return {ErrorCodes::BadValue, "'p' cannot be an empty array"}; } for (const double& p : pv) { if (p < 0 || p > 1) { return {ErrorCodes::BadValue, str::stream() << "'p' must be an array of numeric values from [0.0, 1.0] " "range, but received incorrect value: " << p}; } } return Status::OK(); } AccumulationExpression AccumulatorPercentile::parseArgs(ExpressionContext* const expCtx, BSONElement elem, VariablesParseState vps) { expCtx->sbeGroupCompatibility = SbeCompatibility::notCompatible; uassert(7429703, str::stream() << "specification must be an object; found " << elem, elem.type() == BSONType::Object); auto spec = AccumulatorPercentileSpec::parse(IDLParserContext(kName), elem.Obj()); boost::intrusive_ptr input = Expression::parseOperand(expCtx, spec.getInput().getElement(), vps); std::vector ps = spec.getP(); PercentileMethodEnum method = spec.getMethod(); auto factory = [expCtx, ps, method] { return AccumulatorPercentile::create(expCtx, ps, static_cast(method)); }; return {ExpressionConstant::create(expCtx, Value(BSONNULL)) 
/*initializer*/, std::move(input) /*argument*/, std::move(factory), "$percentile"_sd /*name*/}; } std::pair /*ps*/, int32_t /*method*/> AccumulatorPercentile::parsePercentileAndMethod(BSONElement elem) { auto spec = AccumulatorPercentileSpec::parse(IDLParserContext(kName), elem.Obj()); return std::pair, int32_t>(spec.getP(), static_cast(spec.getMethod())); } boost::intrusive_ptr AccumulatorPercentile::parseExpression( ExpressionContext* const expCtx, BSONElement elem, VariablesParseState vps) { expCtx->sbeGroupCompatibility = SbeCompatibility::notCompatible; uassert(7436200, str::stream() << "specification must be an object; found " << elem, elem.type() == BSONType::Object); auto spec = AccumulatorPercentileSpec::parse(IDLParserContext(kName), elem.Obj()); boost::intrusive_ptr input = Expression::parseOperand(expCtx, spec.getInput().getElement(), vps); std::vector ps = spec.getP(); PercentileMethodEnum method = spec.getMethod(); return make_intrusive>( expCtx, ps, input, static_cast(method)); } void AccumulatorPercentile::processInternal(const Value& input, bool merging) { if (merging) { dynamic_cast*>(_algo.get())->combine(input); return; } if (!input.numeric()) { return; } _algo->incorporate(input.coerceToDouble()); _memUsageBytes = sizeof(*this) + _algo->memUsageBytes(); } Value AccumulatorPercentile::formatFinalValue(int nPercentiles, const std::vector& pctls) { if (pctls.empty()) { std::vector nulls; nulls.insert(nulls.end(), nPercentiles, Value(BSONNULL)); return Value(nulls); } return Value(std::vector(pctls.begin(), pctls.end())); } Value AccumulatorPercentile::getValue(bool toBeMerged) { if (toBeMerged) { return dynamic_cast*>(_algo.get())->serialize(); } return AccumulatorPercentile::formatFinalValue(_percentiles.size(), _algo->computePercentiles(_percentiles)); } namespace { std::unique_ptr createPercentileAlgorithm(int32_t method) { switch (static_cast(method)) { case PercentileMethodEnum::Approximate: return createTDigestDistributedClassic(); default: 
tasserted(7435800, str::stream() << "Currently only approximate percentiles are supported"); } return nullptr; } } // namespace AccumulatorPercentile::AccumulatorPercentile(ExpressionContext* const expCtx, const std::vector& ps, int32_t method) : AccumulatorState(expCtx), _percentiles(ps), _algo(createPercentileAlgorithm(method)), _method(method) { _memUsageBytes = sizeof(*this) + _algo->memUsageBytes(); } void AccumulatorPercentile::reset() { _algo = createPercentileAlgorithm(_method); _memUsageBytes = sizeof(*this) + _algo->memUsageBytes(); } Document AccumulatorPercentile::serialize(boost::intrusive_ptr initializer, boost::intrusive_ptr argument, SerializationOptions options) const { ExpressionConstant const* ec = dynamic_cast(initializer.get()); invariant(ec); invariant(ec->getValue().nullish()); MutableDocument md; AccumulatorPercentile::serializeHelper( argument, options, _percentiles, static_cast(_method), md); return DOC(getOpName() << md.freeze()); } void AccumulatorPercentile::serializeHelper(const boost::intrusive_ptr& argument, SerializationOptions options, std::vector percentiles, int32_t method, MutableDocument& md) { md.addField(AccumulatorPercentileSpec::kInputFieldName, Value(argument->serialize(options))); md.addField(AccumulatorPercentileSpec::kPFieldName, Value(std::vector(percentiles.begin(), percentiles.end()))); md.addField(AccumulatorPercentileSpec::kMethodFieldName, Value(PercentileMethod_serializer(static_cast(method)))); } intrusive_ptr AccumulatorPercentile::create(ExpressionContext* const expCtx, const std::vector& ps, int32_t method) { return new AccumulatorPercentile(expCtx, ps, method); } AccumulationExpression AccumulatorMedian::parseArgs(ExpressionContext* const expCtx, BSONElement elem, VariablesParseState vps) { expCtx->sbeGroupCompatibility = SbeCompatibility::notCompatible; uassert(7436100, str::stream() << "specification must be an object; found " << elem, elem.type() == BSONType::Object); auto spec = 
AccumulatorMedianSpec::parse(IDLParserContext(kName), elem.Obj()); boost::intrusive_ptr input = Expression::parseOperand(expCtx, spec.getInput().getElement(), vps); PercentileMethodEnum method = spec.getMethod(); auto factory = [expCtx, method] { return AccumulatorMedian::create(expCtx, {} /* unused */, static_cast(method)); }; return {ExpressionConstant::create(expCtx, Value(BSONNULL)) /*initializer*/, std::move(input) /*argument*/, std::move(factory), "$ median"_sd /*name*/}; } std::pair /*ps*/, int32_t /*method*/> AccumulatorMedian::parsePercentileAndMethod(BSONElement elem) { auto spec = AccumulatorMedianSpec::parse(IDLParserContext(kName), elem.Obj()); return std::pair, int32_t>({0.5}, static_cast(spec.getMethod())); } boost::intrusive_ptr AccumulatorMedian::parseExpression(ExpressionContext* const expCtx, BSONElement elem, VariablesParseState vps) { expCtx->sbeGroupCompatibility = SbeCompatibility::notCompatible; uassert(7436201, str::stream() << "specification must be an object; found " << elem, elem.type() == BSONType::Object); auto spec = AccumulatorMedianSpec::parse(IDLParserContext(kName), elem.Obj()); boost::intrusive_ptr input = Expression::parseOperand(expCtx, spec.getInput().getElement(), vps); std::vector p = {0.5}; PercentileMethodEnum method = spec.getMethod(); return make_intrusive>( expCtx, p, input, static_cast(method)); } AccumulatorMedian::AccumulatorMedian(ExpressionContext* expCtx, const std::vector& /* unused */, int32_t method) : AccumulatorPercentile(expCtx, {0.5} /* ps */, method){}; intrusive_ptr AccumulatorMedian::create(ExpressionContext* expCtx, const std::vector& /* unused */, int32_t method) { return new AccumulatorMedian(expCtx, {} /* unused */, method); } Value AccumulatorMedian::formatFinalValue(int nPercentiles, const std::vector& pctls) { if (pctls.empty()) { return Value(BSONNULL); } tassert(7436101, "the percentile method for median must return a single result.", pctls.size() == 1); return Value(pctls.front()); } Value 
AccumulatorMedian::getValue(bool toBeMerged) { // $median only adjusts the output of the final result, the internal logic for merging is up to // the implementation of $percentile. if (toBeMerged) { return AccumulatorPercentile::getValue(toBeMerged); } return AccumulatorMedian::formatFinalValue(_percentiles.size(), _algo->computePercentiles(_percentiles)); } Document AccumulatorMedian::serialize(boost::intrusive_ptr initializer, boost::intrusive_ptr argument, SerializationOptions options) const { ExpressionConstant const* ec = dynamic_cast(initializer.get()); invariant(ec); invariant(ec->getValue().nullish()); MutableDocument md; AccumulatorMedian::serializeHelper( argument, options, _percentiles, static_cast(_method), md); return DOC(getOpName() << md.freeze()); } void AccumulatorMedian::serializeHelper(const boost::intrusive_ptr& argument, SerializationOptions options, std::vector percentiles, int32_t method, MutableDocument& md) { md.addField(AccumulatorPercentileSpec::kInputFieldName, Value(argument->serialize(options))); md.addField(AccumulatorPercentileSpec::kMethodFieldName, Value(PercentileMethod_serializer(static_cast(method)))); } } // namespace mongo