summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/accumulator_multi.cpp
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2021-10-20 19:07:37 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-20 19:38:32 +0000
commit1c5b61079bf081df648f3ce91c42bd5d9fd9d8c9 (patch)
tree54c9be76d23413611f0d6e2e4d5f02395540dccf /src/mongo/db/pipeline/accumulator_multi.cpp
parent33bdeafc421ba379294db1d695d5f8a96e9c39be (diff)
downloadmongo-1c5b61079bf081df648f3ce91c42bd5d9fd9d8c9.tar.gz
SERVER-57880 Implement $top, $topN, $bottom, and $bottomN accumulators
Diffstat (limited to 'src/mongo/db/pipeline/accumulator_multi.cpp')
-rw-r--r--src/mongo/db/pipeline/accumulator_multi.cpp306
1 files changed, 304 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/accumulator_multi.cpp b/src/mongo/db/pipeline/accumulator_multi.cpp
index 5c848f50434..a1213b83a10 100644
--- a/src/mongo/db/pipeline/accumulator_multi.cpp
+++ b/src/mongo/db/pipeline/accumulator_multi.cpp
@@ -28,6 +28,7 @@
*/
#include "mongo/db/pipeline/accumulator_multi.h"
+#include "mongo/db/query/sort_pattern.h"
#include "mongo/util/version/releases.h"
namespace mongo {
@@ -92,6 +93,36 @@ REGISTER_EXPRESSION_CONDITIONALLY(
AllowedWithClientType::kAny,
boost::none,
feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV());
+// TODO SERVER-57884 Add $firstN/$lastN as window functions.
+// TODO SERVER-57886 Add $topN/$bottomN/$top/$bottom as window functions.
+REGISTER_ACCUMULATOR_CONDITIONALLY(
+ topN,
+ (AccumulatorTopBottomN<TopBottomSense::kTop, false>::parseTopBottomN),
+ AllowedWithApiStrict::kNeverInVersion1,
+ AllowedWithClientType::kAny,
+ boost::none,
+ feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV());
+REGISTER_ACCUMULATOR_CONDITIONALLY(
+ bottomN,
+ (AccumulatorTopBottomN<TopBottomSense::kBottom, false>::parseTopBottomN),
+ AllowedWithApiStrict::kNeverInVersion1,
+ AllowedWithClientType::kAny,
+ boost::none,
+ feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV());
+REGISTER_ACCUMULATOR_CONDITIONALLY(
+ top,
+ (AccumulatorTopBottomN<TopBottomSense::kTop, true>::parseTopBottomN),
+ AllowedWithApiStrict::kNeverInVersion1,
+ AllowedWithClientType::kAny,
+ boost::none,
+ feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV());
+REGISTER_ACCUMULATOR_CONDITIONALLY(
+ bottom,
+ (AccumulatorTopBottomN<TopBottomSense::kBottom, true>::parseTopBottomN),
+ AllowedWithApiStrict::kNeverInVersion1,
+ AllowedWithClientType::kAny,
+ boost::none,
+ feature_flags::gFeatureFlagExactTopNAccumulator.isEnabledAndIgnoreFCV());
AccumulatorN::AccumulatorN(ExpressionContext* const expCtx)
: AccumulatorState(expCtx), _maxMemUsageBytes(internalQueryMaxNAccumulatorBytes.load()) {}
@@ -180,8 +211,8 @@ AccumulatorN::parseArgs(ExpressionContext* const expCtx,
uasserted(5787901, str::stream() << "Unknown argument for 'n' operator: " << fieldName);
}
}
- uassert(5787906, str::stream() << "Missing value for " << kFieldNameN << "'", n);
- uassert(5787907, str::stream() << "Missing value for " << kFieldNameOutput << "'", output);
+ uassert(5787906, str::stream() << "Missing value for '" << kFieldNameN << "'", n);
+ uassert(5787907, str::stream() << "Missing value for '" << kFieldNameOutput << "'", output);
return std::make_tuple(n, output);
}
@@ -389,4 +420,275 @@ boost::intrusive_ptr<AccumulatorState> AccumulatorLastN::create(ExpressionContex
return make_intrusive<AccumulatorLastN>(expCtx);
}
+// TODO SERVER-59327 Refactor other operators to use this parse function.
+template <bool single>
+std::tuple<boost::intrusive_ptr<Expression>, BSONElement, boost::optional<BSONObj>>
+accumulatorNParseArgs(ExpressionContext* expCtx,
+ const BSONElement& elem,
+ const char* name,
+ bool needSortBy,
+ const VariablesParseState& vps) {
+ uassert(5788001,
+ str::stream() << "specification must be an object; found " << elem,
+ elem.type() == BSONType::Object);
+ BSONObj obj = elem.embeddedObject();
+
+ // Extract fields from specification object. sortBy and output are not immediately parsed into
+ // Expressions so that they can easily still be manipulated and processed in the special case of
+ // AccumulatorTopBottomN.
+ boost::optional<BSONObj> sortBy;
+ boost::optional<BSONElement> output;
+ boost::intrusive_ptr<Expression> n;
+ for (auto&& element : obj) {
+ auto fieldName = element.fieldNameStringData();
+ if constexpr (!single) {
+ if (fieldName == AccumulatorN::kFieldNameN) {
+ n = Expression::parseOperand(expCtx, element, vps);
+ continue;
+ }
+ }
+ if (fieldName == AccumulatorN::kFieldNameOutput) {
+ output = element;
+ } else if (fieldName == AccumulatorN::kFieldNameSortBy && needSortBy) {
+ sortBy = element.Obj();
+ } else {
+ uasserted(5788002, str::stream() << "Unknown argument to " << name << " " << fieldName);
+ }
+ }
+
+ // Make sure needed arguments were found.
+ if constexpr (single) {
+ n = ExpressionConstant::create(expCtx, Value(1));
+ } else {
+ uassert(
+ 5788003, str::stream() << "Missing value for '" << AccumulatorN::kFieldNameN << "'", n);
+ }
+ uassert(5788004,
+ str::stream() << "Missing value for '" << AccumulatorN::kFieldNameOutput << "'",
+ output);
+ if (needSortBy) {
+ uassert(5788005,
+ str::stream() << "Missing value for '" << AccumulatorN::kFieldNameSortBy << "'",
+ sortBy);
+ }
+
+ return {n, *output, sortBy};
+}
+
+template <TopBottomSense sense, bool single>
+AccumulatorTopBottomN<sense, single>::AccumulatorTopBottomN(ExpressionContext* const expCtx,
+ SortPattern sp)
+ : AccumulatorN(expCtx), _sortPattern(sp) {
+
+ // Modify sortPattern to sort based on fields where they are in the evaluated argument instead
+ // of where they would be in the raw document received by $group and friends.
+ std::vector<SortPattern::SortPatternPart> parts;
+ int sortOrder = 0;
+ for (auto part : _sortPattern) {
+ const auto newFieldName =
+ (StringBuilder() << AccumulatorN::kFieldNameSortFields << "." << sortOrder).str();
+ part.fieldPath.reset(FieldPath(newFieldName));
+
+ // TODO SERVER-60781 will change AccumulatorTopBottomN so it has different behavior
+ // Invert sort spec if $topN/top.
+ if constexpr (sense == TopBottomSense::kTop) {
+ // $topN usually flips sort pattern by making ascending false. for the case of textScore
+ // based sorting, there is no way to sort by least relevent in a normal mongodb sort
+ // specification so topN still returns the same order as bottomN (most relevent first).
+ if (!part.expression) {
+ part.isAscending = !part.isAscending;
+ }
+ }
+ if (part.expression) {
+ // $meta based sorting is handled earlier in the sortFields expression. See comment in
+ // parseAccumulatorTopBottomNSortBy().
+ part.expression = nullptr;
+ }
+ parts.push_back(part);
+ sortOrder++;
+ }
+ SortPattern internalSortPattern(parts);
+
+ _sortKeyComparator.emplace(internalSortPattern);
+ _sortKeyGenerator.emplace(std::move(internalSortPattern), expCtx->getCollator());
+
+ _memUsageBytes = sizeof(*this);
+
+ // STL expects a less-than function not a 3-way compare function so this lambda wraps
+ // SortKeyComparator.
+ _map.emplace([&, this](const Value& lhs, const Value& rhs) {
+ return (*this->_sortKeyComparator)(lhs, rhs) < 0;
+ });
+}
+
+template <TopBottomSense sense, bool single>
+const char* AccumulatorTopBottomN<sense, single>::getOpName() const {
+ return AccumulatorTopBottomN<sense, single>::getName().rawData();
+}
+
+template <TopBottomSense sense, bool single>
+Document AccumulatorTopBottomN<sense, single>::serialize(
+ boost::intrusive_ptr<Expression> initializer,
+ boost::intrusive_ptr<Expression> argument,
+ bool explain) const {
+ MutableDocument args;
+ if constexpr (!single) {
+ args.addField(kFieldNameN, Value(initializer->serialize(explain)));
+ }
+ auto output = argument->serialize(explain)[kFieldNameOutput];
+ tassert(5788000,
+ str::stream() << "expected argument expression to have " << kFieldNameOutput
+ << " field",
+ !output.missing());
+ args.addField(kFieldNameOutput, Value(output));
+ args.addField(kFieldNameSortBy,
+ Value(_sortPattern.serialize(
+ SortPattern::SortKeySerialization::kForPipelineSerialization)));
+ return DOC(getOpName() << args.freeze());
+}
+
+template <TopBottomSense sense>
+std::pair<SortPattern, BSONArray> parseAccumulatorTopBottomNSortBy(ExpressionContext* const expCtx,
+ BSONObj sortBy) {
+ SortPattern sortPattern(sortBy, expCtx);
+ BSONArrayBuilder sortFieldsExpBab;
+ BSONObjIterator sortByBoi(sortBy);
+ int sortOrder = 0;
+ for (const auto& part : sortPattern) {
+ const auto fieldName = sortByBoi.next().fieldNameStringData();
+ const auto newFieldName =
+ (StringBuilder() << AccumulatorN::kFieldNameSortFields << "." << sortOrder).str();
+
+ if (part.expression) {
+ // In a scenario where we are sorting by metadata (for example if sortBy is
+ // {text: {$meta: "textScore"}}) we cant use ["$text"] as the sortFields expression
+ // since the evaluated argument wouldn't have the same metadata as the original
+ // document. Instead we use [{$meta: "textScore"}] as the sortFields expression so the
+ // sortFields array contains the data we need for sorting.
+ const auto serialized = part.expression->serialize(false);
+ sortFieldsExpBab.append(serialized.getDocument().toBson());
+ } else {
+ sortFieldsExpBab.append((StringBuilder() << "$" << fieldName).str());
+ }
+ sortOrder++;
+ }
+ return {sortPattern, sortFieldsExpBab.arr()};
+}
+
+template <TopBottomSense sense, bool single>
+AccumulationExpression AccumulatorTopBottomN<sense, single>::parseTopBottomN(
+ ExpressionContext* const expCtx, BSONElement elem, VariablesParseState vps) {
+ auto name = AccumulatorTopBottomN<sense, single>::getName();
+
+ const auto [n, output, sortBy] =
+ accumulatorNParseArgs<single>(expCtx, elem, name.rawData(), true, vps);
+
+ auto [sortPattern, sortFieldsExp] = parseAccumulatorTopBottomNSortBy<sense>(expCtx, *sortBy);
+
+ // Construct argument expression. If given sortBy: {field1: 1, field2: 1} it will be shaped like
+ // {output: <output expression>, sortFields: ["$field1", "$field2"]}. This projects out only the
+ // fields we need for sorting so we can use SortKeyComparator without copying the entire
+ // document. This argument expression will be evaluated and become the input to processValue.
+ boost::intrusive_ptr<Expression> argument = Expression::parseObject(
+ expCtx, BSON(output << AccumulatorN::kFieldNameSortFields << sortFieldsExp), vps);
+
+ auto factory = [expCtx, sortPattern = std::move(sortPattern)] {
+ return make_intrusive<AccumulatorTopBottomN<sense, single>>(expCtx, sortPattern);
+ };
+
+ return {std::move(n), std::move(argument), std::move(factory), name};
+}
+
+template <TopBottomSense sense, bool single>
+boost::intrusive_ptr<AccumulatorState> AccumulatorTopBottomN<sense, single>::create(
+ ExpressionContext* expCtx, BSONObj sortBy) {
+ return make_intrusive<AccumulatorTopBottomN<sense, single>>(
+ expCtx, parseAccumulatorTopBottomNSortBy<sense>(expCtx, sortBy).first);
+}
+
+template <TopBottomSense sense, bool single>
+void AccumulatorTopBottomN<sense, single>::processValue(const Value& val) {
+ tassert(5788014,
+ str::stream() << "processValue of " << getName() << "should have recieved an object",
+ val.isObject());
+
+ Value output = val[AccumulatorN::kFieldNameOutput];
+ Value sortKey;
+
+ // In the case that processValue() is getting called in the context of merging, a previous
+ // processValue has already generated the sortKey for us, so we don't need to regenerate it.
+ Value generatedSortKey = val[kFieldNameGeneratedSortKey];
+ if (!generatedSortKey.missing()) {
+ sortKey = generatedSortKey;
+ } else {
+ sortKey = _sortKeyGenerator->computeSortKeyFromDocument(val.getDocument());
+ }
+ KeyOutPair keyOutPair(sortKey, output);
+
+ // Only compare if we have 'n' elements.
+ if (static_cast<long long>(_map->size()) == *_n) {
+ // Get an iterator to the element we want to compare against.
+ auto cmpElem = std::prev(_map->end());
+
+ // TODO SERVER-60781 will change AccumulatorTopBottomN so it has different behavior. $topN
+ // will insert items greater than the min and $bottomN will insert items less than the max.
+ auto cmp = (*_sortKeyComparator)(cmpElem->first, keyOutPair.first);
+ // When the sort key produces a tie we keep the first value seen.
+ if (cmp > 0) {
+ _memUsageBytes -= cmpElem->first.getApproximateSize() +
+ cmpElem->second.getApproximateSize() + sizeof(KeyOutPair);
+ _map->erase(cmpElem);
+ } else {
+ return;
+ }
+ }
+ _memUsageBytes +=
+ sortKey.getApproximateSize() + output.getApproximateSize() + sizeof(KeyOutPair);
+ uassert(ErrorCodes::ExceededMemoryLimit,
+ str::stream() << getOpName()
+ << " used too much memory and cannot spill to disk. Memory limit: "
+ << _maxMemUsageBytes << " bytes",
+ _memUsageBytes < _maxMemUsageBytes);
+ _map->emplace(keyOutPair);
+}
+
+template <TopBottomSense sense, bool single>
+Value AccumulatorTopBottomN<sense, single>::getValue(bool toBeMerged) {
+ std::vector<Value> result;
+ for (const auto& keyOutPair : *_map) {
+ if (toBeMerged) {
+ result.emplace_back(BSON(kFieldNameGeneratedSortKey
+ << keyOutPair.first << kFieldNameOutput << keyOutPair.second));
+ } else {
+ result.push_back(keyOutPair.second);
+ }
+ };
+
+ if constexpr (!single) {
+ return Value(result);
+ } else {
+ tassert(5788015,
+ str::stream() << getName() << " group did not contain exactly one value",
+ result.size() == 1);
+ if (toBeMerged) {
+ return Value(result);
+ } else {
+ return Value(result[0]);
+ }
+ }
+}
+
+template <TopBottomSense sense, bool single>
+void AccumulatorTopBottomN<sense, single>::reset() {
+ _map->clear();
+ _memUsageBytes = sizeof(*this);
+}
+
+// Explicitly specify the following classes should generated and should live in this compilation
+// unit.
+template class AccumulatorTopBottomN<TopBottomSense::kBottom, false>;
+template class AccumulatorTopBottomN<TopBottomSense::kBottom, true>;
+template class AccumulatorTopBottomN<TopBottomSense::kTop, false>;
+template class AccumulatorTopBottomN<TopBottomSense::kTop, true>;
+
} // namespace mongo