summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2023-01-10 13:45:54 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-02-15 23:44:05 +0000
commitcb24503913347f080d85480683e91ad7282a1697 (patch)
treecf046ddebdc1628f1a97c27addff2db307162d17 /src/mongo/db
parent30ddfe887b07bbde3ded94d0d909fc6c2f716001 (diff)
downloadmongo-cb24503913347f080d85480683e91ad7282a1697.tar.gz
SERVER-70395 Combining partial aggs support for $sum, $avg, and std dev
(cherry picked from commit efb394a3e7b3133148181e50f86edddedbc588d9)
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/exec/sbe/expressions/expression.cpp4
-rw-r--r--src/mongo/db/exec/sbe/vm/arith.cpp112
-rw-r--r--src/mongo/db/exec/sbe/vm/vm.cpp55
-rw-r--r--src/mongo/db/exec/sbe/vm/vm.h45
-rw-r--r--src/mongo/db/query/sbe_stage_builder_accumulator.cpp47
-rw-r--r--src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp300
6 files changed, 525 insertions, 38 deletions
diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp
index a145598592c..96d5d175fa9 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.cpp
+++ b/src/mongo/db/exec/sbe/expressions/expression.cpp
@@ -429,6 +429,8 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = {
BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoubleSum, false}},
{"aggDoubleDoubleSum",
BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggDoubleDoubleSum, true}},
+ {"aggMergeDoubleDoubleSums",
+ BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggMergeDoubleDoubleSums, true}},
{"doubleDoubleSumFinalize",
BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoubleSumFinalize, false}},
{"doubleDoubleMergeSumFinalize",
@@ -436,6 +438,8 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = {
{"doubleDoublePartialSumFinalize",
BuiltinFn{[](size_t n) { return n > 0; }, vm::Builtin::doubleDoublePartialSumFinalize, false}},
{"aggStdDev", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggStdDev, true}},
+ {"aggMergeStdDevs",
+ BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggMergeStdDevs, true}},
{"stdDevPopFinalize",
BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::stdDevPopFinalize, false}},
{"stdDevSampFinalize",
diff --git a/src/mongo/db/exec/sbe/vm/arith.cpp b/src/mongo/db/exec/sbe/vm/arith.cpp
index e4c67775ad8..1d41ff59ce1 100644
--- a/src/mongo/db/exec/sbe/vm/arith.cpp
+++ b/src/mongo/db/exec/sbe/vm/arith.cpp
@@ -503,6 +503,57 @@ void ByteCode::aggDoubleDoubleSumImpl(value::Array* arr,
}
}
+void ByteCode::aggMergeDoubleDoubleSumsImpl(value::Array* accumulator,
+ value::TypeTags rhsTag,
+ value::Value rhsValue) {  // Folds the partial sum array 'rhsValue' into 'accumulator' in place.
+ auto [accumWidestType, _1] = accumulator->getAt(AggSumValueElems::kNonDecimalTotalTag);  // Saved now; the sum calls below overwrite this tag.
+
+ tassert(7039532, "value must be of type 'Array'", rhsTag == value::TypeTags::Array);
+ auto nextDoubleDoubleArr = value::getArrayView(rhsValue);
+
+ tassert(7039533,
+ "array does not have enough elements",
+ nextDoubleDoubleArr->size() >= AggSumValueElems::kMaxSizeOfArray - 1);  // The trailing decimal total is optional (checked at the end of this function).
+
+ // First aggregate the non-decimal sum, then the non-decimal addend. Both should be doubles.
+ auto [sumTag, sum] = nextDoubleDoubleArr->getAt(AggSumValueElems::kNonDecimalTotalSum);
+ tassert(7039534, "expected 'NumberDouble'", sumTag == value::TypeTags::NumberDouble);
+ aggDoubleDoubleSumImpl(accumulator, sumTag, sum);
+
+ auto [addendTag, addend] = nextDoubleDoubleArr->getAt(AggSumValueElems::kNonDecimalTotalAddend);
+ tassert(7039535, "expected 'NumberDouble'", addendTag == value::TypeTags::NumberDouble);
+ // There is a special case when the 'sum' is infinite and the 'addend' is NaN. This DoubleDouble
+ // value represents infinity, not NaN. Therefore, we avoid incorporating the NaN 'addend' value
+ // into the sum.
+ if (std::isfinite(value::bitcastTo<double>(sum)) ||
+ !std::isnan(value::bitcastTo<double>(addend))) {  // i.e. skip only when 'sum' is non-finite AND 'addend' is NaN.
+ aggDoubleDoubleSumImpl(accumulator, addendTag, addend);
+ }
+
+ // Determine the widest non-decimal type that we've seen so far, and set the accumulator state
+ // accordingly. We do this after computing the sums, since 'aggDoubleDoubleSumImpl()' will
+ // set the widest type to 'NumberDouble' when we call it above.
+ auto [newValWidestType, _2] = nextDoubleDoubleArr->getAt(AggSumValueElems::kNonDecimalTotalTag);
+ tassert(
+ 7039536, "unexpected 'NumberDecimal'", newValWidestType != value::TypeTags::NumberDecimal);
+ tassert(
+ 7039537, "unexpected 'NumberDecimal'", accumWidestType != value::TypeTags::NumberDecimal);
+ auto widestType = getWidestNumericalType(newValWidestType, accumWidestType);
+ accumulator->setAt(
+ AggSumValueElems::kNonDecimalTotalTag, widestType, value::bitcastFrom<int32_t>(0));  // This function only reads the tag of this element; the stored value is a placeholder 0.
+
+ // If there's a decimal128 sum as part of the incoming DoubleDouble sum, incorporate it into the
+ // accumulator.
+ if (nextDoubleDoubleArr->size() == AggSumValueElems::kMaxSizeOfArray) {
+ auto [decimalTotalTag, decimalTotalVal] =
+ nextDoubleDoubleArr->getAt(AggSumValueElems::kDecimalTotal);
+ tassert(7039538,
+ "The decimalTotal must be 'NumberDecimal'",
+ decimalTotalTag == TypeTags::NumberDecimal);  // NOTE(review): unqualified 'TypeTags' here; the rest of this function spells 'value::TypeTags'.
+ aggDoubleDoubleSumImpl(accumulator, decimalTotalTag, decimalTotalVal);
+ }
+}
+
void ByteCode::aggStdDevImpl(value::Array* arr, value::TypeTags rhsTag, value::Value rhsValue) {
if (!isNumber(rhsTag)) {
return;
@@ -551,6 +602,67 @@ void ByteCode::aggStdDevImpl(value::Array* arr, value::TypeTags rhsTag, value::V
return setStdDevArray(newCountVal, newMeanVal, newM2Val, arr);
}
+void ByteCode::aggMergeStdDevsImpl(value::Array* accumulator,
+ value::TypeTags rhsTag,
+ value::Value rhsValue) {  // Folds the partial std dev state 'rhsValue' (a count, a running mean and a running M2) into 'accumulator' in place.
+ tassert(7039542, "expected value of type 'Array'", rhsTag == value::TypeTags::Array);
+ auto nextArr = value::getArrayView(rhsValue);
+
+ tassert(7039543,
+ "expected array to have exactly 3 elements",
+ accumulator->size() == AggStdDevValueElems::kSizeOfArray);
+ tassert(7039544,
+ "expected array to have exactly 3 elements",
+ nextArr->size() == AggStdDevValueElems::kSizeOfArray);
+
+ auto [newCountTag, newCountVal] = nextArr->getAt(AggStdDevValueElems::kCount);
+ tassert(7039545, "expected 64-bit int", newCountTag == value::TypeTags::NumberInt64);
+ int64_t newCount = value::bitcastTo<int64_t>(newCountVal);
+
+ // If the incoming partial aggregate has a count of zero, then it represents the partial
+ // standard deviation of no data points. This means that it can be safely ignored, and we return
+ // the accumulator as is.
+ if (newCount == 0) {
+ return;
+ }
+
+ auto [oldCountTag, oldCountVal] = accumulator->getAt(AggStdDevValueElems::kCount);
+ tassert(7039546, "expected 64-bit int", oldCountTag == value::TypeTags::NumberInt64);
+ int64_t oldCount = value::bitcastTo<int64_t>(oldCountVal);
+
+ auto [oldMeanTag, oldMeanVal] = accumulator->getAt(AggStdDevValueElems::kRunningMean);
+ tassert(7039547, "expected double", oldMeanTag == value::TypeTags::NumberDouble);
+ double oldMean = value::bitcastTo<double>(oldMeanVal);
+
+ auto [newMeanTag, newMeanVal] = nextArr->getAt(AggStdDevValueElems::kRunningMean);
+ tassert(7039548, "expected double", newMeanTag == value::TypeTags::NumberDouble);
+ double newMean = value::bitcastTo<double>(newMeanVal);
+
+ auto [oldM2Tag, oldM2Val] = accumulator->getAt(AggStdDevValueElems::kRunningM2);
+ tassert(7039531, "expected double", oldM2Tag == value::TypeTags::NumberDouble);
+ double oldM2 = value::bitcastTo<double>(oldM2Val);
+
+ auto [newM2Tag, newM2Val] = nextArr->getAt(AggStdDevValueElems::kRunningM2);
+ tassert(7039541, "expected double", newM2Tag == value::TypeTags::NumberDouble);
+ double newM2 = value::bitcastTo<double>(newM2Val);
+
+ const double delta = newMean - oldMean;
+ // We've already handled the case where 'newCount' is zero above. This means that 'totalCount'
+ // must be positive, and prevents us from ever dividing by zero in the subsequent calculation.
+ int64_t totalCount = oldCount + newCount;  // NOTE(review): positivity also relies on 'oldCount' being non-negative, which is not asserted here.
+ if (delta != 0) {  // With equal means, 'newMean' already is the combined mean and the cross term below is zero.
+ newMean = ((oldCount * oldMean) + (newCount * newMean)) / totalCount;  // Count-weighted average of the two means.
+ newM2 += delta * delta *
+ (static_cast<double>(oldCount) * static_cast<double>(newCount) / totalCount);  // Cross term: delta^2 * oldCount * newCount / totalCount.
+ }
+ newM2 += oldM2;  // From here on 'newMean'/'newM2' hold the merged values, not the incoming ones.
+
+ setStdDevArray(value::bitcastFrom<int64_t>(totalCount),
+ value::bitcastFrom<double>(newMean),
+ value::bitcastFrom<double>(newM2),
+ accumulator);
+}
+
std::tuple<bool, value::TypeTags, value::Value> ByteCode::aggStdDevFinalizeImpl(
value::Value fieldValue, bool isSamp) {
auto arr = value::getArrayView(fieldValue);
diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp
index f8b6e394ab4..0b2d83f9f9f 100644
--- a/src/mongo/db/exec/sbe/vm/vm.cpp
+++ b/src/mongo/db/exec/sbe/vm/vm.cpp
@@ -1072,35 +1072,40 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::aggSum(value::TypeTags
return genericAdd(accTag, accValue, fieldTag, fieldValue);
}
+template <bool merging>
std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggDoubleDoubleSum(
ArityType arity) {
-
auto [_, fieldTag, fieldValue] = getFromStack(1);
// Move the incoming accumulator state from the stack. Given that we are now the owner of the
// state we are free to do any in-place update as we see fit.
auto [accTag, accValue] = moveOwnedFromStack(0);
- value::ValueGuard guard{accTag, accValue};
// Initialize the accumulator.
if (accTag == value::TypeTags::Nothing) {
std::tie(accTag, accValue) = value::makeNewArray();
- value::ValueGuard guard{accTag, accValue};
+ value::ValueGuard newArrGuard{accTag, accValue};
auto arr = value::getArrayView(accValue);
arr->reserve(AggSumValueElems::kMaxSizeOfArray);
- // The order of the following three elements should match to 'AggSumValueElems'.
+ // The order of the following three elements should match to 'AggSumValueElems'. An absent
+ // 'kDecimalTotal' element means that we've not seen any decimal value. So, we're not adding
+ // 'kDecimalTotal' element yet.
arr->push_back(value::TypeTags::NumberInt32, value::bitcastFrom<int32_t>(0));
arr->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0));
arr->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0));
- // The absent 'kDecimalTotal' element means that we've not seen any decimal value. So, we're
- // not adding 'kDecimalTotal' element yet.
- aggDoubleDoubleSumImpl(arr, fieldTag, fieldValue);
- guard.reset();
- return {true, accTag, accValue};
+ newArrGuard.reset();
}
+
+ value::ValueGuard guard{accTag, accValue};
tassert(5755317, "The result slot must be Array-typed", accTag == value::TypeTags::Array);
+ auto accumulator = value::getArrayView(accValue);
+
+ if constexpr (merging) {
+ aggMergeDoubleDoubleSumsImpl(accumulator, fieldTag, fieldValue);
+ } else {
+ aggDoubleDoubleSumImpl(accumulator, fieldTag, fieldValue);
+ }
- aggDoubleDoubleSumImpl(value::getArrayView(accValue), fieldTag, fieldValue);
guard.reset();
return {true, accTag, accValue};
}
@@ -1235,31 +1240,37 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinDoubleDoublePar
return {true, tag, val};
}
+template <bool merging>
std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggStdDev(ArityType arity) {
auto [_, fieldTag, fieldValue] = getFromStack(1);
// Move the incoming accumulator state from the stack. Given that we are now the owner of the
// state we are free to do any in-place update as we see fit.
auto [accTag, accValue] = moveOwnedFromStack(0);
- value::ValueGuard guard{accTag, accValue};
// Initialize the accumulator.
if (accTag == value::TypeTags::Nothing) {
- auto [newAccTag, newAccValue] = value::makeNewArray();
- value::ValueGuard newGuard{newAccTag, newAccValue};
- auto arr = value::getArrayView(newAccValue);
+ std::tie(accTag, accValue) = value::makeNewArray();
+ value::ValueGuard newArrGuard{accTag, accValue};
+ auto arr = value::getArrayView(accValue);
arr->reserve(AggStdDevValueElems::kSizeOfArray);
// The order of the following three elements should match to 'AggStdDevValueElems'.
arr->push_back(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(0));
arr->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0));
arr->push_back(value::TypeTags::NumberDouble, value::bitcastFrom<double>(0.0));
- aggStdDevImpl(arr, fieldTag, fieldValue);
- newGuard.reset();
- return {true, newAccTag, newAccValue};
+ newArrGuard.reset();
}
+
+ value::ValueGuard guard{accTag, accValue};
tassert(5755210, "The result slot must be Array-typed", accTag == value::TypeTags::Array);
+ auto accumulator = value::getArrayView(accValue);
+
+ if constexpr (merging) {
+ aggMergeStdDevsImpl(accumulator, fieldTag, fieldValue);
+ } else {
+ aggStdDevImpl(accumulator, fieldTag, fieldValue);
+ }
- aggStdDevImpl(value::getArrayView(accValue), fieldTag, fieldValue);
guard.reset();
return {true, accTag, accValue};
}
@@ -4598,7 +4609,7 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti
case Builtin::doubleDoubleSum:
return builtinDoubleDoubleSum(arity);
case Builtin::aggDoubleDoubleSum:
- return builtinAggDoubleDoubleSum(arity);
+ return builtinAggDoubleDoubleSum<false /*merging*/>(arity);
case Builtin::doubleDoubleSumFinalize:
return builtinDoubleDoubleSumFinalize<>(arity);
case Builtin::doubleDoubleMergeSumFinalize:
@@ -4607,8 +4618,12 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti
return builtinDoubleDoubleSumFinalize<true /*keepIntegerPrecision*/>(arity);
case Builtin::doubleDoublePartialSumFinalize:
return builtinDoubleDoublePartialSumFinalize(arity);
+ case Builtin::aggMergeDoubleDoubleSums:
+ return builtinAggDoubleDoubleSum<true /*merging*/>(arity);
case Builtin::aggStdDev:
- return builtinAggStdDev(arity);
+ return builtinAggStdDev<false /*merging*/>(arity);
+ case Builtin::aggMergeStdDevs:
+ return builtinAggStdDev<true /*merging*/>(arity);
case Builtin::stdDevPopFinalize:
return builtinStdDevPopFinalize(arity);
case Builtin::stdDevSampFinalize:
diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h
index fd45b6beaae..42c1d27015b 100644
--- a/src/mongo/db/exec/sbe/vm/vm.h
+++ b/src/mongo/db/exec/sbe/vm/vm.h
@@ -511,12 +511,33 @@ enum class Builtin : uint8_t {
collAddToSet, // agg function to append to a set (with collation)
collAddToSetCapped, // agg function to append to a set (with collation), fails when the set
// reaches specified size
- doubleDoubleSum, // special double summation
+
+ // Special double summation.
+ doubleDoubleSum,
+ // A variant of the standard sum aggregate function which maintains a DoubleDouble as the
+ // accumulator's underlying state.
aggDoubleDoubleSum,
+ // Converts a DoubleDouble sum into a single numeric scalar for use once the summation is
+ // complete.
doubleDoubleSumFinalize,
+ // A form of doubleDoubleSum finalization only necessary for sharding support when the cluster
+ // is not yet fully upgraded to FCV 6.0.
doubleDoubleMergeSumFinalize,
+ // Converts a partial sum into a format suitable for serialization over the wire to the merging
+ // node. The merging node expects the internal state of the DoubleDouble summation to be
+ // serialized in a particular format.
doubleDoublePartialSumFinalize,
+ // An agg function which can be used to sum a sequence of DoubleDouble inputs, producing the
+ // resulting total as a DoubleDouble.
+ aggMergeDoubleDoubleSums,
+
+ // Implements Welford's online algorithm for computing sample or population standard deviation
+ // in a single pass.
aggStdDev,
+ // Combines standard deviations that have been partially computed on a subset of the data
+ // using Welford's online algorithm.
+ aggMergeStdDevs,
+
stdDevPopFinalize,
stdDevSampFinalize,
bitTestZero, // test bitwise mask & value is zero
@@ -989,11 +1010,19 @@ private:
value::TypeTags fieldTag,
value::Value fieldValue);
- void aggDoubleDoubleSumImpl(value::Array* arr, value::TypeTags rhsTag, value::Value rhsValue);
+ void aggDoubleDoubleSumImpl(value::Array* accumulator,
+ value::TypeTags rhsTag,
+ value::Value rhsValue);
+ void aggMergeDoubleDoubleSumsImpl(value::Array* accumulator,
+ value::TypeTags rhsTag,
+ value::Value rhsValue);
// This is an implementation of the following algorithm:
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
- void aggStdDevImpl(value::Array* arr, value::TypeTags rhsTag, value::Value rhsValue);
+ void aggStdDevImpl(value::Array* accumulator, value::TypeTags rhsTag, value::Value rhsValue);
+ void aggMergeStdDevsImpl(value::Array* accumulator,
+ value::TypeTags rhsTag,
+ value::Value rhsValue);
std::tuple<bool, value::TypeTags, value::Value> aggStdDevFinalizeImpl(value::Value fieldValue,
bool isSamp);
@@ -1112,14 +1141,24 @@ private:
CollatorInterface* collator);
std::tuple<bool, value::TypeTags, value::Value> builtinAddToSetCapped(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinCollAddToSetCapped(ArityType arity);
+
std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoubleSum(ArityType arity);
+ // The template parameter is false for a regular DoubleDouble summation and true if merging
+ // partially computed DoubleDouble sums.
+ template <bool merging>
std::tuple<bool, value::TypeTags, value::Value> builtinAggDoubleDoubleSum(ArityType arity);
+
// This is only for compatibility with mongos/sharding and we will revisit this later.
template <bool keepIntegerPrecision = false>
std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoubleSumFinalize(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinDoubleDoublePartialSumFinalize(
ArityType arity);
+
+ // The template parameter is false for a regular std dev and true if merging partially computed
+ // standard deviations.
+ template <bool merging>
std::tuple<bool, value::TypeTags, value::Value> builtinAggStdDev(ArityType arity);
+
std::tuple<bool, value::TypeTags, value::Value> builtinStdDevPopFinalize(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinStdDevSampFinalize(ArityType arity);
std::tuple<bool, value::TypeTags, value::Value> builtinBitTestZero(ArityType arity);
diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
index 56f39a3758d..0cf745c15a1 100644
--- a/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_accumulator.cpp
@@ -205,6 +205,21 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorAvg(
return aggs;
}
+std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsAvg(
+ const AccumulationExpression& expr,
+ const sbe::value::SlotVector& inputSlots,
+ boost::optional<sbe::value::SlotId> collatorSlot,
+ sbe::value::FrameIdGenerator& frameIdGenerator) {  // 'expr', 'collatorSlot' and 'frameIdGenerator' are not read by this combiner.
+ tassert(7039539,
+ "partial agg combiner for $avg should have exactly two input slots",
+ inputSlots.size() == 2);
+
+ std::vector<std::unique_ptr<sbe::EExpression>> aggs;  // One merge expression per input slot, in slot order.
+ aggs.push_back(makeFunction("aggMergeDoubleDoubleSums", makeVariable(inputSlots[0])));  // Slot 0 is merged as a partial DoubleDouble sum.
+ aggs.push_back(makeFunction("sum", makeVariable(inputSlots[1])));  // Slot 1 is merged with a plain "sum"; presumably it holds the partial count — confirm against buildAccumulatorAvg.
+ return aggs;
+}
+
std::unique_ptr<sbe::EExpression> buildFinalizeAvg(StageBuilderState& state,
const AccumulationExpression& expr,
const sbe::value::SlotVector& aggSlots) {
@@ -289,6 +304,20 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorSum(
return aggs;
}
+std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsSum(
+ const AccumulationExpression& expr,
+ const sbe::value::SlotVector& inputSlots,
+ boost::optional<sbe::value::SlotId> collatorSlot,
+ sbe::value::FrameIdGenerator& frameIdGenerator) {  // 'expr', 'collatorSlot' and 'frameIdGenerator' are not read by this combiner.
+ tassert(7039530,
+ "partial agg combiner for $sum should have exactly one input slot",
+ inputSlots.size() == 1);
+ auto arg = makeVariable(inputSlots[0]);
+ std::vector<std::unique_ptr<sbe::EExpression>> aggs;
+ aggs.push_back(makeFunction("aggMergeDoubleDoubleSums", std::move(arg)));  // The single input slot is merged as a partial DoubleDouble sum.
+ return aggs;
+}
+
std::unique_ptr<sbe::EExpression> buildFinalizeSum(StageBuilderState& state,
const AccumulationExpression& expr,
const sbe::value::SlotVector& sumSlots) {
@@ -470,6 +499,20 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildAccumulatorStdDev(
return aggs;
}
+std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggsStdDev(
+ const AccumulationExpression& expr,
+ const sbe::value::SlotVector& inputSlots,
+ boost::optional<sbe::value::SlotId> collatorSlot,
+ sbe::value::FrameIdGenerator& frameIdGenerator) {  // 'expr', 'collatorSlot' and 'frameIdGenerator' are not read by this combiner.
+ tassert(7039540,
+ "partial agg combiner for stddev should have exactly one input slot",
+ inputSlots.size() == 1);
+ auto arg = makeVariable(inputSlots[0]);
+ std::vector<std::unique_ptr<sbe::EExpression>> aggs;
+ aggs.push_back(makeFunction("aggMergeStdDevs", std::move(arg)));  // The same merge function is emitted regardless of 'expr'.
+ return aggs;
+}
+
std::unique_ptr<sbe::EExpression> buildFinalizePartialStdDev(sbe::value::SlotId stdDevSlot) {
// To support the sharding behavior, the mongos splits $group into two separate $group
// stages one at the mongos-side and the other at the shard-side. This stage builder builds
@@ -631,12 +674,16 @@ std::vector<std::unique_ptr<sbe::EExpression>> buildCombinePartialAggregates(
static const StringDataMap<BuildAggCombinerFn> kAggCombinerBuilders = {
{AccumulatorAddToSet::kName, &buildCombinePartialAggsAddToSet},
+ {AccumulatorAvg::kName, &buildCombinePartialAggsAvg},
{AccumulatorFirst::kName, &buildCombinePartialAggsFirst},
{AccumulatorLast::kName, &buildCombinePartialAggsLast},
{AccumulatorMax::kName, &buildCombinePartialAggsMax},
{AccumulatorMergeObjects::kName, &buildCombinePartialAggsMergeObjects},
{AccumulatorMin::kName, &buildCombinePartialAggsMin},
{AccumulatorPush::kName, &buildCombinePartialAggsPush},
+ {AccumulatorStdDevPop::kName, &buildCombinePartialAggsStdDev},
+ {AccumulatorStdDevSamp::kName, &buildCombinePartialAggsStdDev},
+ {AccumulatorSum::kName, &buildCombinePartialAggsSum},
};
auto accExprName = acc.expr.name;
diff --git a/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp b/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp
index 23d90b7ca31..732b04f62a8 100644
--- a/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_accumulator_test.cpp
@@ -1698,13 +1698,34 @@ public:
_collatorSlotId{bindAccessor(&_collatorAccessor)} {}
AccumulationStatement makeAccumulationStatement(StringData accumName) {
- _accumulationStmtBson = BSON("unused" << BSON(accumName << "unused"));
+ return makeAccumulationStatement(BSON("unused" << BSON(accumName << "unused")));
+ }
+
+ AccumulationStatement makeAccumulationStatement(BSONObj accumulationStmt) {
+ _accumulationStmtBson = std::move(accumulationStmt);
VariablesParseState vps = _expCtx->variablesParseState;
return AccumulationStatement::parseAccumulationStatement(
_expCtx.get(), _accumulationStmtBson.firstElement(), vps);
}
/**
+ * Convenience method for producing bytecode which combines partial aggregates for the given
+ * 'AccumulationStatement'.
+ *
+ * Requires that accumulation statement results in a single aggregate with one input and one
+ * output. Furthermore, cannot be used when the test case involves a non-simple collation.
+ */
+ std::unique_ptr<sbe::vm::CodeFragment> compileSingleInputNoCollator(
+ const AccumulationStatement& accStatement) {
+ auto exprs = stage_builder::buildCombinePartialAggregates(
+ accStatement, {_inputSlotId}, boost::none, _frameIdGenerator);  // One input slot; 'boost::none' is presumably the (absent) collator slot — confirm.
+ ASSERT_EQ(exprs.size(), 1u);  // This helper only supports accumulators that yield exactly one combiner expression.
+ _expr = std::move(exprs[0]);  // Stored in the fixture member rather than a local.
+
+ return compileAggExpression(*_expr, &_aggAccessor);
+ }
+
+ /**
* Verifies that executing the bytecode ('code') for combining partial aggregates for $group
* spilling produces the 'expected' outputs given 'inputs'.
*
@@ -1751,6 +1772,7 @@ public:
// Aggregate the inputs one-by-one, and at each step validate that the resulting accumulator
// state is as expected.
+ int index = 0;
while (!inputEnumerator.atEnd()) {
ASSERT_FALSE(expectedEnumerator.atEnd());
auto [nextInputTag, nextInputVal] = inputEnumerator.getViewOfValue();
@@ -1777,8 +1799,7 @@ public:
}
auto [compareTag, compareValue] = sbe::value::compareValue(
outputTag, outputVal, expectedOutputTag, expectedOutputValue);
- ASSERT_EQ(compareTag, sbe::value::TypeTags::NumberInt32);
- if (compareValue != 0) {
+ if (compareTag != sbe::value::TypeTags::NumberInt32 || compareValue != 0) {
// The test failed, but dump the actual and expected values to the logs for ease of
// debugging.
str::stream actualBuilder;
@@ -1792,7 +1813,8 @@ public:
LOGV2(7039529,
"Actual value not equal to expected value",
"actual"_attr = actualBuilder,
- "expected"_attr = expectedBuilder);
+ "expected"_attr = expectedBuilder,
+ "index"_attr = index);
FAIL("accumulator did not have expected value");
}
@@ -1800,6 +1822,7 @@ public:
inputEnumerator.advance();
expectedEnumerator.advance();
+ ++index;
}
}
@@ -1851,20 +1874,70 @@ public:
}
/**
- * Convenience method for producing bytecode which combines partial aggregates for the given
- * 'AccumulationStatement'.
+ * Given the name of an SBE agg function ('aggFuncName') and an array of values expressed as a
+ * BSON array, aggregates the values inside the array and returns the resulting SBE value.
+ */
+ std::pair<sbe::value::TypeTags, sbe::value::Value> makeOnePartialAggregate(
+ StringData aggFuncName, BSONArray valuesToAgg) {  // Feeds each element of 'valuesToAgg' through 'aggFuncName' and returns the final accumulator value.
+ // Make sure we are starting from a clean state.
+ _inputAccessor.reset();
+ _aggAccessor.reset();
+
+ // Construct an expression which calls the given agg function, aggregating the values in
+ // '_inputSlotId'.
+ auto expr =
+ stage_builder::makeFunction(aggFuncName, stage_builder::makeVariable(_inputSlotId));
+ auto code = compileAggExpression(*expr, &_aggAccessor);
+
+ // Skip the length prefix to find the first element; 'bsonEnd' is measured from the buffer start.
+ const char* bsonElt = valuesToAgg.objdata() + 4;
+ const char* bsonEnd = valuesToAgg.objdata() + valuesToAgg.objsize();
+ while (*bsonElt != 0) {  // A zero type byte terminates the element list.
+ auto fieldName = sbe::bson::fieldNameView(bsonElt);
+
+ // Convert the BSON value to an SBE value and put it inside the input slot.
+ auto [tag, val] = sbe::bson::convertFrom<false>(bsonElt, bsonEnd, fieldName.size());
+ _inputAccessor.reset(true, tag, val);
+
+ // Run the agg function, and put the result in the slot holding the aggregate value.
+ auto [outputTag, outputVal] = runCompiledExpression(code.get());
+ _aggAccessor.reset(true, outputTag, outputVal);
+
+ bsonElt = sbe::bson::advance(bsonElt, fieldName.size());
+ }
+
+ return _aggAccessor.copyOrMoveValue();
+ }
+
+ /**
+ * Returns an SBE array which contains a sequence of partial aggregate values. Useful for
+ * constructing a sequence of partial aggregates when those partial aggregates are not trivial
+ * to describe using BSON. The input to this function is a BSON array of BSON arrays; each of
+ * the inner arrays is aggregated using the given 'aggFuncName' in order to produce the output
+ * SBE array.
*
- * Requires that accumulation statement results in a single aggregate with one input and one
- * output. Furthermore, cannot be used when the test case involves a non-simple collation.
+ * As an example, suppose the agg function is a simple sum. Given the input
+ *
+ * [[8, 1, 5], [6], [2,3]]
+ *
+ * the output will be the SBE array [14, 6, 5].
*/
- std::unique_ptr<sbe::vm::CodeFragment> compileSingleInputNoCollator(
- const AccumulationStatement& accStatement) {
- auto exprs = stage_builder::buildCombinePartialAggregates(
- accStatement, {_inputSlotId}, boost::none, _frameIdGenerator);
- ASSERT_EQ(exprs.size(), 1u);
- _expr = std::move(exprs[0]);
+ std::pair<sbe::value::TypeTags, sbe::value::Value> makePartialAggArray(
+ StringData aggFuncName, BSONArray arrayOfArrays) {
+ auto [arrTag, arrVal] = sbe::value::makeNewArray();
+ sbe::value::ValueGuard guard{arrTag, arrVal};  // Guards the output array while the loop below can still fail.
+
+ auto arr = sbe::value::getArrayView(arrVal);
+
+ for (auto&& element : arrayOfArrays) {
+ ASSERT(element.type() == BSONType::Array);  // Every top-level element must itself be an array of values to aggregate.
+ auto [tag, val] =
+ makeOnePartialAggregate(aggFuncName, BSONArray{element.embeddedObject()});
+ arr->push_back(tag, val);  // One output element per inner array, in input order.
+ }
+
+ guard.reset();  // Presumably disarms the guard so the array is handed to the caller — confirm against ValueGuard.
+ return {arrTag, arrVal};
+ }
protected:
@@ -2134,4 +2207,201 @@ TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsMergeObjects) {
aggregateAndAssertResults(inputValues, expectedAggStates, compiledExpr.get());
}
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsSimpleCount) {
+    // $sum:1 is a simple count of the incoming documents. SERVER-65465 changed this scenario to use
+    // a simple summation rather than the DoubleDouble summation algorithm in more recent branches,
+    // but the 6.0 branch still uses DoubleDouble sum.
+    //
+    // The partial counts being merged are 5, 8 and 4. Each one is expressed as a single-element
+    // inner array so that makePartialAggArray() turns it into a partial aggregate by running
+    // "aggDoubleDoubleSum" over that inner array.
+    auto [inputTag, inputVal] = makePartialAggArray(
+        "aggDoubleDoubleSum"_sd, BSON_ARRAY(BSON_ARRAY(5) << BSON_ARRAY(8) << BSON_ARRAY(4)));
+
+    // Expected aggregate states: the first input alone, then the first two inputs combined, then
+    // all three inputs combined.
+    auto [expectedTag, expectedVal] = makePartialAggArray(
+        "aggDoubleDoubleSum"_sd,
+        BSON_ARRAY(BSON_ARRAY(5) << BSON_ARRAY(5 << 8) << BSON_ARRAY(5 << 8 << 4)));
+
+    auto accStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" << 1)));
+    auto compiledExpr = compileSingleInputNoCollator(accStatement);
+
+    aggregateAndAssertResults(inputTag, inputVal, expectedTag, expectedVal, compiledExpr.get());
+}
+
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsDoubleDoubleSum) {
+    const auto kPartialSumFn = "aggDoubleDoubleSum"_sd;
+
+    // Three partial DoubleDouble sums to merge; the inner arrays add up to 6, 10 and 3.
+    const auto partialInputs =
+        BSON_ARRAY(BSON_ARRAY(1 << 2 << 3) << BSON_ARRAY(4 << 6) << BSON_ARRAY(1 << 1 << 1));
+    auto [partialsTag, partialsVal] = makePartialAggArray(kPartialSumFn, partialInputs);
+
+    // Expected state of the merged aggregate after each partial sum has been combined.
+    const auto runningTotals = BSON_ARRAY(BSON_ARRAY(6) << BSON_ARRAY(16) << BSON_ARRAY(19));
+    auto [wantTag, wantVal] = makePartialAggArray(kPartialSumFn, runningTotals);
+
+    // The $sum argument has to be a field path; otherwise the merging expression would be built as
+    // a simple sum instead of a combiner of DoubleDouble summations. Which field is named does not
+    // matter ("foo" here), since the fixture supplies the values to the merging expression.
+    auto sumStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" << "$foo")));
+    auto mergeCode = compileSingleInputNoCollator(sumStatement);
+
+    aggregateAndAssertResults(partialsTag, partialsVal, wantTag, wantVal, mergeCode.get());
+}
+
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsDoubleDoubleSumInfAndNan) {
+    const auto kPartialSumFn = "aggDoubleDoubleSum"_sd;
+    const double kInf = std::numeric_limits<double>::infinity();
+    const double kNaN = std::numeric_limits<double>::quiet_NaN();
+
+    // Partial sums to merge: a finite one, one containing infinity, another finite one, and
+    // finally one that is NaN.
+    const auto partialInputs = BSON_ARRAY(BSON_ARRAY(1 << 2 << 3)
+                                          << BSON_ARRAY(4 << kInf) << BSON_ARRAY(1 << 1 << 1)
+                                          << BSON_ARRAY(kNaN));
+    auto [partialsTag, partialsVal] = makePartialAggArray(kPartialSumFn, partialInputs);
+
+    // Expected state of the merged aggregate after each input: the third state is described the
+    // same way as the second (10 plus infinity), and the last state is described as NaN.
+    const auto runningTotals = BSON_ARRAY(BSON_ARRAY(6) << BSON_ARRAY(10 << kInf)
+                                                        << BSON_ARRAY(10 << kInf)
+                                                        << BSON_ARRAY(kNaN));
+    auto [wantTag, wantVal] = makePartialAggArray(kPartialSumFn, runningTotals);
+
+    // The $sum argument has to be a field path; otherwise the merging expression would be built as
+    // a simple sum instead of a combiner of DoubleDouble summations. Which field is named does not
+    // matter ("foo" here), since the fixture supplies the values to the merging expression.
+    auto sumStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" << "$foo")));
+    auto mergeCode = compileSingleInputNoCollator(sumStatement);
+
+    aggregateAndAssertResults(partialsTag, partialsVal, wantTag, wantVal, mergeCode.get());
+}
+
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsDoubleDoubleSumMixedTypes) {
+    const auto kPartialSumFn = "aggDoubleDoubleSum"_sd;
+
+    // One partial sum per numeric type: 32-bit ints, 64-bit ints, doubles, and decimals.
+    const auto partialInputs =
+        BSON_ARRAY(BSON_ARRAY(1 << 2) << BSON_ARRAY(3ll << 4ll) << BSON_ARRAY(5.5 << 6.6)
+                                      << BSON_ARRAY(Decimal128(7) << Decimal128(8)));
+    auto [partialsTag, partialsVal] = makePartialAggArray(kPartialSumFn, partialInputs);
+
+    // Each expected state is described as the summation of every raw value seen so far, so the
+    // inner arrays grow by one input's worth of values at a time.
+    const auto runningTotals =
+        BSON_ARRAY(BSON_ARRAY(1 << 2) << BSON_ARRAY(1 << 2 << 3ll << 4ll)
+                                      << BSON_ARRAY(1 << 2 << 3ll << 4ll << 5.5 << 6.6)
+                                      << BSON_ARRAY(1 << 2 << 3ll << 4ll << 5.5 << 6.6
+                                                      << Decimal128(7) << Decimal128(8)));
+    auto [wantTag, wantVal] = makePartialAggArray(kPartialSumFn, runningTotals);
+
+    // The $sum argument has to be a field path; otherwise the merging expression would be built as
+    // a simple sum instead of a combiner of DoubleDouble summations. Which field is named does not
+    // matter ("foo" here), since the fixture supplies the values to the merging expression.
+    auto sumStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" << "$foo")));
+    auto mergeCode = compileSingleInputNoCollator(sumStatement);
+
+    aggregateAndAssertResults(partialsTag, partialsVal, wantTag, wantVal, mergeCode.get());
+}
+
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsDoubleDoubleSumLargeInts) {
+    // A double cannot represent every large 64-bit integer exactly. This test shows that summing
+    // such large longs still yields a long result, with no loss of precision.
+    const int64_t kNearMaxLong = std::numeric_limits<int64_t>::max() - 10;
+    const auto kPartialSumFn = "aggDoubleDoubleSum"_sd;
+
+    // Two partial sums: one dominated by the very large long, and one small one.
+    const auto partialInputs =
+        BSON_ARRAY(BSON_ARRAY(kNearMaxLong << 1 << 1) << BSON_ARRAY(1ll << 1ll << 1ll));
+    auto [partialsTag, partialsVal] = makePartialAggArray(kPartialSumFn, partialInputs);
+
+    // Expected state of the merged aggregate after each partial sum has been combined.
+    const auto runningTotals =
+        BSON_ARRAY(BSON_ARRAY(kNearMaxLong + 2ll) << BSON_ARRAY(kNearMaxLong + 5ll));
+    auto [wantTag, wantVal] = makePartialAggArray(kPartialSumFn, runningTotals);
+
+    // The $sum argument has to be a field path; otherwise the merging expression would be built as
+    // a simple sum instead of a combiner of DoubleDouble summations. Which field is named does not
+    // matter ("foo" here), since the fixture supplies the values to the merging expression.
+    auto sumStatement = makeAccumulationStatement(BSON("unused" << BSON("$sum" << "$foo")));
+    auto mergeCode = compileSingleInputNoCollator(sumStatement);
+
+    aggregateAndAssertResults(partialsTag, partialsVal, wantTag, wantVal, mergeCode.get());
+
+    // Take the merged aggregate and place it in the input slot, then finalize it. Finalizing lets
+    // us check that the total is mathematically correct and is still a 64-bit integer.
+    auto [mergedTag, mergedVal] = _aggAccessor.copyOrMoveValue();
+    _inputAccessor.reset(true, mergedTag, mergedVal);
+    auto finalizer = stage_builder::makeFunction("doubleDoubleSumFinalize",
+                                                 stage_builder::makeVariable(_inputSlotId));
+    auto finalizerCode = compileExpression(*finalizer);
+    auto [totalTag, totalVal] = runCompiledExpression(finalizerCode.get());
+    ASSERT_EQ(totalTag, sbe::value::TypeTags::NumberInt64);
+    ASSERT_EQ(sbe::value::bitcastTo<int64_t>(totalVal), kNearMaxLong + 5ll);
+}
+
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsAvg) {
+    auto avgStatement = makeAccumulationStatement("$avg"_sd);
+
+    // Combining partial $avg states should produce two agg expressions -- the first for the sum
+    // and the second for the count -- and both of them read from the same input slot.
+    auto combiners = stage_builder::buildCombinePartialAggregates(
+        avgStatement, {_inputSlotId, _inputSlotId}, boost::none, _frameIdGenerator);
+    ASSERT_EQ(combiners.size(), 2u);
+
+    // First expression: check that it merges DoubleDouble summations across ints, longs, doubles
+    // and decimals.
+    const auto kPartialSumFn = "aggDoubleDoubleSum"_sd;
+    const auto partialSums =
+        BSON_ARRAY(BSON_ARRAY(1 << 2) << BSON_ARRAY(3ll << 4ll) << BSON_ARRAY(5.5 << 6.6)
+                                      << BSON_ARRAY(Decimal128(7) << Decimal128(8)));
+    auto [partialsTag, partialsVal] = makePartialAggArray(kPartialSumFn, partialSums);
+
+    // Each expected state is described as the summation of every raw value seen so far.
+    const auto runningSums =
+        BSON_ARRAY(BSON_ARRAY(1 << 2) << BSON_ARRAY(1 << 2 << 3ll << 4ll)
+                                      << BSON_ARRAY(1 << 2 << 3ll << 4ll << 5.5 << 6.6)
+                                      << BSON_ARRAY(1 << 2 << 3ll << 4ll << 5.5 << 6.6
+                                                      << Decimal128(7) << Decimal128(8)));
+    auto [wantTag, wantVal] = makePartialAggArray(kPartialSumFn, runningSums);
+
+    auto sumMergeCode = compileAggExpression(*combiners[0], &_aggAccessor);
+    aggregateAndAssertResults(partialsTag, partialsVal, wantTag, wantVal, sumMergeCode.get());
+
+    // Second expression: check that it performs a plain sum of the partial counts.
+    auto countMergeCode = compileAggExpression(*combiners[1], &_aggAccessor);
+
+    auto partialCounts = BSON_ARRAY(5 << 8 << 0 << 4);
+    auto runningCounts = BSON_ARRAY(5 << 13 << 13 << 17);
+    aggregateAndAssertResults(partialCounts, runningCounts, countMergeCode.get());
+}
+
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsStdDevPop) {
+    const auto kPartialStdDevFn = "aggStdDev"_sd;
+
+    // Four partial standard-deviation states to merge; the third is built from the "MISSING"
+    // placeholder rather than from numbers.
+    const auto partialInputs =
+        BSON_ARRAY(BSON_ARRAY(5 << 10)
+                   << BSON_ARRAY(6 << 8) << BSON_ARRAY("MISSING") << BSON_ARRAY(1 << 9 << 10));
+    auto [partialsTag, partialsVal] = makePartialAggArray(kPartialStdDevFn, partialInputs);
+
+    // Each expected state is described as the aggregate over every raw value seen so far. The
+    // third state repeats the second one.
+    const auto runningStates =
+        BSON_ARRAY(BSON_ARRAY(5 << 10)
+                   << BSON_ARRAY(5 << 10 << 6 << 8) << BSON_ARRAY(5 << 10 << 6 << 8)
+                   << BSON_ARRAY(5 << 10 << 6 << 8 << 1 << 9 << 10));
+    auto [wantTag, wantVal] = makePartialAggArray(kPartialStdDevFn, runningStates);
+
+    auto stdDevStatement = makeAccumulationStatement("$stdDevPop"_sd);
+    auto mergeCode = compileSingleInputNoCollator(stdDevStatement);
+    aggregateAndAssertResults(partialsTag, partialsVal, wantTag, wantVal, mergeCode.get());
+
+    // Place the merged aggregate in the input slot and finalize it, so that the numeric result of
+    // the population standard deviation can be checked.
+    auto [mergedTag, mergedVal] = _aggAccessor.copyOrMoveValue();
+    _inputAccessor.reset(true, mergedTag, mergedVal);
+    auto finalizer =
+        stage_builder::makeFunction("stdDevPopFinalize", stage_builder::makeVariable(_inputSlotId));
+    auto finalizerCode = compileExpression(*finalizer);
+    auto [stdDevTag, stdDevVal] = runCompiledExpression(finalizerCode.get());
+    ASSERT_EQ(stdDevTag, sbe::value::TypeTags::NumberDouble);
+    ASSERT_APPROX_EQUAL(sbe::value::bitcastTo<double>(stdDevVal), 3.0237, 0.0001);
+}
+
+TEST_F(SbeStageBuilderGroupAggCombinerTest, CombinePartialAggsStdDevSamp) {
+    const auto kPartialStdDevFn = "aggStdDev"_sd;
+
+    // Four partial standard-deviation states to merge; the third is built from the "MISSING"
+    // placeholder rather than from numbers.
+    const auto partialInputs =
+        BSON_ARRAY(BSON_ARRAY(5 << 10)
+                   << BSON_ARRAY(6 << 8) << BSON_ARRAY("MISSING") << BSON_ARRAY(1 << 9 << 10));
+    auto [partialsTag, partialsVal] = makePartialAggArray(kPartialStdDevFn, partialInputs);
+
+    // Each expected state is described as the aggregate over every raw value seen so far. The
+    // third state repeats the second one.
+    const auto runningStates =
+        BSON_ARRAY(BSON_ARRAY(5 << 10)
+                   << BSON_ARRAY(5 << 10 << 6 << 8) << BSON_ARRAY(5 << 10 << 6 << 8)
+                   << BSON_ARRAY(5 << 10 << 6 << 8 << 1 << 9 << 10));
+    auto [wantTag, wantVal] = makePartialAggArray(kPartialStdDevFn, runningStates);
+
+    auto stdDevStatement = makeAccumulationStatement("$stdDevSamp"_sd);
+    auto mergeCode = compileSingleInputNoCollator(stdDevStatement);
+    aggregateAndAssertResults(partialsTag, partialsVal, wantTag, wantVal, mergeCode.get());
+
+    // Place the merged aggregate in the input slot and finalize it, so that the numeric result of
+    // the sample standard deviation can be checked.
+    auto [mergedTag, mergedVal] = _aggAccessor.copyOrMoveValue();
+    _inputAccessor.reset(true, mergedTag, mergedVal);
+    auto finalizer = stage_builder::makeFunction("stdDevSampFinalize",
+                                                 stage_builder::makeVariable(_inputSlotId));
+    auto finalizerCode = compileExpression(*finalizer);
+    auto [stdDevTag, stdDevVal] = runCompiledExpression(finalizerCode.get());
+    ASSERT_EQ(stdDevTag, sbe::value::TypeTags::NumberDouble);
+    ASSERT_APPROX_EQUAL(sbe::value::bitcastTo<double>(stdDevVal), 3.2660, 0.0001);
+}
+
} // namespace mongo