diff options
author | David Storch <david.storch@mongodb.com> | 2022-12-08 16:07:11 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-15 23:44:05 +0000 |
commit | 30ddfe887b07bbde3ded94d0d909fc6c2f716001 (patch) | |
tree | d508b3ec2b7139fbc188400d2ae4dc4b65680b16 | |
parent | 88e851c06ecfd3182627ec68c2d03ca4de294cab (diff) | |
download | mongo-30ddfe887b07bbde3ded94d0d909fc6c2f716001.tar.gz |
SERVER-70395 Combining partial aggs support for $addToSet, $push, and $mergeObjects
(cherry picked from commit ed83b1244ec4d89c909e5766893c06de091fabb6)
-rw-r--r-- | src/mongo/db/exec/sbe/expressions/expression.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/values/value.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/values/value.h | 9 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/vm/vm.cpp | 222 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/vm/vm.h | 17 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_accumulator.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp | 236 |
7 files changed, 556 insertions, 16 deletions
diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp index 3a2b20a4657..a145598592c 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.cpp +++ b/src/mongo/db/exec/sbe/expressions/expression.cpp @@ -468,6 +468,8 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = { {"tanh", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::tanh, false}}, {"round", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::round, false}}, {"concat", BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::concat, false}}, + {"aggConcatArraysCapped", + BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggConcatArraysCapped, true}}, {"isMember", BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::isMember, false}}, {"collIsMember", BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::collIsMember, false}}, {"indexOfBytes", @@ -486,6 +488,10 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = { BuiltinFn{[](size_t n) { return n >= 1; }, vm::Builtin::collSetIntersection, false}}, {"collSetDifference", BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::collSetDifference, false}}, + {"aggSetUnionCapped", + BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggSetUnionCapped, true}}, + {"aggCollSetUnionCapped", + BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::aggCollSetUnionCapped, true}}, {"runJsPredicate", BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::runJsPredicate, false}}, {"regexCompile", BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::regexCompile, false}}, diff --git a/src/mongo/db/exec/sbe/values/value.cpp b/src/mongo/db/exec/sbe/values/value.cpp index 6c21f4f6e5d..e0f73c87ddf 100644 --- a/src/mongo/db/exec/sbe/values/value.cpp +++ b/src/mongo/db/exec/sbe/values/value.cpp @@ -860,7 +860,7 @@ bool isInfinity(TypeTags tag, Value val) { (tag == TypeTags::NumberDecimal && bitcastTo<Decimal128>(val).isInfinite()); } -void ArraySet::push_back(TypeTags tag, Value val) { +bool ArraySet::push_back(TypeTags tag, Value val) { if (tag != TypeTags::Nothing) { ValueGuard guard{tag, val}; auto [it, inserted] = _values.insert({tag, val}); @@ -868,7 +868,11 @@ void ArraySet::push_back(TypeTags tag, Value val) { if (inserted) { guard.reset(); } + + return inserted; } + + return false; } std::pair<TypeTags, Value> ArrayEnumerator::getViewOfValue() const { diff --git a/src/mongo/db/exec/sbe/values/value.h b/src/mongo/db/exec/sbe/values/value.h index 26ba3e5e1de..ae207106d8b 100644 --- a/src/mongo/db/exec/sbe/values/value.h +++ b/src/mongo/db/exec/sbe/values/value.h @@ -844,7 +844,14 @@ public: } } - void push_back(TypeTags tag, Value val); + /** + * Adds the given SBE value to the set if an equal value is not already present. Assumes + * ownership of the given value. + * + * Returns true if the value was newly inserted, otherwise returns false to indicate that an + * equal value was already present in the set. + */ + bool push_back(TypeTags tag, Value val); auto& values() const noexcept { return _values; diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp index 7eb8eb7149e..f8b6e394ab4 100644 --- a/src/mongo/db/exec/sbe/vm/vm.cpp +++ b/src/mongo/db/exec/sbe/vm/vm.cpp @@ -3019,6 +3019,97 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinConcat(ArityTyp return {true, strTag, strValue}; } +std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggConcatArraysCapped( + ArityType arity) { + auto [ownArr, tagArr, valArr] = getFromStack(0); + auto [tagNewElem, valNewElem] = moveOwnedFromStack(1); + value::ValueGuard guardNewElem{tagNewElem, valNewElem}; + auto [_, tagSizeCap, valSizeCap] = getFromStack(2); + + tassert(7039508, + "'cap' parameter must be a 32-bit int", + tagSizeCap == value::TypeTags::NumberInt32); + const int32_t sizeCap = value::bitcastTo<int32_t>(valSizeCap); + + // We expect the new value we are adding to the accumulator to be a two-element array where + // the first element is the array to concatenate and the second value is the corresponding size. + tassert(7039512, "expected value of type 'Array'", tagNewElem == value::TypeTags::Array); + auto newArr = value::getArrayView(valNewElem); + tassert(7039527, + "array had unexpected size", + newArr->size() == static_cast<size_t>(AggArrayWithSize::kLast)); + + // Create a new array to hold size and added elements, if is it does not exist yet. + if (tagArr == value::TypeTags::Nothing) { + ownArr = true; + std::tie(tagArr, valArr) = value::makeNewArray(); + auto arr = value::getArrayView(valArr); + + auto [tagAccArr, valAccArr] = value::makeNewArray(); + + // The order is important! The accumulated array should be at index + // AggArrayWithSize::kValues, and the size should be at index + // AggArrayWithSize::kSizeOfValues. + arr->push_back(tagAccArr, valAccArr); + arr->push_back(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(0)); + } else { + // Take ownership of the accumulator. + topStack(false, value::TypeTags::Nothing, 0); + } + + tassert(7039513, "expected array to be owned", ownArr); + value::ValueGuard accumulatorGuard{tagArr, valArr}; + tassert(7039514, "expected accumulator to have type 'Array'", tagArr == value::TypeTags::Array); + auto arr = value::getArrayView(valArr); + tassert(7039515, + "accumulator was array of unexpected size", + arr->size() == static_cast<size_t>(AggArrayWithSize::kLast)); + + // Check that the accumulated size after concatentation won't exceed the limit. + { + auto [tagAccSize, valAccSize] = + arr->getAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues)); + auto [tagNewSize, valNewSize] = + newArr->getAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues)); + tassert(7039516, "expected 64-bit int", tagAccSize == value::TypeTags::NumberInt64); + tassert(7039517, "expected 64-bit int", tagNewSize == value::TypeTags::NumberInt64); + const int64_t currentSize = value::bitcastTo<int64_t>(valAccSize); + const int64_t newSize = value::bitcastTo<int64_t>(valNewSize); + const int64_t totalSize = currentSize + newSize; + + if (totalSize >= static_cast<int64_t>(sizeCap)) { + uasserted(ErrorCodes::ExceededMemoryLimit, + str::stream() << "Used too much memory for a single array. Memory limit: " + << sizeCap << ". Concatentating array of " << arr->size() + << " elements and " << currentSize << " bytes with array of " + << newArr->size() << " elements and " << newSize << " bytes."); + } + + // We are still under the size limit. Set the new total size in the accumulator. + arr->setAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues), + value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(totalSize)); + } + + auto [tagAccArr, valAccArr] = arr->getAt(static_cast<size_t>(AggArrayWithSize::kValues)); + tassert(7039518, "expected value of type 'Array'", tagAccArr == value::TypeTags::Array); + auto accArr = value::getArrayView(valAccArr); + + auto [tagNewArray, valNewArray] = newArr->getAt(static_cast<size_t>(AggArrayWithSize::kValues)); + tassert(7039519, "expected value of type 'Array'", tagNewArray == value::TypeTags::Array); + + for (auto i = value::ArrayEnumerator{tagNewArray, valNewArray}; !i.atEnd(); i.advance()) { + auto [elTag, elVal] = i.getViewOfValue(); + // TODO SERVER-71952: Since 'valNewArray' is owned here, in the future we could avoid this + // copy by moving the element out of the array. + auto [copyTag, copyVal] = value::copyValue(elTag, elVal); + accArr->push_back(copyTag, copyVal); + } + + accumulatorGuard.reset(); + return {ownArr, tagArr, valArr}; +} + std::pair<value::TypeTags, value::Value> ByteCode::genericIsMember(value::TypeTags lhsTag, value::Value lhsVal, value::TypeTags rhsTag, @@ -3396,6 +3487,131 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinSetUnion(ArityT return setUnion(argTags, argVals); } +std::tuple<bool, value::TypeTags, value::Value> ByteCode::aggSetUnionCappedImpl( + value::TypeTags tagNewElem, + value::Value valNewElem, + int32_t sizeCap, + CollatorInterface* collator) { + value::ValueGuard guardNewElem{tagNewElem, valNewElem}; + auto [ownAcc, tagAcc, valAcc] = getFromStack(0); + + // We expect the new value we are adding to the accumulator to be a two-element array where + // the first element is the new set of values and the second value is the corresponding size. + tassert(7039526, "expected value of type 'Array'", tagNewElem == value::TypeTags::Array); + auto newArr = value::getArrayView(valNewElem); + tassert(7039528, + "array had unexpected size", + newArr->size() == static_cast<size_t>(AggArrayWithSize::kLast)); + + // Create a new array is it does not exist yet. + if (tagAcc == value::TypeTags::Nothing) { + ownAcc = true; + std::tie(tagAcc, valAcc) = value::makeNewArray(); + auto accArray = value::getArrayView(valAcc); + + auto [tagAccSet, valAccSet] = value::makeNewArraySet(collator); + + // The order is important! The accumulated array should be at index + // AggArrayWithSize::kValues, and the size should be at index + // AggArrayWithSize::kSizeOfValues. + accArray->push_back(tagAccSet, valAccSet); + accArray->push_back(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(0)); + } else { + // Take ownership of the accumulator. + topStack(false, value::TypeTags::Nothing, 0); + } + + tassert(7039520, "expected accumulator value to be owned", ownAcc); + value::ValueGuard guardArr{tagAcc, valAcc}; + + tassert( + 7039521, "expected accumulator to be of type 'Array'", tagAcc == value::TypeTags::Array); + auto accArray = value::getArrayView(valAcc); + tassert(7039522, + "array had unexpected size", + accArray->size() == static_cast<size_t>(AggArrayWithSize::kLast)); + + auto [tagAccArrSet, valAccArrSet] = + accArray->getAt(static_cast<size_t>(AggArrayWithSize::kValues)); + tassert( + 7039523, "expected value of type 'ArraySet'", tagAccArrSet == value::TypeTags::ArraySet); + auto accArrSet = value::getArraySetView(valAccArrSet); + + // Extract the current size of the accumulator. As we add elements to the set, we will increment + // the current size accordingly and throw an exception if we ever exceed the size limit. We + // cannot simply sum the two sizes, since the two sets could have a substantial intersection. + auto [tagAccSize, valAccSize] = + accArray->getAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues)); + tassert(7039524, "expected 64-bit int", tagAccSize == value::TypeTags::NumberInt64); + int64_t currentSize = value::bitcastTo<int64_t>(valAccSize); + + auto [tagNewValSet, valNewValSet] = + newArr->getAt(static_cast<size_t>(AggArrayWithSize::kValues)); + tassert( + 7039525, "expected value of type 'ArraySet'", tagNewValSet == value::TypeTags::ArraySet); + + for (auto i = value::ArrayEnumerator{tagNewValSet, valNewValSet}; !i.atEnd(); i.advance()) { + auto [elTag, elVal] = i.getViewOfValue(); + int elemSize = value::getApproximateSize(elTag, elVal); + // TODO SERVER-71952: Since 'valNewValSet' is owned here, in the future we could avoid this + // copy by moving the element out of the array. + auto [copyTag, copyVal] = value::copyValue(elTag, elVal); + bool inserted = accArrSet->push_back(copyTag, copyVal); + + if (inserted) { + currentSize += elemSize; + if (currentSize >= static_cast<int64_t>(sizeCap)) { + uasserted(ErrorCodes::ExceededMemoryLimit, + str::stream() << "Used too much memory for a single array. Memory limit: " + << sizeCap << ". Current set has " << accArrSet->size() + << " elements and is " << currentSize << " bytes."); + } + } + } + + // Update the accumulator with the new total size. + accArray->setAt(static_cast<size_t>(AggArrayWithSize::kSizeOfValues), + value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(currentSize)); + + guardArr.reset(); + return {ownAcc, tagAcc, valAcc}; +} + +std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggSetUnionCapped( + ArityType arity) { + auto [tagNewElem, valNewElem] = moveOwnedFromStack(1); + value::ValueGuard guardNewElem{tagNewElem, valNewElem}; + + auto [_, tagSizeCap, valSizeCap] = getFromStack(2); + tassert(7039509, + "'cap' parameter must be a 32-bit int", + tagSizeCap == value::TypeTags::NumberInt32); + const size_t sizeCap = value::bitcastTo<int32_t>(valSizeCap); + + guardNewElem.reset(); + return aggSetUnionCappedImpl(tagNewElem, valNewElem, sizeCap, nullptr /*collator*/); +} + +std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggCollSetUnionCapped( + ArityType arity) { + auto [_1, tagColl, valColl] = getFromStack(1); + auto [tagNewElem, valNewElem] = moveOwnedFromStack(2); + value::ValueGuard guardNewElem{tagNewElem, valNewElem}; + auto [_2, tagSizeCap, valSizeCap] = getFromStack(3); + + tassert(7039510, "expected value of type 'collator'", tagColl == value::TypeTags::collator); + tassert(7039511, + "'cap' parameter must be a 32-bit int", + tagSizeCap == value::TypeTags::NumberInt32); + + guardNewElem.reset(); + return aggSetUnionCappedImpl(tagNewElem, + valNewElem, + value::bitcastTo<int32_t>(valSizeCap), + value::getCollatorView(valColl)); +} + std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinCollSetIntersection( ArityType arity) { invariant(arity >= 1); @@ -4445,6 +4661,12 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti return builtinRound(arity); case Builtin::concat: return builtinConcat(arity); + case Builtin::aggConcatArraysCapped: + return builtinAggConcatArraysCapped(arity); + case Builtin::aggSetUnionCapped: + return builtinAggSetUnionCapped(arity); + case Builtin::aggCollSetUnionCapped: + return builtinAggCollSetUnionCapped(arity); case Builtin::isMember: return builtinIsMember(arity); case Builtin::collIsMember: diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h index 255a1a497c2..fd45b6beaae 100644 --- a/src/mongo/db/exec/sbe/vm/vm.h +++ b/src/mongo/db/exec/sbe/vm/vm.h @@ -527,6 +527,15 @@ enum class Builtin : uint8_t { toLower, coerceToString, concat, + + // Agg function to concatenate arrays, failing when the accumulator reaches a specified size. + aggConcatArraysCapped, + + // Agg functions to compute the set union of two arrays, failing when the accumulator reaches a + // specified size. + aggSetUnionCapped, + aggCollSetUnionCapped, + acos, acosh, asin, @@ -1137,6 +1146,14 @@ private: std::tuple<bool, value::TypeTags, value::Value> builtinTanh(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinRound(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinConcat(ArityType arity); + std::tuple<bool, value::TypeTags, value::Value> builtinAggConcatArraysCapped(ArityType arity); + std::tuple<bool, value::TypeTags, value::Value> builtinAggSetUnionCapped(ArityType arity); + std::tuple<bool, value::TypeTags, value::Value> builtinAggCollSetUnionCapped(ArityType arity); + std::tuple<bool, value::TypeTags, value::Value> aggSetUnionCappedImpl( + value::TypeTags tagNewElem, + value::Value valNewElem, + int32_t sizeCap, + CollatorInterface* collator); std::tuple<bool, value::TypeTags, value::Value> builtinIsMember(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinCollIsMember(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinIndexOfBytes(ArityType arity); diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp index 02ae894da6c..56f39a3758d 100644 --- a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp +++ b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp @@ -364,28 +364,50 @@ std::unique_ptr<sbe::EExpression> buildFinalizeSum(StageBuilderState& state, } } -std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorAddToSet( - const AccumulationExpression& expr, +std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorAddToSetHelper( std::unique_ptr<sbe::EExpression> arg, + StringData funcName, boost::optional<sbe::value::SlotId> collatorSlot, - sbe::value::FrameIdGenerator& frameIdGenerator) { + StringData funcNameWithCollator) { std::vector<std::unique_ptr<sbe::EExpression>> aggs; const int cap = internalQueryMaxAddToSetBytes.load(); if (collatorSlot) { aggs.push_back(makeFunction( - "collAddToSetCapped"_sd, + funcNameWithCollator, sbe::makeE<sbe::EVariable>(*collatorSlot), std::move(arg), makeConstant(sbe::value::TypeTags::NumberInt32, sbe::value::bitcastFrom<int>(cap)))); } else { aggs.push_back(makeFunction( - "addToSetCapped", + funcName, std::move(arg), makeConstant(sbe::value::TypeTags::NumberInt32, sbe::value::bitcastFrom<int>(cap)))); } return aggs; } +std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorAddToSet( + const AccumulationExpression& expr, + std::unique_ptr<sbe::EExpression> arg, + boost::optional<sbe::value::SlotId> collatorSlot, + sbe::value::FrameIdGenerator& frameIdGenerator) { + return buildAccumulatorAddToSetHelper( + std::move(arg), "addToSetCapped"_sd, collatorSlot, "collAddToSetCapped"_sd); +} + +std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsAddToSet( + const AccumulationExpression& expr, + const sbe::value::SlotVector& inputSlots, + boost::optional<sbe::value::SlotId> collatorSlot, + sbe::value::FrameIdGenerator& frameIdGenerator) { + tassert(7039506, + "partial agg combiner for $addToSet should have exactly one input slot", + inputSlots.size() == 1); + auto arg = makeVariable(inputSlots[0]); + return buildAccumulatorAddToSetHelper( + std::move(arg), "aggSetUnionCapped"_sd, collatorSlot, "aggCollSetUnionCapped"_sd); +} + std::unique_ptr<sbe::EExpression> buildFinalizeCappedAccumulator( StageBuilderState& state, const AccumulationExpression& expr, @@ -407,20 +429,37 @@ std::unique_ptr<sbe::EExpression> buildFinalizeCappedAccumulator( return pushFinalize; } -std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorPush( - const AccumulationExpression& expr, - std::unique_ptr<sbe::EExpression> arg, - boost::optional<sbe::value::SlotId> collatorSlot, - sbe::value::FrameIdGenerator& frameIdGenerator) { +std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorPushHelper( + std::unique_ptr<sbe::EExpression> arg, StringData aggFuncName) { const int cap = internalQueryMaxPushBytes.load(); std::vector<std::unique_ptr<sbe::EExpression>> aggs; aggs.push_back(makeFunction( - "addToArrayCapped"_sd, + aggFuncName, std::move(arg), makeConstant(sbe::value::TypeTags::NumberInt32, sbe::value::bitcastFrom<int>(cap)))); return aggs; } +std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorPush( + const AccumulationExpression& expr, + std::unique_ptr<sbe::EExpression> arg, + boost::optional<sbe::value::SlotId> collatorSlot, + sbe::value::FrameIdGenerator& frameIdGenerator) { + return buildAccumulatorPushHelper(std::move(arg), "addToArrayCapped"_sd); +} + +std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsPush( + const AccumulationExpression& expr, + const sbe::value::SlotVector& inputSlots, + boost::optional<sbe::value::SlotId> collatorSlot, + sbe::value::FrameIdGenerator& frameIdGenerator) { + tassert(7039505, + "partial agg combiner for $push should have exactly one input slot", + inputSlots.size() == 1); + auto arg = makeVariable(inputSlots[0]); + return buildAccumulatorPushHelper(std::move(arg), "aggConcatArraysCapped"_sd); +} + std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorStdDev( const AccumulationExpression& expr, std::unique_ptr<sbe::EExpression> arg, @@ -517,6 +556,18 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorMergeObjects( aggs.push_back(std::move(filterExpr)); return aggs; } + +std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsMergeObjects( + const AccumulationExpression& expr, + const sbe::value::SlotVector& inputSlots, + boost::optional<sbe::value::SlotId> collatorSlot, + sbe::value::FrameIdGenerator& frameIdGenerator) { + tassert(7039507, + "partial agg combiner for $mergeObjects should have exactly one input slot", + inputSlots.size() == 1); + auto arg = makeVariable(inputSlots[0]); + return buildAccumulatorMergeObjects(expr, std::move(arg), collatorSlot, frameIdGenerator); +} }; // namespace std::pair<std::unique_ptr<sbe::EExpression>, EvalStage> buildArgument( @@ -579,10 +630,13 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggregates( sbe::value::FrameIdGenerator&)>; static const StringDataMap<BuildAggCombinerFn> kAggCombinerBuilders = { + {AccumulatorAddToSet::kName, &buildCombinePartialAggsAddToSet}, {AccumulatorFirst::kName, &buildCombinePartialAggsFirst}, {AccumulatorLast::kName, &buildCombinePartialAggsLast}, {AccumulatorMax::kName, &buildCombinePartialAggsMax}, + {AccumulatorMergeObjects::kName, &buildCombinePartialAggsMergeObjects}, {AccumulatorMin::kName, &buildCombinePartialAggsMin}, + {AccumulatorPush::kName, &buildCombinePartialAggsPush}, }; auto accExprName = acc.expr.name; diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp index 77d0ae24951..23d90b7ca31 100644 --- a/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp +++ b/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp @@ -27,17 +27,22 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + #include "mongo/platform/basic.h" #include <fmt/printf.h> #include "mongo/db/exec/sbe/expression_test_base.h" +#include "mongo/db/exec/sbe/values/value_printer.h" #include "mongo/db/pipeline/document_source_group.h" #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/query/query_solution.h" #include "mongo/db/query/sbe_stage_builder_accumulator.h" #include "mongo/db/query/sbe_stage_builder_test_fixture.h" +#include "mongo/idl/server_parameter_test_util.h" +#include "mongo/logv2/log.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -1715,13 +1720,30 @@ public: void aggregateAndAssertResults(BSONArray inputs, BSONArray expected, const sbe::vm::CodeFragment* code) { + auto [inputTag, inputVal] = makeArray(inputs); + auto [expectedTag, expectedVal] = makeArray(expected); + return aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, code); + } + + /** + * Verifies that executing the bytecode ('code') for combining partial aggregates for $group + * spilling produces the 'expectedVal' outputs given 'inputsVal'. Assumes ownership of both + * 'expectedVal' and 'inputsVal'. + * + * Identical to the overload above, except the inputs and expected outputs are provided as SBE + * arrays rather than BSON arrays. This is useful if the caller needs to construct input and + * output ways in a special way that cannot be achieved by trivial conversion from BSON. + */ + void aggregateAndAssertResults(sbe::value::TypeTags inputTag, + sbe::value::Value inputVal, + sbe::value::TypeTags expectedTag, + sbe::value::Value expectedVal, + const sbe::vm::CodeFragment* code) { // Make sure we are starting from a clean state. _inputAccessor.reset(); _aggAccessor.reset(); - auto [inputTag, inputVal] = makeArray(inputs); sbe::value::ValueGuard inputGuard{inputTag, inputVal}; - auto [expectedTag, expectedVal] = makeArray(expected); sbe::value::ValueGuard expectedGuard{expectedTag, expectedVal}; sbe::value::ArrayEnumerator inputEnumerator{inputTag, inputVal}; @@ -1756,7 +1778,24 @@ public: auto [compareTag, compareValue] = sbe::value::compareValue( outputTag, outputVal, expectedOutputTag, expectedOutputValue); ASSERT_EQ(compareTag, sbe::value::TypeTags::NumberInt32); - ASSERT_EQ(compareValue, 0); + if (compareValue != 0) { + // The test failed, but dump the actual and expected values to the logs for ease of + // debugging. + str::stream actualBuilder; + auto actualPrinter = makeValuePrinter(actualBuilder); + actualPrinter.writeValueToStream(outputTag, outputVal); + + str::stream expectedBuilder; + auto expectedPrinter = makeValuePrinter(expectedBuilder); + expectedPrinter.writeValueToStream(expectedOutputTag, expectedOutputValue); + + LOGV2(7039529, + "Actual value not equal to expected value", + "actual"_attr = actualBuilder, + "expected"_attr = expectedBuilder); + FAIL("accumulator did not have expected value"); + } + _aggAccessor.reset(true, outputTag, outputVal); inputEnumerator.advance(); @@ -1765,6 +1804,53 @@ public: } /** + * A helper for converting a sequence of accumulator states for $push or $addToSet into the + * corresponding SBE value. + */ + enum class Accumulator { kPush, kAddToSet }; + std::pair<sbe::value::TypeTags, sbe::value::Value> makeArrayAccumVal(BSONArray bsonArray, + Accumulator accumType) { + auto [resultTag, resultVal] = sbe::value::makeNewArray(); + sbe::value::ValueGuard resultGuard{resultTag, resultVal}; + auto resultArr = sbe::value::getArrayView(resultVal); + + for (auto&& elt : bsonArray) { + ASSERT(elt.type() == BSONType::Array); + + BSONObjIterator arrayIt{elt.embeddedObject()}; + ASSERT_TRUE(arrayIt.more()); + auto firstElt = arrayIt.next(); + ASSERT(firstElt.type() == BSONType::Array); + BSONArray partialBsonArr{firstElt.embeddedObject()}; + + ASSERT_TRUE(arrayIt.more()); + auto secondElt = arrayIt.next(); + ASSERT(secondElt.isNumber()); + int64_t size = secondElt.safeNumberLong(); + + ASSERT_FALSE(arrayIt.more()); + + // Each partial aggregate is a two-element array whose first element is the partial + // $push result (itself an array) and whose second element is the size. + auto [partialAggTag, partialAggVal] = sbe::value::makeNewArray(); + auto partialAggArr = sbe::value::getArrayView(partialAggVal); + + auto [pushedValsTag, pushedValsVal] = accumType == Accumulator::kPush + ? makeArray(partialBsonArr) + : makeArraySet(partialBsonArr); + partialAggArr->push_back(pushedValsTag, pushedValsVal); + + partialAggArr->push_back(sbe::value::TypeTags::NumberInt64, + sbe::value::bitcastFrom<int64_t>(size)); + + resultArr->push_back(partialAggTag, partialAggVal); + } + + resultGuard.reset(); + return {resultTag, resultVal}; + } + + /** * Convenience method for producing bytecode which combines partial aggregates for the given * 'AccumulationStatement'. * @@ -1799,6 +1885,12 @@ protected: sbe::value::SlotId _collatorSlotId; private: + template <typename Stream> + sbe::value::ValuePrinter<Stream> makeValuePrinter(Stream& stream) { + return sbe::value::ValuePrinters::make(stream, + sbe::PrintOptions().useTagForAmbiguousValues(true)); + } + BSONObj _accumulationStmtBson; std::unique_ptr<sbe::EExpression> _expr; }; @@ -1904,4 +1996,142 @@ TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsLast) { aggregateAndAssertResults(inputValues, expectedAggStates, compiledExpr.get()); } +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsPush) { + auto accStatement = makeAccumulationStatement("$push"_sd); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + auto [inputValuesTag, inputValuesVal] = makeArrayAccumVal( + BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(5 << 4 << 3) << 10) + << BSON_ARRAY(BSON_ARRAY(2 << 1) << 20) << BSON_ARRAY(BSONArray{} << 0)), + Accumulator::kPush); + auto [expectedTag, expectedVal] = + makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(5 << 4 << 3) << 10) + << BSON_ARRAY(BSON_ARRAY(5 << 4 << 3 << 2 << 1) << 30) + << BSON_ARRAY(BSON_ARRAY(5 << 4 << 3 << 2 << 1) << 30)), + Accumulator::kPush); + aggregateAndAssertResults( + inputValuesTag, inputValuesVal, expectedTag, expectedVal, compiledExpr.get()); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsPushThrowsWhenExceedingSizeLimit) { + auto accStatement = makeAccumulationStatement("$push"_sd); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + // If we inject a very large size, we expect the accumulator to throw. This cap prevents the + // accumulator from consuming too much memory. + const int64_t largeSize = 1000 * 1000 * 1000; + + auto input = makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(5 << 4) << 3) + << BSON_ARRAY(BSON_ARRAY(2 << 1) << largeSize)), + Accumulator::kPush); + auto expected = makeArrayAccumVal( + BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(5 << 4) << 3) << BSON_ARRAY(BSON_ARRAY("unused") << -1)), + Accumulator::kPush); + ASSERT_THROWS_CODE( + aggregateAndAssertResults( + input.first, input.second, expected.first, expected.second, compiledExpr.get()), + DBException, + ErrorCodes::ExceededMemoryLimit); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsAddToSet) { + auto accStatement = makeAccumulationStatement("$addToSet"_sd); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + auto [inputValuesTag, inputValuesVal] = + makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(3 << 4 << 5) << 10) + << BSON_ARRAY(BSON_ARRAY(1 << 3 << 5 << 8) << 20) + << BSON_ARRAY(BSONArray{} << 0)), + Accumulator::kAddToSet); + + // Each SBE value is 8 bytes and its tag is 1 byte. So we expect each unique element's size to + // be calculated as 9 bytes. The sizes from the partial aggregates end up getting ignored, and + // the total size is recalculated, since we cannot predict the size of the set union in advance. + auto [expectedTag, expectedVal] = + makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(3 << 4 << 5) << 27) + << BSON_ARRAY(BSON_ARRAY(1 << 3 << 4 << 5 << 8) << 45) + << BSON_ARRAY(BSON_ARRAY(1 << 3 << 4 << 5 << 8) << 45)), + Accumulator::kAddToSet); + aggregateAndAssertResults( + inputValuesTag, inputValuesVal, expectedTag, expectedVal, compiledExpr.get()); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsAddToSetWithCollation) { + auto accStatement = makeAccumulationStatement("$addToSet"_sd); + + auto exprs = stage_builder::buildCombinePartialAggregates( + accStatement, {_inputSlotId}, {_collatorSlotId}, _frameIdGenerator); + ASSERT_EQ(exprs.size(), 1u); + auto expr = std::move(exprs[0]); + + CollatorInterfaceMock collator{CollatorInterfaceMock::MockType::kToLowerString}; + _collatorAccessor.reset(false, + sbe::value::TypeTags::collator, + sbe::value::bitcastFrom<const CollatorInterface*>(&collator)); + + auto compiledExpr = compileAggExpression(*expr, &_aggAccessor); + + auto [inputValuesTag, inputValuesVal] = + makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY("foo" + << "bar") + << 10) + << BSON_ARRAY(BSON_ARRAY("FOO" + << "BAR" + << "baz") + << 20)), + Accumulator::kAddToSet); + + // These strings end up as big strings copied out of the BSON array, so the size accounts for + // the value itself, the type tag, the 4-byte size of the string, and the string itself. + auto [expectedTag, expectedVal] = + makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY("bar" + << "foo") + << 34) + << BSON_ARRAY(BSON_ARRAY("bar" + << "baz" + << "foo") + << 51)), + Accumulator::kAddToSet); + aggregateAndAssertResults( + inputValuesTag, inputValuesVal, expectedTag, expectedVal, compiledExpr.get()); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, + CombinePartialAggsAddToSetThrowsWhenExceedingSizeLimit) { + RAIIServerParameterControllerForTest queryKnobController("internalQueryMaxAddToSetBytes", 50); + + auto accStatement = makeAccumulationStatement("$addToSet"_sd); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + auto input = makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(1 << 2) << 0) + << BSON_ARRAY(BSON_ARRAY(3 << 4 << 5) << 0) + << BSON_ARRAY(BSON_ARRAY(6) << 0)), + Accumulator::kAddToSet); + + auto expected = + makeArrayAccumVal(BSON_ARRAY(BSON_ARRAY(BSON_ARRAY(1 << 2) << 18) + << BSON_ARRAY(BSON_ARRAY(1 << 2 << 3 << 4 << 5) << 45) + << BSON_ARRAY(BSON_ARRAY("unused") << -1)), + Accumulator::kAddToSet); + + ASSERT_THROWS_CODE( + aggregateAndAssertResults( + input.first, input.second, expected.first, expected.second, compiledExpr.get()), + DBException, + ErrorCodes::ExceededMemoryLimit); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsMergeObjects) { + auto accStatement = makeAccumulationStatement("$mergeObjects"_sd); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + auto inputValues = BSON_ARRAY(BSONNULL << BSONObj{} << BSON("a" << 1) << BSONNULL << "MISSING" + << BSON("a" << 2 << "b" << 3 << "c" << 4) << BSONObj{}); + auto expectedAggStates = + BSON_ARRAY(BSONObj{} << BSONObj{} << BSON("a" << 1) << BSON("a" << 1) << BSON("a" << 1) + << BSON("a" << 2 << "b" << 3 << "c" << 4) + << BSON("a" << 2 << "b" << 3 << "c" << 4)); + aggregateAndAssertResults(inputValues, expectedAggStates, compiledExpr.get()); +} + } // namespace mongo |