diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/exec/sbe/expressions/expression.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/vm/arith.cpp | 112 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/vm/vm.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/vm/vm.h | 45 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_accumulator.cpp | 47 | ||||
-rw-r--r-- | src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp | 300 |
6 files changed, 525 insertions, 38 deletions
diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp index a145598592c..96d5d175fa9 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.cpp +++ b/src/mongo/db/exec/sbe/expressions/expression.cpp @@ -429,6 +429,8 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = { BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoubleSum, false}}, {"aggDoubleDoubleSum", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggDoubleDoubleSum, true}}, + {"aggMergeDoubleDoubleSums", + BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggMergeDoubleDoubleSums, true}}, {"doubleDoubleSumFinalize", BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoubleSumFinalize, false}}, {"doubleDoubleMergeSumFinalize", @@ -436,6 +438,8 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = { {"doubleDoublePartialSumFinalize", BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoublePartialSumFinalize, false}}, {"aggStdDev", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggStdDev, true}}, + {"aggMergeStdDevs", + BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggMergeStdDevs, true}}, {"stdDevPopFinalize", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::stdDevPopFinalize, false}}, {"stdDevSampFinalize", diff --git a/src/mongo/db/exec/sbe/vm/arith.cpp b/src/mongo/db/exec/sbe/vm/arith.cpp index e4c67775ad8..1d41ff59ce1 100644 --- a/src/mongo/db/exec/sbe/vm/arith.cpp +++ b/src/mongo/db/exec/sbe/vm/arith.cpp @@ -503,6 +503,57 @@ void ByteCode::aggDoubleDoubleSumImpl(value::Array* arr, } } +void ByteCode::aggMergeDoubleDoubleSumsImpl(value::Array* accumulator, + value::TypeTags rhsTag, + value::Value rhsValue) { + auto [accumWidestType, _1] = accumulator->getAt(AggSumValueElems::kNonDecimalTotalTag); + + tassert(7039532, "value must be of type 'Array'", rhsTag == value::TypeTags::Array); + auto nextDoubleDoubleArr = value::getArrayView(rhsValue); + + tassert(7039533, + "array does not have enough elements", + nextDoubleDoubleArr->size() >= AggSumValueElems::kMaxSizeOfArray - 1); + + // First aggregate the non-decimal sum, then the non-decimal addend. Both should be doubles. + auto [sumTag, sum] = nextDoubleDoubleArr->getAt(AggSumValueElems::kNonDecimalTotalSum); + tassert(7039534, "expected 'NumberDouble'", sumTag == value::TypeTags::NumberDouble); + aggDoubleDoubleSumImpl(accumulator, sumTag, sum); + + auto [addendTag, addend] = nextDoubleDoubleArr->getAt(AggSumValueElems::kNonDecimalTotalAddend); + tassert(7039535, "expected 'NumberDouble'", addendTag == value::TypeTags::NumberDouble); + // There is a special case when the 'sum' is infinite and the 'addend' is NaN. This DoubleDouble + // value represents infinity, not NaN. Therefore, we avoid incorporating the NaN 'addend' value + // into the sum. + if (std::isfinite(value::bitcastTo<double>(sum)) || + !std::isnan(value::bitcastTo<double>(addend))) { + aggDoubleDoubleSumImpl(accumulator, addendTag, addend); + } + + // Determine the widest non-decimal type that we've seen so far, and set the accumulator state + // accordingly. We do this after computing the sums, since 'aggDoubleDoubleSumImpl()' will + // set the widest type to 'NumberDouble' when we call it above. + auto [newValWidestType, _2] = nextDoubleDoubleArr->getAt(AggSumValueElems::kNonDecimalTotalTag); + tassert( + 7039536, "unexpected 'NumberDecimal'", newValWidestType != value::TypeTags::NumberDecimal); + tassert( + 7039537, "unexpected 'NumberDecimal'", accumWidestType != value::TypeTags::NumberDecimal); + auto widestType = getWidestNumericalType(newValWidestType, accumWidestType); + accumulator->setAt( + AggSumValueElems::kNonDecimalTotalTag, widestType, value::bitcastFrom<int32_t>(0)); + + // If there's a decimal128 sum as part of the incoming DoubleDouble sum, incorporate it into the + // accumulator. + if (nextDoubleDoubleArr->size() == AggSumValueElems::kMaxSizeOfArray) { + auto [decimalTotalTag, decimalTotalVal] = + nextDoubleDoubleArr->getAt(AggSumValueElems::kDecimalTotal); + tassert(7039538, + "The decimalTotal must be 'NumberDecimal'", + decimalTotalTag == TypeTags::NumberDecimal); + aggDoubleDoubleSumImpl(accumulator, decimalTotalTag, decimalTotalVal); + } +} + void ByteCode::aggStdDevImpl(value::Array* arr, value::TypeTags rhsTag, value::Value rhsValue) { if (!isNumber(rhsTag)) { return; @@ -551,6 +602,67 @@ void ByteCode::aggStdDevImpl(value::Array* arr, value::TypeTags rhsTag, value::V return setStdDevArray(newCountVal, newMeanVal, newM2Val, arr); } +void ByteCode::aggMergeStdDevsImpl(value::Array* accumulator, + value::TypeTags rhsTag, + value::Value rhsValue) { + tassert(7039542, "expected value of type 'Array'", rhsTag == value::TypeTags::Array); + auto nextArr = value::getArrayView(rhsValue); + + tassert(7039543, + "expected array to have exactly 3 elements", + accumulator->size() == AggStdDevValueElems::kSizeOfArray); + tassert(7039544, + "expected array to have exactly 3 elements", + nextArr->size() == AggStdDevValueElems::kSizeOfArray); + + auto [newCountTag, newCountVal] = nextArr->getAt(AggStdDevValueElems::kCount); + tassert(7039545, "expected 64-bit int", newCountTag == value::TypeTags::NumberInt64); + int64_t newCount = value::bitcastTo<int64_t>(newCountVal); + + // If the incoming partial aggregate has a count of zero, then it represents the partial + // standard deviation of no data points. This means that it can be safely ignored, and we return + // the accumulator as is. + if (newCount == 0) { + return; + } + + auto [oldCountTag, oldCountVal] = accumulator->getAt(AggStdDevValueElems::kCount); + tassert(7039546, "expected 64-bit int", oldCountTag == value::TypeTags::NumberInt64); + int64_t oldCount = value::bitcastTo<int64_t>(oldCountVal); + + auto [oldMeanTag, oldMeanVal] = accumulator->getAt(AggStdDevValueElems::kRunningMean); + tassert(7039547, "expected double", oldMeanTag == value::TypeTags::NumberDouble); + double oldMean = value::bitcastTo<double>(oldMeanVal); + + auto [newMeanTag, newMeanVal] = nextArr->getAt(AggStdDevValueElems::kRunningMean); + tassert(7039548, "expected double", newMeanTag == value::TypeTags::NumberDouble); + double newMean = value::bitcastTo<double>(newMeanVal); + + auto [oldM2Tag, oldM2Val] = accumulator->getAt(AggStdDevValueElems::kRunningM2); + tassert(7039531, "expected double", oldM2Tag == value::TypeTags::NumberDouble); + double oldM2 = value::bitcastTo<double>(oldM2Val); + + auto [newM2Tag, newM2Val] = nextArr->getAt(AggStdDevValueElems::kRunningM2); + tassert(7039541, "expected double", newM2Tag == value::TypeTags::NumberDouble); + double newM2 = value::bitcastTo<double>(newM2Val); + + const double delta = newMean - oldMean; + // We've already handled the case where 'newCount' is zero above. This means that 'totalCount' + // must be positive, and prevents us from ever dividing by zero in the subsequent calculation. + int64_t totalCount = oldCount + newCount; + if (delta != 0) { + newMean = ((oldCount * oldMean) + (newCount * newMean)) / totalCount; + newM2 += delta * delta * + (static_cast<double>(oldCount) * static_cast<double>(newCount) / totalCount); + } + newM2 += oldM2; + + setStdDevArray(value::bitcastFrom<int64_t>(totalCount), + value::bitcastFrom<double>(newMean), + value::bitcastFrom<double>(newM2), + accumulator); +} + std::tuple<bool, value::TypeTags, value::Value> ByteCode::aggStdDevFinalizeImpl( value::Value fieldValue, bool isSamp) { auto arr = value::getArrayView(fieldValue); diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp index f8b6e394ab4..0b2d83f9f9f 100644 --- a/src/mongo/db/exec/sbe/vm/vm.cpp +++ b/src/mongo/db/exec/sbe/vm/vm.cpp @@ -1072,35 +1072,40 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::aggSum(value::TypeTags return genericAdd(accTag, accValue, fieldTag, fieldValue); } +template <bool merging> std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggDoubleDoubleSum( ArityType arity) { - auto [_, fieldTag, fieldValue] = getFromStack(1); // Move the incoming accumulator state from the stack. Given that we are now the owner of the // state we are free to do any in-place update as we see fit. auto [accTag, accValue] = moveOwnedFromStack(0); - value::ValueGuard guard{accTag, accValue}; // Initialize the accumulator. if (accTag == value::TypeTags::Nothing) { std::tie(accTag, accValue) = value::makeNewArray(); - value::ValueGuard guard{accTag, accValue}; + value::ValueGuard newArrGuard{accTag, accValue}; auto arr = value::getArrayView(accValue); arr->reserve(AggSumValueElems::kMaxSizeOfArray); - // The order of the following three elements should match to 'AggSumValueElems'. + // The order of the following three elements should match to 'AggSumValueElems'. An absent + // 'kDecimalTotal' element means that we've not seen any decimal value. So, we're not adding + // 'kDecimalTotal' element yet. arr->push_back(value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(0)); arr->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0)); arr->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0)); - // The absent 'kDecimalTotal' element means that we've not seen any decimal value. So, we're - // not adding 'kDecimalTotal' element yet. - aggDoubleDoubleSumImpl(arr, fieldTag, fieldValue); - guard.reset(); - return {true, accTag, accValue}; + newArrGuard.reset(); } + + value::ValueGuard guard{accTag, accValue}; tassert(5755317, "The result slot must be Array-typed", accTag == value::TypeTags::Array); + auto accumulator = value::getArrayView(accValue); + + if constexpr (merging) { + aggMergeDoubleDoubleSumsImpl(accumulator, fieldTag, fieldValue); + } else { + aggDoubleDoubleSumImpl(accumulator, fieldTag, fieldValue); + } - aggDoubleDoubleSumImpl(value::getArrayView(accValue), fieldTag, fieldValue); guard.reset(); return {true, accTag, accValue}; } @@ -1235,31 +1240,37 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinDoubleDoublePar return {true, tag, val}; } +template <bool merging> std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggStdDev(ArityType arity) { auto [_, fieldTag, fieldValue] = getFromStack(1); // Move the incoming accumulator state from the stack. Given that we are now the owner of the // state we are free to do any in-place update as we see fit. auto [accTag, accValue] = moveOwnedFromStack(0); - value::ValueGuard guard{accTag, accValue}; // Initialize the accumulator. if (accTag == value::TypeTags::Nothing) { - auto [newAccTag, newAccValue] = value::makeNewArray(); - value::ValueGuard newGuard{newAccTag, newAccValue}; - auto arr = value::getArrayView(newAccValue); + std::tie(accTag, accValue) = value::makeNewArray(); + value::ValueGuard newArrGuard{accTag, accValue}; + auto arr = value::getArrayView(accValue); arr->reserve(AggStdDevValueElems::kSizeOfArray); // The order of the following three elements should match to 'AggStdDevValueElems'. arr->push_back(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(0)); arr->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0)); arr->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0)); - aggStdDevImpl(arr, fieldTag, fieldValue); - newGuard.reset(); - return {true, newAccTag, newAccValue}; + newArrGuard.reset(); } + + value::ValueGuard guard{accTag, accValue}; tassert(5755210, "The result slot must be Array-typed", accTag == value::TypeTags::Array); + auto accumulator = value::getArrayView(accValue); + + if constexpr (merging) { + aggMergeStdDevsImpl(accumulator, fieldTag, fieldValue); + } else { + aggStdDevImpl(accumulator, fieldTag, fieldValue); + } - aggStdDevImpl(value::getArrayView(accValue), fieldTag, fieldValue); guard.reset(); return {true, accTag, accValue}; } @@ -4598,7 +4609,7 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti case Builtin::doubleDoubleSum: return builtinDoubleDoubleSum(arity); case Builtin::aggDoubleDoubleSum: - return builtinAggDoubleDoubleSum(arity); + return builtinAggDoubleDoubleSum<false /*merging*/>(arity); case Builtin::doubleDoubleSumFinalize: return builtinDoubleDoubleSumFinalize<>(arity); case Builtin::doubleDoubleMergeSumFinalize: @@ -4607,8 +4618,12 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti return builtinDoubleDoubleSumFinalize<true /*keepIntegerPrecision*/>(arity); case Builtin::doubleDoublePartialSumFinalize: return builtinDoubleDoublePartialSumFinalize(arity); + case Builtin::aggMergeDoubleDoubleSums: + return builtinAggDoubleDoubleSum<true /*merging*/>(arity); case Builtin::aggStdDev: - return builtinAggStdDev(arity); + return builtinAggStdDev<false /*merging*/>(arity); + case Builtin::aggMergeStdDevs: + return builtinAggStdDev<true /*merging*/>(arity); case Builtin::stdDevPopFinalize: return builtinStdDevPopFinalize(arity); case Builtin::stdDevSampFinalize: diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h index fd45b6beaae..42c1d27015b 100644 --- a/src/mongo/db/exec/sbe/vm/vm.h +++ b/src/mongo/db/exec/sbe/vm/vm.h @@ -511,12 +511,33 @@ enum class Builtin : uint8_t { collAddToSet, // agg function to append to a set (with collation) collAddToSetCapped, // agg function to append to a set (with collation), fails when the set // reaches specified size - doubleDoubleSum, // special double summation + + // Special double summation. + doubleDoubleSum, + // A variant of the standard sum aggregate function which maintains a DoubleDouble as the + // accumulator's underlying state. aggDoubleDoubleSum, + // Converts a DoubleDouble sum into a single numeric scalar for use once the summation is + // complete. doubleDoubleSumFinalize, + // A form of doubleDoubleSum finalization only necessary for sharding support when the cluster + // is not yet fully upgraded to FCV 6.0. doubleDoubleMergeSumFinalize, + // Converts a partial sum into a format suitable for serialization over the wire to the merging + // node. The merging node expects the internal state of the DoubleDouble summation to be + // serialized in a particular format. doubleDoublePartialSumFinalize, + // An agg function which can be used to sum a sequence of DoubleDouble inputs, producing the + // resulting total as a DoubleDouble. + aggMergeDoubleDoubleSums, + + // Implements Welford's online algorithm for computing sample or population standard deviation + // in a single pass. aggStdDev, + // Combines standard deviations that have been partially computed on a subset of the data + // using Welford's online algorithm. + aggMergeStdDevs, + stdDevPopFinalize, stdDevSampFinalize, bitTestZero, // test bitwise mask & value is zero @@ -989,11 +1010,19 @@ private: value::TypeTags fieldTag, value::Value fieldValue); - void aggDoubleDoubleSumImpl(value::Array* arr, value::TypeTags rhsTag, value::Value rhsValue); + void aggDoubleDoubleSumImpl(value::Array* accumulator, + value::TypeTags rhsTag, + value::Value rhsValue); + void aggMergeDoubleDoubleSumsImpl(value::Array* accumulator, + value::TypeTags rhsTag, + value::Value rhsValue); // This is an implementation of the following algorithm: // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm - void aggStdDevImpl(value::Array* arr, value::TypeTags rhsTag, value::Value rhsValue); + void aggStdDevImpl(value::Array* accumulator, value::TypeTags rhsTag, value::Value rhsValue); + void aggMergeStdDevsImpl(value::Array* accumulator, + value::TypeTags rhsTag, + value::Value rhsValue); std::tuple<bool, value::TypeTags, value::Value> aggStdDevFinalizeImpl(value::Value fieldValue, bool isSamp); @@ -1112,14 +1141,24 @@ private: CollatorInterface* collator); std::tuple<bool, value::TypeTags, value::Value> builtinAddToSetCapped(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinCollAddToSetCapped(ArityType arity); + std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoubleSum(ArityType arity); + // The template parameter is false for a regular DoubleDouble summation and true if merging + // partially computed DoubleDouble sums. + template <bool merging> std::tuple<bool, value::TypeTags, value::Value> builtinAggDoubleDoubleSum(ArityType arity); + // This is only for compatibility with mongos/sharding and we will revisit this later. template <bool keepIntegerPrecision = false> std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoubleSumFinalize(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoublePartialSumFinalize( ArityType arity); + + // The template parameter is false for a regular std dev and true if merging partially computed + // standard devations. + template <bool merging> std::tuple<bool, value::TypeTags, value::Value> builtinAggStdDev(ArityType arity); + std::tuple<bool, value::TypeTags, value::Value> builtinStdDevPopFinalize(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinStdDevSampFinalize(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinBitTestZero(ArityType arity); diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp index 56f39a3758d..0cf745c15a1 100644 --- a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp +++ b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp @@ -205,6 +205,21 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorAvg( return aggs; } +std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsAvg( + const AccumulationExpression& expr, + const sbe::value::SlotVector& inputSlots, + boost::optional<sbe::value::SlotId> collatorSlot, + sbe::value::FrameIdGenerator& frameIdGenerator) { + tassert(7039539, + "partial agg combiner for $avg should have exactly two input slots", + inputSlots.size() == 2); + + std::vector<std::unique_ptr<sbe::EExpression>> aggs; + aggs.push_back(makeFunction("aggMergeDoubleDoubleSums", makeVariable(inputSlots[0]))); + aggs.push_back(makeFunction("sum", makeVariable(inputSlots[1]))); + return aggs; +} + std::unique_ptr<sbe::EExpression> buildFinalizeAvg(StageBuilderState& state, const AccumulationExpression& expr, const sbe::value::SlotVector& aggSlots) { @@ -289,6 +304,20 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorSum( return aggs; } +std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsSum( + const AccumulationExpression& expr, + const sbe::value::SlotVector& inputSlots, + boost::optional<sbe::value::SlotId> collatorSlot, + sbe::value::FrameIdGenerator& frameIdGenerator) { + tassert(7039530, + "partial agg combiner for $sum should have exactly one input slot", + inputSlots.size() == 1); + auto arg = makeVariable(inputSlots[0]); + std::vector<std::unique_ptr<sbe::EExpression>> aggs; + aggs.push_back(makeFunction("aggMergeDoubleDoubleSums", std::move(arg))); + return aggs; +} + std::unique_ptr<sbe::EExpression> buildFinalizeSum(StageBuilderState& state, const AccumulationExpression& expr, const sbe::value::SlotVector& sumSlots) { @@ -470,6 +499,20 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorStdDev( return aggs; } +std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsStdDev( + const AccumulationExpression& expr, + const sbe::value::SlotVector& inputSlots, + boost::optional<sbe::value::SlotId> collatorSlot, + sbe::value::FrameIdGenerator& frameIdGenerator) { + tassert(7039540, + "partial agg combiner for stddev should have exactly one input slot", + inputSlots.size() == 1); + auto arg = makeVariable(inputSlots[0]); + std::vector<std::unique_ptr<sbe::EExpression>> aggs; + aggs.push_back(makeFunction("aggMergeStdDevs", std::move(arg))); + return aggs; +} + std::unique_ptr<sbe::EExpression> buildFinalizePartialStdDev(sbe::value::SlotId stdDevSlot) { // To support the sharding behavior, the mongos splits $group into two separate $group // stages one at the mongos-side and the other at the shard-side. This stage builder builds @@ -631,12 +674,16 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggregates( static const StringDataMap<BuildAggCombinerFn> kAggCombinerBuilders = { {AccumulatorAddToSet::kName, &buildCombinePartialAggsAddToSet}, + {AccumulatorAvg::kName, &buildCombinePartialAggsAvg}, {AccumulatorFirst::kName, &buildCombinePartialAggsFirst}, {AccumulatorLast::kName, &buildCombinePartialAggsLast}, {AccumulatorMax::kName, &buildCombinePartialAggsMax}, {AccumulatorMergeObjects::kName, &buildCombinePartialAggsMergeObjects}, {AccumulatorMin::kName, &buildCombinePartialAggsMin}, {AccumulatorPush::kName, &buildCombinePartialAggsPush}, + {AccumulatorStdDevPop::kName, &buildCombinePartialAggsStdDev}, + {AccumulatorStdDevSamp::kName, &buildCombinePartialAggsStdDev}, + {AccumulatorSum::kName, &buildCombinePartialAggsSum}, }; auto accExprName = acc.expr.name; diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp index 23d90b7ca31..732b04f62a8 100644 --- a/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp +++ b/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp @@ -1698,13 +1698,34 @@ public: _collatorSlotId{bindAccessor(&_collatorAccessor)} {} AccumulationStatement makeAccumulationStatement(StringData accumName) { - _accumulationStmtBson = BSON("unused" << BSON(accumName << "unused")); + return makeAccumulationStatement(BSON("unused" << BSON(accumName << "unused"))); + } + + AccumulationStatement makeAccumulationStatement(BSONObj accumulationStmt) { + _accumulationStmtBson = std::move(accumulationStmt); VariablesParseState vps = _expCtx->variablesParseState; return AccumulationStatement::parseAccumulationStatement( _expCtx.get(), _accumulationStmtBson.firstElement(), vps); } /** + * Convenience method for producing bytecode which combines partial aggregates for the given + * 'AccumulationStatement'. + * + * Requires that accumulation statement results in a single aggregate with one input and one + * output. Furthermore, cannot be used when the test case involves a non-simple collation. + */ + std::unique_ptr<sbe::vm::CodeFragment> compileSingleInputNoCollator( + const AccumulationStatement& accStatement) { + auto exprs = stage_builder::buildCombinePartialAggregates( + accStatement, {_inputSlotId}, boost::none, _frameIdGenerator); + ASSERT_EQ(exprs.size(), 1u); + _expr = std::move(exprs[0]); + + return compileAggExpression(*_expr, &_aggAccessor); + } + + /** * Verifies that executing the bytecode ('code') for combining partial aggregates for $group * spilling produces the 'expected' outputs given 'inputs'. * @@ -1751,6 +1772,7 @@ public: // Aggregate the inputs one-by-one, and at each step validate that the resulting accumulator // state is as expected. + int index = 0; while (!inputEnumerator.atEnd()) { ASSERT_FALSE(expectedEnumerator.atEnd()); auto [nextInputTag, nextInputVal] = inputEnumerator.getViewOfValue(); @@ -1777,8 +1799,7 @@ public: } auto [compareTag, compareValue] = sbe::value::compareValue( outputTag, outputVal, expectedOutputTag, expectedOutputValue); - ASSERT_EQ(compareTag, sbe::value::TypeTags::NumberInt32); - if (compareValue != 0) { + if (compareTag != sbe::value::TypeTags::NumberInt32 || compareValue != 0) { // The test failed, but dump the actual and expected values to the logs for ease of // debugging. str::stream actualBuilder; @@ -1792,7 +1813,8 @@ public: LOGV2(7039529, "Actual value not equal to expected value", "actual"_attr = actualBuilder, - "expected"_attr = expectedBuilder); + "expected"_attr = expectedBuilder, + "index"_attr = index); FAIL("accumulator did not have expected value"); } @@ -1800,6 +1822,7 @@ public: inputEnumerator.advance(); expectedEnumerator.advance(); + ++index; } } @@ -1851,20 +1874,70 @@ public: } /** - * Convenience method for producing bytecode which combines partial aggregates for the given - * 'AccumulationStatement'. + * Given the name of an SBE agg function ('aggFuncName') and an array of values expressed as a + * BSON array, aggregates the values inside the array and returns the resulting SBE value. + */ + std::pair<sbe::value::TypeTags, sbe::value::Value> makeOnePartialAggregate( + StringData aggFuncName, BSONArray valuesToAgg) { + // Make sure we are starting from a clean state. + _inputAccessor.reset(); + _aggAccessor.reset(); + + // Construct an expression which calls the given agg function, aggregating the values in + // '_inputSlotId'. + auto expr = + stage_builder::makeFunction(aggFuncName, stage_builder::makeVariable(_inputSlotId)); + auto code = compileAggExpression(*expr, &_aggAccessor); + + // Find the first element by skipping the length. + const char* bsonElt = valuesToAgg.objdata() + 4; + const char* bsonEnd = bsonElt + valuesToAgg.objsize(); + while (*bsonElt != 0) { + auto fieldName = sbe::bson::fieldNameView(bsonElt); + + // Convert the BSON value to an SBE value and put it inside the input slot. + auto [tag, val] = sbe::bson::convertFrom<false>(bsonElt, bsonEnd, fieldName.size()); + _inputAccessor.reset(true, tag, val); + + // Run the agg function, and put the result in the slot holding the aggregate value. + auto [outputTag, outputVal] = runCompiledExpression(code.get()); + _aggAccessor.reset(true, outputTag, outputVal); + + bsonElt = sbe::bson::advance(bsonElt, fieldName.size()); + } + + return _aggAccessor.copyOrMoveValue(); + } + + /** + * Returns an SBE array which contains a sequence of partial aggregate values. Useful for + * constructing a sequence of partial aggregates when those partial aggregates are not trivial + * to describe using BSON. The input to this function is a BSON array of BSON arrays; each of + * the inner arrays is aggregated using the given 'aggFuncName' in order to produce the output + * SBE array. * - * Requires that accumulation statement results in a single aggregate with one input and one - * output. Furthermore, cannot be used when the test case involves a non-simple collation. + * As an example, suppose the agg function is a simple sum. Given the input + * + * [[8, 1, 5], [6], [2,3]] + * + * the output will be the SBE array [14, 6, 5]. */ - std::unique_ptr<sbe::vm::CodeFragment> compileSingleInputNoCollator( - const AccumulationStatement& accStatement) { - auto exprs = stage_builder::buildCombinePartialAggregates( - accStatement, {_inputSlotId}, boost::none, _frameIdGenerator); - ASSERT_EQ(exprs.size(), 1u); - _expr = std::move(exprs[0]); + std::pair<sbe::value::TypeTags, sbe::value::Value> makePartialAggArray( + StringData aggFuncName, BSONArray arrayOfArrays) { + auto [arrTag, arrVal] = sbe::value::makeNewArray(); + sbe::value::ValueGuard guard{arrTag, arrVal}; + + auto arr = sbe::value::getArrayView(arrVal); + + for (auto&& element : arrayOfArrays) { + ASSERT(element.type() == BSONType::Array); + auto [tag, val] = + makeOnePartialAggregate(aggFuncName, BSONArray{element.embeddedObject()}); + arr->push_back(tag, val); + } - return compileAggExpression(*_expr, &_aggAccessor); + guard.reset(); + return {arrTag, arrVal}; } protected: @@ -2134,4 +2207,201 @@ TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsMergeObjects) { aggregateAndAssertResults(inputValues, expectedAggStates, compiledExpr.get()); } +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsSimpleCount) { + // $sum:1 is a simple count of the incoming documents. SERVER-65465 changed this scenario to use + // a simple summation rather than the DoubleDouble summation algorithm in more recent branches, + // but the 6.0 branch still uses DoubleDouble sum. + auto inputValues = BSON_ARRAY(5 << 8 << "MISSING" << 4); + auto [inputTag, inputVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, BSON_ARRAY(BSON_ARRAY(5) << BSON_ARRAY(8) << BSON_ARRAY(4))); + auto [expectedTag, expectedVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(5) << BSON_ARRAY(5 << 8) << BSON_ARRAY(5 << 8 << 4))); + + auto accStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" << 1))); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, compiledExpr.get()); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsDoubleDoubleSum) { + auto [inputTag, inputVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(1 << 2 << 3) << BSON_ARRAY(4 << 6) << BSON_ARRAY(1 << 1 << 1))); + auto [expectedTag, expectedVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, BSON_ARRAY(BSON_ARRAY(6) << BSON_ARRAY(16) << BSON_ARRAY(19))); + + // A field path expression is needed so that the merging expression is constructed to combine + // DoubleDouble summations rather than doing a simple sum. The actual field name "foo" is + // irrelevant because the values are fed into the merging expression by the test fixture. + auto accStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" + << "$foo"))); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, compiledExpr.get()); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsDoubleDoubleSumInfAndNan) { + auto [inputTag, inputVal] = + makePartialAggArray("aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(1 << 2 << 3) + << BSON_ARRAY(4 << std::numeric_limits<double>::infinity()) + << BSON_ARRAY(1 << 1 << 1) + << BSON_ARRAY(std::numeric_limits<double>::quiet_NaN()))); + auto [expectedTag, expectedVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(6) << BSON_ARRAY(10 << std::numeric_limits<double>::infinity()) + << BSON_ARRAY(10 << std::numeric_limits<double>::infinity()) + << BSON_ARRAY(std::numeric_limits<double>::quiet_NaN()))); + + // A field path expression is needed so that the merging expression is constructed to combine + // DoubleDouble summations rather than doing a simple sum. The actual field name "foo" is + // irrelevant because the values are fed into the merging expression by the test fixture. + auto accStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" + << "$foo"))); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, compiledExpr.get()); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsDoubleDoubleSumMixedTypes) { + auto [inputTag, inputVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(1 << 2) << BSON_ARRAY(3ll << 4ll) << BSON_ARRAY(5.5 << 6.6) + << BSON_ARRAY(Decimal128(7) << Decimal128(8)))); + auto [expectedTag, expectedVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(1 << 2) << BSON_ARRAY(1 << 2 << 3ll << 4ll) + << BSON_ARRAY(1 << 2 << 3ll << 4ll << 5.5 << 6.6) + << BSON_ARRAY(1 << 2 << 3ll << 4ll << 5.5 << 6.6 + << Decimal128(7) << Decimal128(8)))); + + // A field path expression is needed so that the merging expression is constructed to combine + // DoubleDouble summations rather than doing a simple sum. The actual field name "foo" is + // irrelevant because the values are fed into the merging expression by the test fixture. + auto accStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" + << "$foo"))); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, compiledExpr.get()); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsDoubleDoubleSumLargeInts) { + // Large 64-bit ints can't be represented precisely as doubles. This test demonstrates that when + // summing such large longs, the sum is returned as a long and no precision is lost. + const int64_t largeLong = std::numeric_limits<int64_t>::max() - 10; + + auto [inputTag, inputVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(largeLong << 1 << 1) << BSON_ARRAY(1ll << 1ll << 1ll))); + auto [expectedTag, expectedVal] = + makePartialAggArray("aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(largeLong + 2ll) << BSON_ARRAY(largeLong + 5ll))); + + // A field path expression is needed so that the merging expression is constructed to combine + // DoubleDouble summations rather than doing a simple sum. The actual field name "foo" is + // irrelevant because the values are fed into the merging expression by the test fixture. + auto accStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" + << "$foo"))); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + + aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, compiledExpr.get()); + + // Feed the result back into the input accessor. We finalize the resulting aggregate in order + // to make sure that the resulting sum is mathematically correct. + auto [resTag, resVal] = _aggAccessor.copyOrMoveValue(); + _inputAccessor.reset(true, resTag, resVal); + auto finalizeExpr = stage_builder::makeFunction("doubleDoubleSumFinalize", + stage_builder::makeVariable(_inputSlotId)); + auto finalizeCode = compileExpression(*finalizeExpr); + auto [finalizedTag, finalizedRes] = runCompiledExpression(finalizeCode.get()); + ASSERT_EQ(finalizedTag, sbe::value::TypeTags::NumberInt64); + ASSERT_EQ(sbe::value::bitcastTo<int64_t>(finalizedRes), largeLong + 5ll); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsAvg) { + auto accStatement = makeAccumulationStatement("$avg"_sd); + + // We expect $avg to result in two separate agg expressions: one for computing the sum and the + // other for computing the count. Both agg expressions read from the same input slot. + auto exprs = stage_builder::buildCombinePartialAggregates( + accStatement, {_inputSlotId, _inputSlotId}, boost::none, _frameIdGenerator); + ASSERT_EQ(exprs.size(), 2u); + + // Compile the first expression and make sure it can combine DoubleDouble summations as + // expected. + auto [inputTag, inputVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(1 << 2) << BSON_ARRAY(3ll << 4ll) << BSON_ARRAY(5.5 << 6.6) + << BSON_ARRAY(Decimal128(7) << Decimal128(8)))); + auto [expectedTag, expectedVal] = makePartialAggArray( + "aggDoubleDoubleSum"_sd, + BSON_ARRAY(BSON_ARRAY(1 << 2) << BSON_ARRAY(1 << 2 << 3ll << 4ll) + << BSON_ARRAY(1 << 2 << 3ll << 4ll << 5.5 << 6.6) + << BSON_ARRAY(1 << 2 << 3ll << 4ll << 5.5 << 6.6 + << Decimal128(7) << Decimal128(8)))); + auto doubleDoubleSumExpr = compileAggExpression(*exprs[0], &_aggAccessor); + aggregateAndAssertResults( + inputTag, inputVal, expectedTag, expectedVal, doubleDoubleSumExpr.get()); + + // Now compile the second expression and make sure it computes a simple sum. + auto simpleSumExpr = compileAggExpression(*exprs[1], &_aggAccessor); + + auto inputValues = BSON_ARRAY(5 << 8 << 0 << 4); + auto expectedAggStates = BSON_ARRAY(5 << 13 << 13 << 17); + aggregateAndAssertResults(inputValues, expectedAggStates, simpleSumExpr.get()); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsStdDevPop) { + auto [inputTag, inputVal] = makePartialAggArray( + "aggStdDev"_sd, + BSON_ARRAY(BSON_ARRAY(5 << 10) + << BSON_ARRAY(6 << 8) << BSON_ARRAY("MISSING") << BSON_ARRAY(1 << 9 << 10))); + auto [expectedTag, expectedVal] = makePartialAggArray( + "aggStdDev"_sd, + BSON_ARRAY(BSON_ARRAY(5 << 10) + << BSON_ARRAY(5 << 10 << 6 << 8) << BSON_ARRAY(5 << 10 << 6 << 8) + << BSON_ARRAY(5 << 10 << 6 << 8 << 1 << 9 << 10))); + + auto accStatement = makeAccumulationStatement("$stdDevPop"_sd); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, compiledExpr.get()); + + // Feed the result back into the input accessor. + auto [resTag, resVal] = _aggAccessor.copyOrMoveValue(); + _inputAccessor.reset(true, resTag, resVal); + auto finalizeExpr = + stage_builder::makeFunction("stdDevPopFinalize", stage_builder::makeVariable(_inputSlotId)); + auto finalizeCode = compileExpression(*finalizeExpr); + auto [finalizedTag, finalizedRes] = runCompiledExpression(finalizeCode.get()); + ASSERT_EQ(finalizedTag, sbe::value::TypeTags::NumberDouble); + ASSERT_APPROX_EQUAL(sbe::value::bitcastTo<double>(finalizedRes), 3.0237, 0.0001); +} + +TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsStdDevSamp) { + auto [inputTag, inputVal] = makePartialAggArray( + "aggStdDev"_sd, + BSON_ARRAY(BSON_ARRAY(5 << 10) + << BSON_ARRAY(6 << 8) << BSON_ARRAY("MISSING") << BSON_ARRAY(1 << 9 << 10))); + auto [expectedTag, expectedVal] = makePartialAggArray( + "aggStdDev"_sd, + BSON_ARRAY(BSON_ARRAY(5 << 10) + << BSON_ARRAY(5 << 10 << 6 << 8) << BSON_ARRAY(5 << 10 << 6 << 8) + << BSON_ARRAY(5 << 10 << 6 << 8 << 1 << 9 << 10))); + + auto accStatement = makeAccumulationStatement("$stdDevSamp"_sd); + auto compiledExpr = compileSingleInputNoCollator(accStatement); + aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, compiledExpr.get()); + + // Feed the result back into the input accessor. + auto [resTag, resVal] = _aggAccessor.copyOrMoveValue(); + _inputAccessor.reset(true, resTag, resVal); + auto finalizeExpr = stage_builder::makeFunction("stdDevSampFinalize", + stage_builder::makeVariable(_inputSlotId)); + auto finalizeCode = compileExpression(*finalizeExpr); + auto [finalizedTag, finalizedRes] = runCompiledExpression(finalizeCode.get()); + ASSERT_EQ(finalizedTag, sbe::value::TypeTags::NumberDouble); + ASSERT_APPROX_EQUAL(sbe::value::bitcastTo<double>(finalizedRes), 3.2660, 0.0001); +} + } // namespace mongo |