diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2021-10-20 19:07:37 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-20 19:38:32 +0000 |
commit | 1c5b61079bf081df648f3ce91c42bd5d9fd9d8c9 (patch) | |
tree | 54c9be76d23413611f0d6e2e4d5f02395540dccf /src/mongo/db/pipeline/accumulator_multi.cpp | |
parent | 33bdeafc421ba379294db1d695d5f8a96e9c39be (diff) | |
download | mongo-1c5b61079bf081df648f3ce91c42bd5d9fd9d8c9.tar.gz |
SERVER-57880 Implement $top, $topN, $bottom, and $bottomN accumulators
Diffstat (limited to 'src/mongo/db/pipeline/accumulator_multi.cpp')
-rw-r--r-- | src/mongo/db/pipeline/accumulator_multi.cpp | 306 |
1 files changed, 304 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/accumulator_multi.cpp b/src/mongo/db/pipeline/accumulator_multi.cpp index 5c848f50434..a1213b83a10 100644 --- a/src/mongo/db/pipeline/accumulator_multi.cpp +++ b/src/mongo/db/pipeline/accumulator_multi.cpp @@ -28,6 +28,7 @@ */ #include "mongo/db/pipeline/accumulator_multi.h" +#include "mongo/db/query/sort_pattern.h" #include "mongo/util/version/releases.h" namespace mongo { @@ -92,6 +93,36 @@ REGISTER_EXPRESSION_CONDITIONALLY( AllowedWithClientType::kAny, boost::none, feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV()); +// TODO SERVER-57884 Add $firstN/$lastN as window functions. +// TODO SERVER-57886 Add $topN/$bottomN/$top/$bottom as window functions. +REGISTER_ACCUMULATOR_CONDITIONALLY( + topN, + (AccumulatorTopBottomN<TopBottomSense::kTop, false>::parseTopBottomN), + AllowedWithApiStrict::kNeverInVersion1, + AllowedWithClientType::kAny, + boost::none, + feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV()); +REGISTER_ACCUMULATOR_CONDITIONALLY( + bottomN, + (AccumulatorTopBottomN<TopBottomSense::kBottom, false>::parseTopBottomN), + AllowedWithApiStrict::kNeverInVersion1, + AllowedWithClientType::kAny, + boost::none, + feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV()); +REGISTER_ACCUMULATOR_CONDITIONALLY( + top, + (AccumulatorTopBottomN<TopBottomSense::kTop, true>::parseTopBottomN), + AllowedWithApiStrict::kNeverInVersion1, + AllowedWithClientType::kAny, + boost::none, + feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV()); +REGISTER_ACCUMULATOR_CONDITIONALLY( + bottom, + (AccumulatorTopBottomN<TopBottomSense::kBottom, true>::parseTopBottomN), + AllowedWithApiStrict::kNeverInVersion1, + AllowedWithClientType::kAny, + boost::none, + feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV()); AccumulatorN::AccumulatorN(ExpressionContext* const expCtx) : AccumulatorState(expCtx), _maxMemUsageBytes(internalQueryMaxNAccumulatorBytes.load()) {} @@ -180,8 +211,8 @@ AccumulatorN::parseArgs(ExpressionContext* const expCtx, uasserted(5787901, str::stream() << "Unknown argument for 'n' operator: " << fieldName); } } - uassert(5787906, str::stream() << "Missing value for " << kFieldNameN << "'", n); - uassert(5787907, str::stream() << "Missing value for " << kFieldNameOutput << "'", output); + uassert(5787906, str::stream() << "Missing value for '" << kFieldNameN << "'", n); + uassert(5787907, str::stream() << "Missing value for '" << kFieldNameOutput << "'", output); return std::make_tuple(n, output); } @@ -389,4 +420,275 @@ boost::intrusive_ptr<AccumulatorState> AccumulatorLastN::create(ExpressionContex return make_intrusive<AccumulatorLastN>(expCtx); } +// TODO SERVER-59327 Refactor other operators to use this parse function. +template <bool single> +std::tuple<boost::intrusive_ptr<Expression>, BSONElement, boost::optional<BSONObj>> +accumulatorNParseArgs(ExpressionContext* expCtx, + const BSONElement& elem, + const char* name, + bool needSortBy, + const VariablesParseState& vps) { + uassert(5788001, + str::stream() << "specification must be an object; found " << elem, + elem.type() == BSONType::Object); + BSONObj obj = elem.embeddedObject(); + + // Extract fields from specification object. sortBy and output are not immediately parsed into + // Expressions so that they can easily still be manipulated and processed in the special case of + // AccumulatorTopBottomN. + boost::optional<BSONObj> sortBy; + boost::optional<BSONElement> output; + boost::intrusive_ptr<Expression> n; + for (auto&& element : obj) { + auto fieldName = element.fieldNameStringData(); + if constexpr (!single) { + if (fieldName == AccumulatorN::kFieldNameN) { + n = Expression::parseOperand(expCtx, element, vps); + continue; + } + } + if (fieldName == AccumulatorN::kFieldNameOutput) { + output = element; + } else if (fieldName == AccumulatorN::kFieldNameSortBy && needSortBy) { + sortBy = element.Obj(); + } else { + uasserted(5788002, str::stream() << "Unknown argument to " << name << " " << fieldName); + } + } + + // Make sure needed arguments were found. + if constexpr (single) { + n = ExpressionConstant::create(expCtx, Value(1)); + } else { + uassert( + 5788003, str::stream() << "Missing value for '" << AccumulatorN::kFieldNameN << "'", n); + } + uassert(5788004, + str::stream() << "Missing value for '" << AccumulatorN::kFieldNameOutput << "'", + output); + if (needSortBy) { + uassert(5788005, + str::stream() << "Missing value for '" << AccumulatorN::kFieldNameSortBy << "'", + sortBy); + } + + return {n, *output, sortBy}; +} + +template <TopBottomSense sense, bool single> +AccumulatorTopBottomN<sense, single>::AccumulatorTopBottomN(ExpressionContext* const expCtx, + SortPattern sp) + : AccumulatorN(expCtx), _sortPattern(sp) { + + // Modify sortPattern to sort based on fields where they are in the evaluated argument instead + // of where they would be in the raw document received by $group and friends. + std::vector<SortPattern::SortPatternPart> parts; + int sortOrder = 0; + for (auto part : _sortPattern) { + const auto newFieldName = + (StringBuilder() << AccumulatorN::kFieldNameSortFields << "." << sortOrder).str(); + part.fieldPath.reset(FieldPath(newFieldName)); + + // TODO SERVER-60781 will change AccumulatorTopBottomN so it has different behavior + // Invert sort spec if $topN/top. + if constexpr (sense == TopBottomSense::kTop) { + // $topN usually flips sort pattern by making ascending false. for the case of textScore + // based sorting, there is no way to sort by least relevent in a normal mongodb sort + // specification so topN still returns the same order as bottomN (most relevent first). + if (!part.expression) { + part.isAscending = !part.isAscending; + } + } + if (part.expression) { + // $meta based sorting is handled earlier in the sortFields expression. See comment in + // parseAccumulatorTopBottomNSortBy(). + part.expression = nullptr; + } + parts.push_back(part); + sortOrder++; + } + SortPattern internalSortPattern(parts); + + _sortKeyComparator.emplace(internalSortPattern); + _sortKeyGenerator.emplace(std::move(internalSortPattern), expCtx->getCollator()); + + _memUsageBytes = sizeof(*this); + + // STL expects a less-than function not a 3-way compare function so this lambda wraps + // SortKeyComparator. + _map.emplace([&, this](const Value& lhs, const Value& rhs) { + return (*this->_sortKeyComparator)(lhs, rhs) < 0; + }); +} + +template <TopBottomSense sense, bool single> +const char* AccumulatorTopBottomN<sense, single>::getOpName() const { + return AccumulatorTopBottomN<sense, single>::getName().rawData(); +} + +template <TopBottomSense sense, bool single> +Document AccumulatorTopBottomN<sense, single>::serialize( + boost::intrusive_ptr<Expression> initializer, + boost::intrusive_ptr<Expression> argument, + bool explain) const { + MutableDocument args; + if constexpr (!single) { + args.addField(kFieldNameN, Value(initializer->serialize(explain))); + } + auto output = argument->serialize(explain)[kFieldNameOutput]; + tassert(5788000, + str::stream() << "expected argument expression to have " << kFieldNameOutput + << " field", + !output.missing()); + args.addField(kFieldNameOutput, Value(output)); + args.addField(kFieldNameSortBy, + Value(_sortPattern.serialize( + SortPattern::SortKeySerialization::kForPipelineSerialization))); + return DOC(getOpName() << args.freeze()); +} + +template <TopBottomSense sense> +std::pair<SortPattern, BSONArray> parseAccumulatorTopBottomNSortBy(ExpressionContext* const expCtx, + BSONObj sortBy) { + SortPattern sortPattern(sortBy, expCtx); + BSONArrayBuilder sortFieldsExpBab; + BSONObjIterator sortByBoi(sortBy); + int sortOrder = 0; + for (const auto& part : sortPattern) { + const auto fieldName = sortByBoi.next().fieldNameStringData(); + const auto newFieldName = + (StringBuilder() << AccumulatorN::kFieldNameSortFields << "." << sortOrder).str(); + + if (part.expression) { + // In a scenario where we are sorting by metadata (for example if sortBy is + // {text: {$meta: "textScore"}}) we cant use ["$text"] as the sortFields expression + // since the evaluated argument wouldn't have the same metadata as the original + // document. Instead we use [{$meta: "textScore"}] as the sortFields expression so the + // sortFields array contains the data we need for sorting. + const auto serialized = part.expression->serialize(false); + sortFieldsExpBab.append(serialized.getDocument().toBson()); + } else { + sortFieldsExpBab.append((StringBuilder() << "$" << fieldName).str()); + } + sortOrder++; + } + return {sortPattern, sortFieldsExpBab.arr()}; +} + +template <TopBottomSense sense, bool single> +AccumulationExpression AccumulatorTopBottomN<sense, single>::parseTopBottomN( + ExpressionContext* const expCtx, BSONElement elem, VariablesParseState vps) { + auto name = AccumulatorTopBottomN<sense, single>::getName(); + + const auto [n, output, sortBy] = + accumulatorNParseArgs<single>(expCtx, elem, name.rawData(), true, vps); + + auto [sortPattern, sortFieldsExp] = parseAccumulatorTopBottomNSortBy<sense>(expCtx, *sortBy); + + // Construct argument expression. If given sortBy: {field1: 1, field2: 1} it will be shaped like + // {output: <output expression>, sortFields: ["$field1", "$field2"]}. This projects out only the + // fields we need for sorting so we can use SortKeyComparator without copying the entire + // document. This argument expression will be evaluated and become the input to processValue. + boost::intrusive_ptr<Expression> argument = Expression::parseObject( + expCtx, BSON(output << AccumulatorN::kFieldNameSortFields << sortFieldsExp), vps); + + auto factory = [expCtx, sortPattern = std::move(sortPattern)] { + return make_intrusive<AccumulatorTopBottomN<sense, single>>(expCtx, sortPattern); + }; + + return {std::move(n), std::move(argument), std::move(factory), name}; +} + +template <TopBottomSense sense, bool single> +boost::intrusive_ptr<AccumulatorState> AccumulatorTopBottomN<sense, single>::create( + ExpressionContext* expCtx, BSONObj sortBy) { + return make_intrusive<AccumulatorTopBottomN<sense, single>>( + expCtx, parseAccumulatorTopBottomNSortBy<sense>(expCtx, sortBy).first); +} + +template <TopBottomSense sense, bool single> +void AccumulatorTopBottomN<sense, single>::processValue(const Value& val) { + tassert(5788014, + str::stream() << "processValue of " << getName() << "should have recieved an object", + val.isObject()); + + Value output = val[AccumulatorN::kFieldNameOutput]; + Value sortKey; + + // In the case that processValue() is getting called in the context of merging, a previous + // processValue has already generated the sortKey for us, so we don't need to regenerate it. + Value generatedSortKey = val[kFieldNameGeneratedSortKey]; + if (!generatedSortKey.missing()) { + sortKey = generatedSortKey; + } else { + sortKey = _sortKeyGenerator->computeSortKeyFromDocument(val.getDocument()); + } + KeyOutPair keyOutPair(sortKey, output); + + // Only compare if we have 'n' elements. + if (static_cast<long long>(_map->size()) == *_n) { + // Get an iterator to the element we want to compare against. + auto cmpElem = std::prev(_map->end()); + + // TODO SERVER-60781 will change AccumulatorTopBottomN so it has different behavior. $topN + // will insert items greater than the min and $bottomN will insert items less than the max. + auto cmp = (*_sortKeyComparator)(cmpElem->first, keyOutPair.first); + // When the sort key produces a tie we keep the first value seen. + if (cmp > 0) { + _memUsageBytes -= cmpElem->first.getApproximateSize() + + cmpElem->second.getApproximateSize() + sizeof(KeyOutPair); + _map->erase(cmpElem); + } else { + return; + } + } + _memUsageBytes += + sortKey.getApproximateSize() + output.getApproximateSize() + sizeof(KeyOutPair); + uassert(ErrorCodes::ExceededMemoryLimit, + str::stream() << getOpName() + << " used too much memory and cannot spill to disk. Memory limit: " + << _maxMemUsageBytes << " bytes", + _memUsageBytes < _maxMemUsageBytes); + _map->emplace(keyOutPair); +} + +template <TopBottomSense sense, bool single> +Value AccumulatorTopBottomN<sense, single>::getValue(bool toBeMerged) { + std::vector<Value> result; + for (const auto& keyOutPair : *_map) { + if (toBeMerged) { + result.emplace_back(BSON(kFieldNameGeneratedSortKey + << keyOutPair.first << kFieldNameOutput << keyOutPair.second)); + } else { + result.push_back(keyOutPair.second); + } + }; + + if constexpr (!single) { + return Value(result); + } else { + tassert(5788015, + str::stream() << getName() << " group did not contain exactly one value", + result.size() == 1); + if (toBeMerged) { + return Value(result); + } else { + return Value(result[0]); + } + } +} + +template <TopBottomSense sense, bool single> +void AccumulatorTopBottomN<sense, single>::reset() { + _map->clear(); + _memUsageBytes = sizeof(*this); +} + +// Explicitly specify the following classes should generated and should live in this compilation +// unit. +template class AccumulatorTopBottomN<TopBottomSense::kBottom, false>; +template class AccumulatorTopBottomN<TopBottomSense::kBottom, true>; +template class AccumulatorTopBottomN<TopBottomSense::kTop, false>; +template class AccumulatorTopBottomN<TopBottomSense::kTop, true>; + } // namespace mongo |