summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2022-12-08 16:07:11 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-02-15 23:44:05 +0000
commit30ddfe887b07bbde3ded94d0d909fc6c2f716001 (patch)
treed508b3ec2b7139fbc188400d2ae4dc4b65680b16
parent88e851c06ecfd3182627ec68c2d03ca4de294cab (diff)
downloadmongo-30ddfe887b07bbde3ded94d0d909fc6c2f716001.tar.gz
SERVER-70395 Combining partial aggs support for $addToSet, $push, and $mergeObjects
(cherry picked from commit ed83b1244ec4d89c909e5766893c06de091fabb6)
-rw-r--r--src/mongo/db/exec/sbe/expressions/expression.cpp6
-rw-r--r--src/mongo/db/exec/sbe/values/value.cpp6
-rw-r--r--src/mongo/db/exec/sbe/values/value.h9
-rw-r--r--src/mongo/db/exec/sbe/vm/vm.cpp222
-rw-r--r--src/mongo/db/exec/sbe/vm/vm.h17
-rw-r--r--src/mongo/db/query/sbe_stage_builder_accumulator.cpp76
-rw-r--r--src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp236
7 files changed, 556 insertions, 16 deletions
diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp
index 3a2b20a4657..a145598592c 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.cpp
+++ b/src/mongo/db/exec/sbe/expressions/expression.cpp
@@ -468,6 +468,8 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = {
{"tanh", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::tanh, false}},
{"round", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::round, false}},
{"concat", BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::concat, false}},
+ {"aggConcatArraysCapped",
+ BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggConcatArraysCapped, true}},
{"isMember", BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::isMember, false}},
{"collIsMember", BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::collIsMember, false}},
{"indexOfBytes",
@@ -486,6 +488,10 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = {
BuiltinFn{[](size_t n) { return n >= 1; }, vm::Builtin::collSetIntersection, false}},
{"collSetDifference",
BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::collSetDifference, false}},
+ {"aggSetUnionCapped",
+ BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggSetUnionCapped, true}},
+ {"aggCollSetUnionCapped",
+ BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::aggCollSetUnionCapped, true}},
{"runJsPredicate",
BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::runJsPredicate, false}},
{"regexCompile", BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::regexCompile, false}},
diff --git a/src/mongo/db/exec/sbe/values/value.cpp b/src/mongo/db/exec/sbe/values/value.cpp
index 6c21f4f6e5d..e0f73c87ddf 100644
--- a/src/mongo/db/exec/sbe/values/value.cpp
+++ b/src/mongo/db/exec/sbe/values/value.cpp
@@ -860,7 +860,7 @@ bool isInfinity(TypeTags tag, Value val) {
(tag == TypeTags::NumberDecimal && bitcastTo<Decimal128>(val).isInfinite());
}
-void ArraySet::push_back(TypeTags tag, Value val) {
+bool ArraySet::push_back(TypeTags tag, Value val) {
if (tag != TypeTags::Nothing) {
ValueGuard guard{tag, val};
auto [it, inserted] = _values.insert({tag, val});
@@ -868,7 +868,11 @@ void ArraySet::push_back(TypeTags tag, Value val) {
if (inserted) {
guard.reset();
}
+
+ return inserted;
}
+
+ return false;
}
std::pair<TypeTags, Value> ArrayEnumerator::getViewOfValue() const {
diff --git a/src/mongo/db/exec/sbe/values/value.h b/src/mongo/db/exec/sbe/values/value.h
index 26ba3e5e1de..ae207106d8b 100644
--- a/src/mongo/db/exec/sbe/values/value.h
+++ b/src/mongo/db/exec/sbe/values/value.h
@@ -844,7 +844,14 @@ public:
}
}
- void push_back(TypeTags tag, Value val);
+ /**
+ * Adds the given SBE value to the set if an equal value is not already present. Assumes
+ * ownership of the given value.
+ *
+ * Returns true if the value was newly inserted, otherwise returns false to indicate that an
+ * equal value was already present in the set.
+ */
+ bool push_back(TypeTags tag, Value val);
auto& values() const noexcept {
return _values;
diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp
index 7eb8eb7149e..f8b6e394ab4 100644
--- a/src/mongo/db/exec/sbe/vm/vm.cpp
+++ b/src/mongo/db/exec/sbe/vm/vm.cpp
@@ -3019,6 +3019,97 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinConcat(ArityTyp
return {true, strTag, strValue};
}
+/**
+ * Agg builtin that combines partial '$push' results: appends the values of a new partial aggregate
+ * to the accumulator while tracking the accumulated size.
+ *
+ * Stack inputs: slot 0 is the accumulator (Nothing on the first invocation), slot 1 is the new
+ * partial aggregate (moved out of the stack and released when this function exits), and slot 2 is
+ * the size cap, which must be a 32-bit int. The accumulator and the partial aggregate are both
+ * two-element arrays indexed by 'AggArrayWithSize': the array of values, followed by the size of
+ * those values as a 64-bit int.
+ *
+ * Throws 'ExceededMemoryLimit' when the summed sizes reach the cap. Otherwise returns the owned
+ * accumulator with the new values appended and its size updated.
+ */
+std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggConcatArraysCapped(
+    ArityType arity) {
+    auto [ownArr, tagArr, valArr] = getFromStack(0);
+    auto [tagNewElem, valNewElem] = moveOwnedFromStack(1);
+    value::ValueGuard guardNewElem{tagNewElem, valNewElem};
+    auto [_, tagSizeCap, valSizeCap] = getFromStack(2);
+
+    tassert(7039508,
+            "'cap' parameter must be a 32-bit int",
+            tagSizeCap == value::TypeTags::NumberInt32);
+    const int32_t sizeCap = value::bitcastTo<int32_t>(valSizeCap);
+
+    // We expect the new value we are adding to the accumulator to be a two-element array where
+    // the first element is the array to concatenate and the second value is the corresponding size.
+    tassert(7039512, "expected value of type 'Array'", tagNewElem == value::TypeTags::Array);
+    auto newArr = value::getArrayView(valNewElem);
+    tassert(7039527,
+            "array had unexpected size",
+            newArr->size() == static_cast<size_t>(AggArrayWithSize::kLast));
+
+    // Create a new array to hold size and added elements, if it does not exist yet.
+    if (tagArr == value::TypeTags::Nothing) {
+        ownArr = true;
+        std::tie(tagArr, valArr) = value::makeNewArray();
+        auto arr = value::getArrayView(valArr);
+
+        auto [tagAccArr, valAccArr] = value::makeNewArray();
+
+        // The order is important! The accumulated array should be at index
+        // AggArrayWithSize::kValues, and the size should be at index
+        // AggArrayWithSize::kSizeOfValues.
+        arr->push_back(tagAccArr, valAccArr);
+        arr->push_back(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(0));
+    } else {
+        // Take ownership of the accumulator.
+        topStack(false, value::TypeTags::Nothing, 0);
+    }
+
+    tassert(7039513, "expected array to be owned", ownArr);
+    value::ValueGuard accumulatorGuard{tagArr, valArr};
+    tassert(7039514, "expected accumulator to have type 'Array'", tagArr == value::TypeTags::Array);
+    auto arr = value::getArrayView(valArr);
+    tassert(7039515,
+            "accumulator was array of unexpected size",
+            arr->size() == static_cast<size_t>(AggArrayWithSize::kLast));
+
+    // Resolve both inner arrays of values before the size check, so that the error message can
+    // report how many elements are being concatenated. 'arr' and 'newArr' are the two-element
+    // [values, size] wrappers, so their own sizes say nothing about the number of elements.
+    auto [tagAccArr, valAccArr] = arr->getAt(static_cast<size_t>(AggArrayWithSize::kValues));
+    tassert(7039518, "expected value of type 'Array'", tagAccArr == value::TypeTags::Array);
+    auto accArr = value::getArrayView(valAccArr);
+
+    auto [tagNewArray, valNewArray] = newArr->getAt(static_cast<size_t>(AggArrayWithSize::kValues));
+    tassert(7039519, "expected value of type 'Array'", tagNewArray == value::TypeTags::Array);
+    auto newValuesArr = value::getArrayView(valNewArray);
+
+    // Check that the accumulated size after concatenation won't exceed the limit.
+    {
+        auto [tagAccSize, valAccSize] =
+            arr->getAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues));
+        auto [tagNewSize, valNewSize] =
+            newArr->getAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues));
+        tassert(7039516, "expected 64-bit int", tagAccSize == value::TypeTags::NumberInt64);
+        tassert(7039517, "expected 64-bit int", tagNewSize == value::TypeTags::NumberInt64);
+        const int64_t currentSize = value::bitcastTo<int64_t>(valAccSize);
+        const int64_t newSize = value::bitcastTo<int64_t>(valNewSize);
+        const int64_t totalSize = currentSize + newSize;
+
+        if (totalSize >= static_cast<int64_t>(sizeCap)) {
+            uasserted(ErrorCodes::ExceededMemoryLimit,
+                      str::stream() << "Used too much memory for a single array. Memory limit: "
+                                    << sizeCap << ". Concatentating array of " << accArr->size()
+                                    << " elements and " << currentSize << " bytes with array of "
+                                    << newValuesArr->size() << " elements and " << newSize
+                                    << " bytes.");
+        }
+
+        // We are still under the size limit. Set the new total size in the accumulator.
+        arr->setAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues),
+                   value::TypeTags::NumberInt64,
+                   value::bitcastFrom<int64_t>(totalSize));
+    }
+
+    for (auto i = value::ArrayEnumerator{tagNewArray, valNewArray}; !i.atEnd(); i.advance()) {
+        auto [elTag, elVal] = i.getViewOfValue();
+        // TODO SERVER-71952: Since 'valNewArray' is owned here, in the future we could avoid this
+        // copy by moving the element out of the array.
+        auto [copyTag, copyVal] = value::copyValue(elTag, elVal);
+        accArr->push_back(copyTag, copyVal);
+    }
+
+    // Success: hand the accumulator back to the caller instead of freeing it.
+    accumulatorGuard.reset();
+    return {ownArr, tagArr, valArr};
+}
+
std::pair<value::TypeTags, value::Value> ByteCode::genericIsMember(value::TypeTags lhsTag,
value::Value lhsVal,
value::TypeTags rhsTag,
@@ -3396,6 +3487,131 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinSetUnion(ArityT
return setUnion(argTags, argVals);
}
+/**
+ * Shared implementation of the capped set-union agg builtins. Reads the accumulator from stack
+ * slot 0 and inserts every element of the set carried by the new partial aggregate
+ * ('tagNewElem'/'valNewElem') into the accumulator's set.
+ *
+ * Takes ownership of the new partial aggregate: it is guarded on entry and released when this
+ * function exits. Both the accumulator and the partial aggregate are two-element arrays indexed
+ * by 'AggArrayWithSize': an 'ArraySet' of values, followed by the size as a 64-bit int.
+ *
+ * 'collator' is only used when a fresh accumulator set has to be created. Throws
+ * 'ExceededMemoryLimit' once the recomputed size reaches 'sizeCap'; otherwise returns the owned,
+ * updated accumulator.
+ */
+std::tuple<bool, value::TypeTags, value::Value> ByteCode::aggSetUnionCappedImpl(
+ value::TypeTags tagNewElem,
+ value::Value valNewElem,
+ int32_t sizeCap,
+ CollatorInterface* collator) {
+ value::ValueGuard guardNewElem{tagNewElem, valNewElem};
+ auto [ownAcc, tagAcc, valAcc] = getFromStack(0);
+
+ // We expect the new value we are adding to the accumulator to be a two-element array where
+ // the first element is the new set of values and the second value is the corresponding size.
+ tassert(7039526, "expected value of type 'Array'", tagNewElem == value::TypeTags::Array);
+ auto newArr = value::getArrayView(valNewElem);
+ tassert(7039528,
+ "array had unexpected size",
+ newArr->size() == static_cast<size_t>(AggArrayWithSize::kLast));
+
+ // Create a new array if it does not exist yet.
+ if (tagAcc == value::TypeTags::Nothing) {
+ ownAcc = true;
+ std::tie(tagAcc, valAcc) = value::makeNewArray();
+ auto accArray = value::getArrayView(valAcc);
+
+ auto [tagAccSet, valAccSet] = value::makeNewArraySet(collator);
+
+ // The order is important! The accumulated array should be at index
+ // AggArrayWithSize::kValues, and the size should be at index
+ // AggArrayWithSize::kSizeOfValues.
+ accArray->push_back(tagAccSet, valAccSet);
+ accArray->push_back(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(0));
+ } else {
+ // Take ownership of the accumulator.
+ topStack(false, value::TypeTags::Nothing, 0);
+ }
+
+ tassert(7039520, "expected accumulator value to be owned", ownAcc);
+ // From here on the accumulator is freed if anything below throws.
+ value::ValueGuard guardArr{tagAcc, valAcc};
+
+ tassert(
+ 7039521, "expected accumulator to be of type 'Array'", tagAcc == value::TypeTags::Array);
+ auto accArray = value::getArrayView(valAcc);
+ tassert(7039522,
+ "array had unexpected size",
+ accArray->size() == static_cast<size_t>(AggArrayWithSize::kLast));
+
+ auto [tagAccArrSet, valAccArrSet] =
+ accArray->getAt(static_cast<size_t>(AggArrayWithSize::kValues));
+ tassert(
+ 7039523, "expected value of type 'ArraySet'", tagAccArrSet == value::TypeTags::ArraySet);
+ auto accArrSet = value::getArraySetView(valAccArrSet);
+
+ // Extract the current size of the accumulator. As we add elements to the set, we will increment
+ // the current size accordingly and throw an exception if we ever exceed the size limit. We
+ // cannot simply sum the two sizes, since the two sets could have a substantial intersection.
+ auto [tagAccSize, valAccSize] =
+ accArray->getAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues));
+ tassert(7039524, "expected 64-bit int", tagAccSize == value::TypeTags::NumberInt64);
+ int64_t currentSize = value::bitcastTo<int64_t>(valAccSize);
+
+ auto [tagNewValSet, valNewValSet] =
+ newArr->getAt(static_cast<size_t>(AggArrayWithSize::kValues));
+ tassert(
+ 7039525, "expected value of type 'ArraySet'", tagNewValSet == value::TypeTags::ArraySet);
+
+ for (auto i = value::ArrayEnumerator{tagNewValSet, valNewValSet}; !i.atEnd(); i.advance()) {
+ auto [elTag, elVal] = i.getViewOfValue();
+ int elemSize = value::getApproximateSize(elTag, elVal);
+ // TODO SERVER-71952: Since 'valNewValSet' is owned here, in the future we could avoid this
+ // copy by moving the element out of the array.
+ auto [copyTag, copyVal] = value::copyValue(elTag, elVal);
+ bool inserted = accArrSet->push_back(copyTag, copyVal);
+
+ // Only elements that were actually added to the set count towards the size.
+ if (inserted) {
+ currentSize += elemSize;
+ if (currentSize >= static_cast<int64_t>(sizeCap)) {
+ uasserted(ErrorCodes::ExceededMemoryLimit,
+ str::stream() << "Used too much memory for a single array. Memory limit: "
+ << sizeCap << ". Current set has " << accArrSet->size()
+ << " elements and is " << currentSize << " bytes.");
+ }
+ }
+ }
+
+ // Update the accumulator with the new total size.
+ accArray->setAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues),
+ value::TypeTags::NumberInt64,
+ value::bitcastFrom<int64_t>(currentSize));
+
+ // Success: release the guard so the accumulator is returned rather than freed.
+ guardArr.reset();
+ return {ownAcc, tagAcc, valAcc};
+}
+
+/**
+ * Agg builtin that combines partial '$addToSet' results without a collation.
+ *
+ * Stack inputs: slot 1 is the new partial aggregate (moved out of the stack) and slot 2 is the
+ * size cap, which must be a 32-bit int. The accumulator itself is read from the stack by
+ * 'aggSetUnionCappedImpl', which also takes over ownership of the new partial aggregate.
+ */
+std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggSetUnionCapped(
+    ArityType arity) {
+    auto [tagNewElem, valNewElem] = moveOwnedFromStack(1);
+    value::ValueGuard guardNewElem{tagNewElem, valNewElem};
+
+    auto [_, tagSizeCap, valSizeCap] = getFromStack(2);
+    tassert(7039509,
+            "'cap' parameter must be a 32-bit int",
+            tagSizeCap == value::TypeTags::NumberInt32);
+    // Keep the cap as a signed 32-bit int, which is the type 'aggSetUnionCappedImpl' accepts.
+    // Widening it to an unsigned 'size_t' first would only be narrowed back implicitly at the call.
+    const int32_t sizeCap = value::bitcastTo<int32_t>(valSizeCap);
+
+    // Ownership of the new element passes to the implementation.
+    guardNewElem.reset();
+    return aggSetUnionCappedImpl(tagNewElem, valNewElem, sizeCap, nullptr /*collator*/);
+}
+
+/**
+ * Agg builtin that combines partial '$addToSet' results under a collation.
+ *
+ * Stack inputs: slot 1 is the collator, slot 2 is the new partial aggregate (moved out of the
+ * stack) and slot 3 is the size cap, which must be a 32-bit int. The actual work is delegated to
+ * 'aggSetUnionCappedImpl', which takes over ownership of the new partial aggregate.
+ */
+std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggCollSetUnionCapped(
+    ArityType arity) {
+    auto [collOwned, collTag, collVal] = getFromStack(1);
+    auto [newElemTag, newElemVal] = moveOwnedFromStack(2);
+    value::ValueGuard newElemGuard{newElemTag, newElemVal};
+    auto [capOwned, capTag, capVal] = getFromStack(3);
+
+    tassert(7039510, "expected value of type 'collator'", collTag == value::TypeTags::collator);
+    tassert(7039511,
+            "'cap' parameter must be a 32-bit int",
+            capTag == value::TypeTags::NumberInt32);
+
+    const int32_t sizeCap = value::bitcastTo<int32_t>(capVal);
+    auto* collator = value::getCollatorView(collVal);
+
+    // Ownership of the new element passes to the implementation.
+    newElemGuard.reset();
+    return aggSetUnionCappedImpl(newElemTag, newElemVal, sizeCap, collator);
+}
+
std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinCollSetIntersection(
ArityType arity) {
invariant(arity >= 1);
@@ -4445,6 +4661,12 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti
return builtinRound(arity);
case Builtin::concat:
return builtinConcat(arity);
+ case Builtin::aggConcatArraysCapped:
+ return builtinAggConcatArraysCapped(arity);
+ case Builtin::aggSetUnionCapped:
+ return builtinAggSetUnionCapped(arity);
+ case Builtin::aggCollSetUnionCapped:
+ return builtinAggCollSetUnionCapped(arity);
case Builtin::isMember:
return builtinIsMember(arity);
case Builtin::collIsMember:
diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h
index 255a1a497c2..fd45b6beaae 100644
--- a/src/mongo/db/exec/sbe/vm/vm.h
+++ b/src/mongo/db/exec/sbe/vm/vm.h
@@ -527,6 +527,15 @@ enum class Builtin : uint8_t {
toLower,
coerceToString,
concat,
+
+ // Agg function to concatenate arrays, failing when the accumulator reaches a specified size.
+ aggConcatArraysCapped,
+
+ // Agg functions to compute the set union of two arrays, failing when the accumulator reaches a
+ // specified size.
+ aggSetUnionCapped,
+ aggCollSetUnionCapped,
+
acos,
acosh,
asin,
@@ -1137,6 +1146,14 @@ private:
std::tuple<bool, value::TypeTags, value::Value> builtinTanh(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinRound(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinConcat(ArityType arity);
+ std::tuple<bool, value::TypeTags, value::Value> builtinAggConcatArraysCapped(ArityType arity);
+ std::tuple<bool, value::TypeTags, value::Value> builtinAggSetUnionCapped(ArityType arity);
+ std::tuple<bool, value::TypeTags, value::Value> builtinAggCollSetUnionCapped(ArityType arity);
+ std::tuple<bool, value::TypeTags, value::Value> aggSetUnionCappedImpl(
+ value::TypeTags tagNewElem,
+ value::Value valNewElem,
+ int32_t sizeCap,
+ CollatorInterface* collator);
std::tuple<bool, value::TypeTags, value::Value> builtinIsMember(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinCollIsMember(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinIndexOfBytes(ArityType arity);
diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
index 02ae894da6c..56f39a3758d 100644
--- a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
@@ -364,28 +364,50 @@ std::unique_ptr<sbe::EExpression> buildFinalizeSum(StageBuilderState& state,
}
}
-std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorAddToSet(
- const AccumulationExpression& expr,
+std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorAddToSetHelper(
std::unique_ptr<sbe::EExpression> arg,
+ StringData funcName,
boost::optional<sbe::value::SlotId> collatorSlot,
- sbe::value::FrameIdGenerator& frameIdGenerator) {
+ StringData funcNameWithCollator) {
std::vector<std::unique_ptr<sbe::EExpression>> aggs;
const int cap = internalQueryMaxAddToSetBytes.load();
if (collatorSlot) {
aggs.push_back(makeFunction(
- "collAddToSetCapped"_sd,
+ funcNameWithCollator,
sbe::makeE<sbe::EVariable>(*collatorSlot),
std::move(arg),
makeConstant(sbe::value::TypeTags::NumberInt32, sbe::value::bitcastFrom<int>(cap))));
} else {
aggs.push_back(makeFunction(
- "addToSetCapped",
+ funcName,
std::move(arg),
makeConstant(sbe::value::TypeTags::NumberInt32, sbe::value::bitcastFrom<int>(cap))));
}
return aggs;
}
+/**
+ * Builds the accumulator expression for $addToSet by delegating to the shared helper with the
+ * names of the plain and the collation-aware "add to set" agg functions. 'expr' and
+ * 'frameIdGenerator' are not used by this builder.
+ */
+std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorAddToSet(
+    const AccumulationExpression& expr,
+    std::unique_ptr<sbe::EExpression> arg,
+    boost::optional<sbe::value::SlotId> collatorSlot,
+    sbe::value::FrameIdGenerator& frameIdGenerator) {
+    const StringData plainFuncName = "addToSetCapped"_sd;
+    const StringData collationFuncName = "collAddToSetCapped"_sd;
+    return buildAccumulatorAddToSetHelper(
+        std::move(arg), plainFuncName, collatorSlot, collationFuncName);
+}
+
+/**
+ * Builds the expression which merges partial $addToSet aggregates. The single input slot holds
+ * the partial aggregate; it is combined via the capped set-union agg functions. 'expr' and
+ * 'frameIdGenerator' are not used by this builder.
+ */
+std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsAddToSet(
+    const AccumulationExpression& expr,
+    const sbe::value::SlotVector& inputSlots,
+    boost::optional<sbe::value::SlotId> collatorSlot,
+    sbe::value::FrameIdGenerator& frameIdGenerator) {
+    tassert(7039506,
+            "partial agg combiner for $addToSet should have exactly one input slot",
+            inputSlots.size() == 1);
+    return buildAccumulatorAddToSetHelper(makeVariable(inputSlots[0]),
+                                          "aggSetUnionCapped"_sd,
+                                          collatorSlot,
+                                          "aggCollSetUnionCapped"_sd);
+}
+
std::unique_ptr<sbe::EExpression> buildFinalizeCappedAccumulator(
StageBuilderState& state,
const AccumulationExpression& expr,
@@ -407,20 +429,37 @@ std::unique_ptr<sbe::EExpression> buildFinalizeCappedAccumulator(
return pushFinalize;
}
-std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorPush(
- const AccumulationExpression& expr,
- std::unique_ptr<sbe::EExpression> arg,
- boost::optional<sbe::value::SlotId> collatorSlot,
- sbe::value::FrameIdGenerator& frameIdGenerator) {
+std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorPushHelper(
+ std::unique_ptr<sbe::EExpression> arg, StringData aggFuncName) {
const int cap = internalQueryMaxPushBytes.load();
std::vector<std::unique_ptr<sbe::EExpression>> aggs;
aggs.push_back(makeFunction(
- "addToArrayCapped"_sd,
+ aggFuncName,
std::move(arg),
makeConstant(sbe::value::TypeTags::NumberInt32, sbe::value::bitcastFrom<int>(cap))));
return aggs;
}
+/**
+ * Builds the accumulator expression for $push by delegating to the shared helper with the name
+ * of the capped "add to array" agg function. Only 'arg' is used by this builder.
+ */
+std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorPush(
+    const AccumulationExpression& expr,
+    std::unique_ptr<sbe::EExpression> arg,
+    boost::optional<sbe::value::SlotId> collatorSlot,
+    sbe::value::FrameIdGenerator& frameIdGenerator) {
+    const StringData aggFuncName = "addToArrayCapped"_sd;
+    return buildAccumulatorPushHelper(std::move(arg), aggFuncName);
+}
+
+/**
+ * Builds the expression which merges partial $push aggregates. The single input slot holds the
+ * partial aggregate; it is combined via the capped array-concatenation agg function. 'expr',
+ * 'collatorSlot' and 'frameIdGenerator' are not used by this builder.
+ */
+std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsPush(
+    const AccumulationExpression& expr,
+    const sbe::value::SlotVector& inputSlots,
+    boost::optional<sbe::value::SlotId> collatorSlot,
+    sbe::value::FrameIdGenerator& frameIdGenerator) {
+    tassert(7039505,
+            "partial agg combiner for $push should have exactly one input slot",
+            inputSlots.size() == 1);
+    return buildAccumulatorPushHelper(makeVariable(inputSlots[0]), "aggConcatArraysCapped"_sd);
+}
+
std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorStdDev(
const AccumulationExpression& expr,
std::unique_ptr<sbe::EExpression> arg,
@@ -517,6 +556,18 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorMergeObjects(
aggs.push_back(std::move(filterExpr));
return aggs;
}
+
+/**
+ * Builds the expression which merges partial $mergeObjects aggregates. The single input slot is
+ * fed to the regular $mergeObjects accumulator builder, so combining reuses the same expression
+ * as accumulating.
+ */
+std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsMergeObjects(
+    const AccumulationExpression& expr,
+    const sbe::value::SlotVector& inputSlots,
+    boost::optional<sbe::value::SlotId> collatorSlot,
+    sbe::value::FrameIdGenerator& frameIdGenerator) {
+    tassert(7039507,
+            "partial agg combiner for $mergeObjects should have exactly one input slot",
+            inputSlots.size() == 1);
+    return buildAccumulatorMergeObjects(
+        expr, makeVariable(inputSlots[0]), collatorSlot, frameIdGenerator);
+}
}; // namespace
std::pair<std::unique_ptr<sbe::EExpression>, EvalStage> buildArgument(
@@ -579,10 +630,13 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggregates(
sbe::value::FrameIdGenerator&)>;
static const StringDataMap<BuildAggCombinerFn> kAggCombinerBuilders = {
+ {AccumulatorAddToSet::kName, &buildCombinePartialAggsAddToSet},
{AccumulatorFirst::kName, &buildCombinePartialAggsFirst},
{AccumulatorLast::kName, &buildCombinePartialAggsLast},
{AccumulatorMax::kName, &buildCombinePartialAggsMax},
+ {AccumulatorMergeObjects::kName, &buildCombinePartialAggsMergeObjects},
{AccumulatorMin::kName, &buildCombinePartialAggsMin},
+ {AccumulatorPush::kName, &buildCombinePartialAggsPush},
};
auto accExprName = acc.expr.name;
diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp
index 77d0ae24951..23d90b7ca31 100644
--- a/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp
@@ -27,17 +27,22 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
#include "mongo/platform/basic.h"
#include <fmt/printf.h>
#include "mongo/db/exec/sbe/expression_test_base.h"
+#include "mongo/db/exec/sbe/values/value_printer.h"
#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/query/query_solution.h"
#include "mongo/db/query/sbe_stage_builder_accumulator.h"
#include "mongo/db/query/sbe_stage_builder_test_fixture.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -1715,13 +1720,30 @@ public:
void aggregateAndAssertResults(BSONArray inputs,
BSONArray expected,
const sbe::vm::CodeFragment* code) {
+ auto [inputTag, inputVal] = makeArray(inputs);
+ auto [expectedTag, expectedVal] = makeArray(expected);
+ return aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, code);
+ }
+
+ /**
+ * Verifies that executing the bytecode ('code') for combining partial aggregates for $group
+ * spilling produces the 'expectedVal' outputs given 'inputsVal'. Assumes ownership of both
+ * 'expectedVal' and 'inputsVal'.
+ *
+ * Identical to the overload above, except the inputs and expected outputs are provided as SBE
+ * arrays rather than BSON arrays. This is useful if the caller needs to construct input and
+ * output arrays in a special way that cannot be achieved by trivial conversion from BSON.
+ */
+ void aggregateAndAssertResults(sbe::value::TypeTags inputTag,
+ sbe::value::Value inputVal,
+ sbe::value::TypeTags expectedTag,
+ sbe::value::Value expectedVal,
+ const sbe::vm::CodeFragment* code) {
// Make sure we are starting from a clean state.
_inputAccessor.reset();
_aggAccessor.reset();
- auto [inputTag, inputVal] = makeArray(inputs);
sbe::value::ValueGuard inputGuard{inputTag, inputVal};
- auto [expectedTag, expectedVal] = makeArray(expected);
sbe::value::ValueGuard expectedGuard{expectedTag, expectedVal};
sbe::value::ArrayEnumerator inputEnumerator{inputTag, inputVal};
@@ -1756,7 +1778,24 @@ public:
auto [compareTag, compareValue] = sbe::value::compareValue(
outputTag, outputVal, expectedOutputTag, expectedOutputValue);
ASSERT_EQ(compareTag, sbe::value::TypeTags::NumberInt32);
- ASSERT_EQ(compareValue, 0);
+ if (compareValue != 0) {
+ // The test failed, but dump the actual and expected values to the logs for ease of
+ // debugging.
+ str::stream actualBuilder;
+ auto actualPrinter = makeValuePrinter(actualBuilder);
+ actualPrinter.writeValueToStream(outputTag, outputVal);
+
+ str::stream expectedBuilder;
+ auto expectedPrinter = makeValuePrinter(expectedBuilder);
+ expectedPrinter.writeValueToStream(expectedOutputTag, expectedOutputValue);
+
+ LOGV2(7039529,
+ "Actual value not equal to expected value",
+ "actual"_attr = actualBuilder,
+ "expected"_attr = expectedBuilder);
+ FAIL("accumulator did not have expected value");
+ }
+
_aggAccessor.reset(true, outputTag, outputVal);
inputEnumerator.advance();
@@ -1765,6 +1804,53 @@ public:
}
/**
+ * A helper for converting a sequence of accumulator states for $push or $addToSet into the
+ * corresponding SBE value.
+ */
+ // Selects how the values of each partial aggregate are materialized by 'makeArrayAccumVal()':
+ // 'kPush' builds them with makeArray(), 'kAddToSet' builds them with makeArraySet().
+ enum class Accumulator { kPush, kAddToSet };
+ // Converts 'bsonArray', whose elements must each be a two-element array of the form
+ // [<array of values>, <numeric size>], into an SBE array of partial aggregates. The returned
+ // value is released from its guard before returning, so the caller is responsible for it.
+ std::pair<sbe::value::TypeTags, sbe::value::Value> makeArrayAccumVal(BSONArray bsonArray,
+ Accumulator accumType) {
+ auto [resultTag, resultVal] = sbe::value::makeNewArray();
+ sbe::value::ValueGuard resultGuard{resultTag, resultVal};
+ auto resultArr = sbe::value::getArrayView(resultVal);
+
+ for (auto&& elt : bsonArray) {
+ ASSERT(elt.type() == BSONType::Array);
+
+ // First element: the array of accumulated values.
+ BSONObjIterator arrayIt{elt.embeddedObject()};
+ ASSERT_TRUE(arrayIt.more());
+ auto firstElt = arrayIt.next();
+ ASSERT(firstElt.type() == BSONType::Array);
+ BSONArray partialBsonArr{firstElt.embeddedObject()};
+
+ // Second element: the size associated with those values.
+ ASSERT_TRUE(arrayIt.more());
+ auto secondElt = arrayIt.next();
+ ASSERT(secondElt.isNumber());
+ int64_t size = secondElt.safeNumberLong();
+
+ // There must be nothing after the size.
+ ASSERT_FALSE(arrayIt.more());
+
+ // Each partial aggregate is a two-element array whose first element is the partial
+ // $push result (itself an array) and whose second element is the size.
+ auto [partialAggTag, partialAggVal] = sbe::value::makeNewArray();
+ auto partialAggArr = sbe::value::getArrayView(partialAggVal);
+
+ auto [pushedValsTag, pushedValsVal] = accumType == Accumulator::kPush
+ ? makeArray(partialBsonArr)
+ : makeArraySet(partialBsonArr);
+ partialAggArr->push_back(pushedValsTag, pushedValsVal);
+
+ partialAggArr->push_back(sbe::value::TypeTags::NumberInt64,
+ sbe::value::bitcastFrom<int64_t>(size));
+
+ resultArr->push_back(partialAggTag, partialAggVal);
+ }
+
+ // Success: hand the result to the caller instead of freeing it.
+ resultGuard.reset();
+ return {resultTag, resultVal};
+ }
+
+ /**
* Convenience method for producing bytecode which combines partial aggregates for the given
* 'AccumulationStatement'.
*
@@ -1799,6 +1885,12 @@ protected:
sbe::value::SlotId _collatorSlotId;
private:
+ /**
+  * Creates an SBE value printer which writes to 'stream', configured to use tags for ambiguous
+  * values.
+  */
+ template <typename Stream>
+ sbe::value::ValuePrinter<Stream> makeValuePrinter(Stream& stream) {
+     const auto printOptions = sbe::PrintOptions().useTagForAmbiguousValues(true);
+     return sbe::value::ValuePrinters::make(stream, printOptions);
+ }
+
BSONObj _accumulationStmtBson;
std::unique_ptr<sbe::EExpression> _expr;
};
@@ -1904,4 +1996,142 @@ TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsLast) {
aggregateAndAssertResults(inputValues, expectedAggStates, compiledExpr.get());
}
+// Combining partial $push aggregates concatenates the value arrays and sums the provided sizes.
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsPush) {
+    auto accStatement = makeAccumulationStatement("$push"_sd);
+    auto compiledExpr = compileSingleInputNoCollator(accStatement);
+
+    // Each entry is a partial aggregate of the form [values, size]; the last one is empty.
+    auto partialAggsBson =
+        BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(5 << 4 << 3) << 10)
+                   << BSON_ARRAY(BSON_ARRAY(2 << 1) << 20) << BSON_ARRAY(BSONArray{} << 0));
+    auto partialAggs = makeArrayAccumVal(partialAggsBson, Accumulator::kPush);
+
+    // The accumulator state expected after each partial aggregate has been combined.
+    auto expectedStatesBson =
+        BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(5 << 4 << 3) << 10)
+                   << BSON_ARRAY(BSON_ARRAY(5 << 4 << 3 << 2 << 1) << 30)
+                   << BSON_ARRAY(BSON_ARRAY(5 << 4 << 3 << 2 << 1) << 30));
+    auto expectedStates = makeArrayAccumVal(expectedStatesBson, Accumulator::kPush);
+
+    aggregateAndAssertResults(partialAggs.first,
+                              partialAggs.second,
+                              expectedStates.first,
+                              expectedStates.second,
+                              compiledExpr.get());
+}
+
+// Combining partial $push aggregates must fail once the accumulated size reaches the cap.
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsPushThrowsWhenExceedingSizeLimit) {
+    auto accStatement = makeAccumulationStatement("$push"_sd);
+    auto compiledExpr = compileSingleInputNoCollator(accStatement);
+
+    // Injecting a very large size for the second partial aggregate is expected to make the
+    // accumulator throw; the cap exists to keep it from consuming too much memory.
+    const int64_t largeSize = 1000 * 1000 * 1000;
+
+    auto partialAggsBson = BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(5 << 4) << 3)
+                                      << BSON_ARRAY(BSON_ARRAY(2 << 1) << largeSize));
+    auto partialAggs = makeArrayAccumVal(partialAggsBson, Accumulator::kPush);
+
+    // The second expected state is a placeholder, since combining throws before it is compared.
+    auto expectedStatesBson =
+        BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(5 << 4) << 3) << BSON_ARRAY(BSON_ARRAY("unused") << -1));
+    auto expectedStates = makeArrayAccumVal(expectedStatesBson, Accumulator::kPush);
+
+    ASSERT_THROWS_CODE(aggregateAndAssertResults(partialAggs.first,
+                                                 partialAggs.second,
+                                                 expectedStates.first,
+                                                 expectedStates.second,
+                                                 compiledExpr.get()),
+                       DBException,
+                       ErrorCodes::ExceededMemoryLimit);
+}
+
+// Combining partial $addToSet aggregates unions the sets and recomputes the total size.
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsAddToSet) {
+    auto accStatement = makeAccumulationStatement("$addToSet"_sd);
+    auto compiledExpr = compileSingleInputNoCollator(accStatement);
+
+    // Each entry is a partial aggregate of the form [set of values, size]; the last one is empty.
+    auto partialAggsBson = BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(3 << 4 << 5) << 10)
+                                      << BSON_ARRAY(BSON_ARRAY(1 << 3 << 5 << 8) << 20)
+                                      << BSON_ARRAY(BSONArray{} << 0));
+    auto partialAggs = makeArrayAccumVal(partialAggsBson, Accumulator::kAddToSet);
+
+    // An SBE value takes 8 bytes and its tag 1 byte, so every unique element is expected to be
+    // counted as 9 bytes. The sizes carried by the partial aggregates are ignored and the total
+    // is recalculated, because the size of the set union cannot be predicted in advance.
+    auto expectedStatesBson = BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(3 << 4 << 5) << 27)
+                                         << BSON_ARRAY(BSON_ARRAY(1 << 3 << 4 << 5 << 8) << 45)
+                                         << BSON_ARRAY(BSON_ARRAY(1 << 3 << 4 << 5 << 8) << 45));
+    auto expectedStates = makeArrayAccumVal(expectedStatesBson, Accumulator::kAddToSet);
+
+    aggregateAndAssertResults(partialAggs.first,
+                              partialAggs.second,
+                              expectedStates.first,
+                              expectedStates.second,
+                              compiledExpr.get());
+}
+
+// Combining partial $addToSet aggregates with a collator: the expected states below contain a
+// single entry for strings that differ only by case under the mock to-lower-string collator.
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsAddToSetWithCollation) {
+ auto accStatement = makeAccumulationStatement("$addToSet"_sd);
+
+ // Build the combiner expression directly so that the collator slot can be supplied.
+ auto exprs = stage_builder::buildCombinePartialAggregates(
+ accStatement, {_inputSlotId}, {_collatorSlotId}, _frameIdGenerator);
+ ASSERT_EQ(exprs.size(), 1u);
+ auto expr = std::move(exprs[0]);
+
+ // Install the collator (unowned by the accessor) before the expression is compiled.
+ CollatorInterfaceMock collator{CollatorInterfaceMock::MockType::kToLowerString};
+ _collatorAccessor.reset(false,
+ sbe::value::TypeTags::collator,
+ sbe::value::bitcastFrom<const CollatorInterface*>(&collator));
+
+ auto compiledExpr = compileAggExpression(*expr, &_aggAccessor);
+
+ auto [inputValuesTag, inputValuesVal] =
+ makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY("foo"
+ << "bar")
+ << 10)
+ << BSON_ARRAY(BSON_ARRAY("FOO"
+ << "BAR"
+ << "baz")
+ << 20)),
+ Accumulator::kAddToSet);
+
+ // These strings end up as big strings copied out of the BSON array, so the size accounts for
+ // the value itself, the type tag, the 4-byte size of the string, and the string itself.
+ auto [expectedTag, expectedVal] =
+ makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY("bar"
+ << "foo")
+ << 34)
+ << BSON_ARRAY(BSON_ARRAY("bar"
+ << "baz"
+ << "foo")
+ << 51)),
+ Accumulator::kAddToSet);
+ aggregateAndAssertResults(
+ inputValuesTag, inputValuesVal, expectedTag, expectedVal, compiledExpr.get());
+}
+
+// Combining partial $addToSet aggregates must fail once the recomputed size reaches the cap.
+TEST_F(SbeStageBuilderGroupAggCombinerTest,
+       CombinePartialAggsAddToSetThrowsWhenExceedingSizeLimit) {
+    // Lower the cap for the duration of this test; it must be in place before compilation.
+    RAIIServerParameterControllerForTest queryKnobController("internalQueryMaxAddToSetBytes", 50);
+
+    auto accStatement = makeAccumulationStatement("$addToSet"_sd);
+    auto compiledExpr = compileSingleInputNoCollator(accStatement);
+
+    auto partialAggsBson =
+        BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(1 << 2) << 0)
+                   << BSON_ARRAY(BSON_ARRAY(3 << 4 << 5) << 0) << BSON_ARRAY(BSON_ARRAY(6) << 0));
+    auto partialAggs = makeArrayAccumVal(partialAggsBson, Accumulator::kAddToSet);
+
+    // The last expected state is a placeholder, since combining throws before it is compared.
+    auto expectedStatesBson = BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(1 << 2) << 18)
+                                         << BSON_ARRAY(BSON_ARRAY(1 << 2 << 3 << 4 << 5) << 45)
+                                         << BSON_ARRAY(BSON_ARRAY("unused") << -1));
+    auto expectedStates = makeArrayAccumVal(expectedStatesBson, Accumulator::kAddToSet);
+
+    ASSERT_THROWS_CODE(aggregateAndAssertResults(partialAggs.first,
+                                                 partialAggs.second,
+                                                 expectedStates.first,
+                                                 expectedStates.second,
+                                                 compiledExpr.get()),
+                       DBException,
+                       ErrorCodes::ExceededMemoryLimit);
+}
+
+// Combining partial $mergeObjects aggregates; the expected states show the accumulator unchanged
+// after the null and "MISSING" inputs.
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsMergeObjects) {
+    auto accStatement = makeAccumulationStatement("$mergeObjects"_sd);
+    auto compiledExpr = compileSingleInputNoCollator(accStatement);
+
+    auto partialAggs = BSON_ARRAY(BSONNULL << BSONObj{} << BSON("a" << 1) << BSONNULL << "MISSING"
+                                           << BSON("a" << 2 << "b" << 3 << "c" << 4) << BSONObj{});
+
+    // The accumulator state expected after each partial aggregate has been combined.
+    auto expectedStates =
+        BSON_ARRAY(BSONObj{} << BSONObj{} << BSON("a" << 1) << BSON("a" << 1) << BSON("a" << 1)
+                             << BSON("a" << 2 << "b" << 3 << "c" << 4)
+                             << BSON("a" << 2 << "b" << 3 << "c" << 4));
+
+    aggregateAndAssertResults(partialAggs, expectedStates, compiledExpr.get());
+}
+
} // namespace mongo