path: root/src/mongo
author    David Storch <david.storch@mongodb.com>    2023-01-30 22:54:10 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-01-31 01:27:54 +0000
commit    355aadd8ff9d5599da2983990f87c7ec300a972d (patch)
tree      b75d91aea6c48c0e47edcff1c3a9ededde8c3354 /src/mongo
parent    40c93f028e36f78c06756f4bfd358d240bdd9b34 (diff)
download  mongo-355aadd8ff9d5599da2983990f87c7ec300a972d.tar.gz
SERVER-70395 Change spilling for SBE HashAggStage to use a more efficient algorithm
The new algorithm spills the entire hash table to a RecordStore whenever the memory budget is exceeded. Once all the input is consumed, it switches to a streaming approach, merging the partial aggregates recovered from disk.
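
For intuition, here is a minimal standalone sketch of that spill-then-merge shape. It is illustrative only and does not use the SBE classes: a std::unordered_map stands in for the stage's hash table, a sorted vector of (key, partial count) pairs stands in for the spill RecordStore (whose KeyString-based record ids keep equal keys adjacent), and the merging step for a count aggregate is simply a sum.

// Illustrative sketch of the spill-then-merge algorithm described above; not the SBE
// implementation. Group keys are int64, the single aggregate is a count, and a sorted
// vector stands in for the spill RecordStore.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <utility>
#include <vector>

std::vector<std::pair<int64_t, int64_t>> hashAggCount(const std::vector<int64_t>& input,
                                                      size_t memoryBudgetRows) {
    std::unordered_map<int64_t, int64_t> ht;         // in-memory hash table
    std::vector<std::pair<int64_t, int64_t>> spill;  // stand-in for the spill RecordStore

    for (int64_t key : input) {
        ++ht[key];
        if (ht.size() > memoryBudgetRows) {
            // Budget exceeded: spill the *entire* table as partial aggregates, then clear it.
            for (auto& kv : ht) {
                spill.push_back(kv);
            }
            ht.clear();
        }
    }

    std::vector<std::pair<int64_t, int64_t>> out;
    if (spill.empty()) {
        // Never spilled: results come straight from the hash table.
        out.assign(ht.begin(), ht.end());
        return out;
    }

    // Spilled at least once: flush any leftover in-memory rows, then stream over the spilled
    // partial aggregates in key order, merging adjacent entries that share a key.
    for (auto& kv : ht) {
        spill.push_back(kv);
    }
    std::sort(spill.begin(), spill.end());  // the real spill table keeps keys ordered already
    for (auto& [key, partialCount] : spill) {
        if (!out.empty() && out.back().first == key) {
            out.back().second += partialCount;  // the "merging expression" for a count is a sum
        } else {
            out.emplace_back(key, partialCount);
        }
    }
    return out;
}

int main() {
    // With a budget of two rows, the table is flushed several times and the per-key
    // partial counts are merged during the final streaming pass.
    for (auto& [k, c] : hashAggCount({5, 6, 7, 5, 6, 7, 6, 8, 7}, 2 /*memoryBudgetRows*/)) {
        std::cout << k << " -> " << c << "\n";
    }
    return 0;
}
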
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/exec/sbe/abt/abt_lower.cpp | 10
-rw-r--r--  src/mongo/db/exec/sbe/expressions/expression.cpp | 1
-rw-r--r--  src/mongo/db/exec/sbe/expressions/expression.h | 46
-rw-r--r--  src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp | 29
-rw-r--r--  src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp | 203
-rw-r--r--  src/mongo/db/exec/sbe/sbe_plan_size_test.cpp | 3
-rw-r--r--  src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp | 48
-rw-r--r--  src/mongo/db/exec/sbe/size_estimator.h | 5
-rw-r--r--  src/mongo/db/exec/sbe/stages/hash_agg.cpp | 519
-rw-r--r--  src/mongo/db/exec/sbe/stages/hash_agg.h | 189
-rw-r--r--  src/mongo/db/exec/sbe/stages/plan_stats.h | 8
-rw-r--r--  src/mongo/db/exec/sbe/util/spilling.cpp | 1
-rw-r--r--  src/mongo/db/exec/sbe/util/spilling.h | 6
-rw-r--r--  src/mongo/db/exec/sbe/values/slot.cpp | 9
-rw-r--r--  src/mongo/db/exec/sbe/values/slot.h | 16
-rw-r--r--  src/mongo/db/exec/sbe/values/value_builder.h | 5
-rw-r--r--  src/mongo/db/exec/sbe/vm/vm.cpp | 39
-rw-r--r--  src/mongo/db/exec/sbe/vm/vm.h | 3
-rw-r--r--  src/mongo/db/query/sbe_stage_builder.cpp | 70
-rw-r--r--  src/mongo/db/query/sbe_stage_builder_helpers.cpp | 12
-rw-r--r--  src/mongo/db/query/sbe_stage_builder_helpers.h | 4
-rw-r--r--  src/mongo/db/query/sbe_stage_builder_lookup.cpp | 18
22 files changed, 847 insertions(+), 397 deletions(-)
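
One recurring mechanical change in the hunks below is that the accumulator expressions move from a slot-keyed map built with makeEM() to an ordered vector of (slot, expression) pairs built with makeSlotExprPairVec(), which keeps them positionally aligned with the new mergingExprs vector. A rough sketch of that variadic pair-building pattern, using placeholder aliases rather than the real sbe::value::SlotId and sbe::EExpression types:

// Sketch of a variadic (slot, expression) pair builder; placeholder types only.
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using SlotId = int64_t;    // stand-in for sbe::value::SlotId
using Expr = std::string;  // stand-in for sbe::EExpression
using SlotExprPairVector = std::vector<std::pair<SlotId, std::unique_ptr<Expr>>>;

// Base case: nothing left to append.
inline void appendPairs(SlotExprPairVector&) {}

// Recursive case: consume one (slot, expression) pair, then recurse on the remainder.
template <typename... Ts>
void appendPairs(SlotExprPairVector& out, SlotId slot, std::unique_ptr<Expr> expr, Ts&&... rest) {
    out.push_back({slot, std::move(expr)});
    appendPairs(out, std::forward<Ts>(rest)...);
}

template <typename... Ts>
SlotExprPairVector makePairVec(Ts&&... pack) {
    SlotExprPairVector v;
    v.reserve(sizeof...(Ts) / 2);  // arguments come in (slot, expression) pairs
    appendPairs(v, std::forward<Ts>(pack)...);
    return v;
}

// Example: the pairs stay in the order given, unlike entries of a slot-keyed hash map.
// auto aggs = makePairVec(SlotId{1}, std::make_unique<Expr>("sum(x)"),
//                         SlotId{2}, std::make_unique<Expr>("min(y)"));
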
diff --git a/src/mongo/db/exec/sbe/abt/abt_lower.cpp b/src/mongo/db/exec/sbe/abt/abt_lower.cpp
index 17b13b26db1..72206e75512 100644
--- a/src/mongo/db/exec/sbe/abt/abt_lower.cpp
+++ b/src/mongo/db/exec/sbe/abt/abt_lower.cpp
@@ -683,14 +683,15 @@ std::unique_ptr<sbe::PlanStage> SBENodeLowering::walk(const GroupByNode& n,
tassert(6624227, "refs expected", refsAgg);
auto& exprs = refsAgg->nodes();
- sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs;
+ sbe::SlotExprPairVector aggs;
+ aggs.reserve(exprs.size());
for (size_t idx = 0; idx < exprs.size(); ++idx) {
auto expr = lowerExpression(exprs[idx]);
auto slot = _slotIdGenerator.generate();
mapProjToSlot(names[idx], slot);
- aggs.emplace(slot, std::move(expr));
+ aggs.push_back({slot, std::move(expr)});
}
boost::optional<sbe::value::SlotId> collatorSlot = _namedSlots.getSlotIfExists("collator"_sd);
@@ -705,6 +706,11 @@ std::unique_ptr<sbe::PlanStage> SBENodeLowering::walk(const GroupByNode& n,
true /*optimizedClose*/,
collatorSlot,
false /*allowDiskUse*/,
+ // Since we are always disallowing disk use for this stage,
+ // we need not provide merging expressions. Once spilling
+ // is permitted here, we will need to generate merging
+ // expressions during lowering.
+ sbe::makeSlotExprPairVec() /*mergingExprs*/,
planNodeId);
}
diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp
index 77f69e8cbca..a96a559cd2d 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.cpp
+++ b/src/mongo/db/exec/sbe/expressions/expression.cpp
@@ -701,6 +701,7 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = {
BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::collSetDifference, false}},
{"collSetEquals",
BuiltinFn{[](size_t n) { return n >= 3; }, vm::Builtin::collSetEquals, false}},
+ {"aggSetUnion", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggSetUnion, true}},
{"aggSetUnionCapped",
BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggSetUnionCapped, true}},
{"aggCollSetUnionCapped",
diff --git a/src/mongo/db/exec/sbe/expressions/expression.h b/src/mongo/db/exec/sbe/expressions/expression.h
index ee0f997445f..433aa995146 100644
--- a/src/mongo/db/exec/sbe/expressions/expression.h
+++ b/src/mongo/db/exec/sbe/expressions/expression.h
@@ -127,6 +127,8 @@ protected:
}
};
+using SlotExprPairVector = std::vector<std::pair<value::SlotId, std::unique_ptr<EExpression>>>;
+
template <typename T, typename... Args>
inline std::unique_ptr<EExpression> makeE(Args&&... args) {
return std::make_unique<T>(std::forward<Args>(args)...);
@@ -143,20 +145,30 @@ inline auto makeEs(Ts&&... pack) {
namespace detail {
// base case
-inline void makeEM_unwind(value::SlotMap<std::unique_ptr<EExpression>>& result,
- value::SlotId slot,
- std::unique_ptr<EExpression> expr) {
- result.emplace(slot, std::move(expr));
+template <typename R>
+inline void makeSlotExprPairHelper(R& result,
+ value::SlotId slot,
+ std::unique_ptr<EExpression> expr) {
+ if constexpr (std::is_same_v<R, value::SlotMap<std::unique_ptr<EExpression>>>) {
+ result.emplace(slot, std::move(expr));
+ } else {
+ result.push_back({slot, std::move(expr)});
+ }
}
// recursive case
-template <typename... Ts>
-inline void makeEM_unwind(value::SlotMap<std::unique_ptr<EExpression>>& result,
- value::SlotId slot,
- std::unique_ptr<EExpression> expr,
- Ts&&... rest) {
- result.emplace(slot, std::move(expr));
- makeEM_unwind(result, std::forward<Ts>(rest)...);
+template <typename R, typename... Ts>
+inline void makeSlotExprPairHelper(R& result,
+ value::SlotId slot,
+ std::unique_ptr<EExpression> expr,
+ Ts&&... rest) {
+ if constexpr (std::is_same_v<R, value::SlotMap<std::unique_ptr<EExpression>>>) {
+ result.emplace(slot, std::move(expr));
+ } else {
+ static_assert(std::is_same_v<R, SlotExprPairVector>);
+ result.push_back({slot, std::move(expr)});
+ }
+ makeSlotExprPairHelper(result, std::forward<Ts>(rest)...);
}
} // namespace detail
@@ -165,7 +177,7 @@ auto makeEM(Ts&&... pack) {
value::SlotMap<std::unique_ptr<EExpression>> result;
if constexpr (sizeof...(pack) > 0) {
result.reserve(sizeof...(Ts) / 2);
- detail::makeEM_unwind(result, std::forward<Ts>(pack)...);
+ detail::makeSlotExprPairHelper(result, std::forward<Ts>(pack)...);
}
return result;
}
@@ -178,6 +190,16 @@ auto makeSV(Args&&... args) {
return v;
}
+template <typename... Ts>
+auto makeSlotExprPairVec(Ts&&... pack) {
+ SlotExprPairVector v;
+ if constexpr (sizeof...(pack) > 0) {
+ v.reserve(sizeof...(Ts) / 2);
+ detail::makeSlotExprPairHelper(v, std::forward<Ts>(pack)...);
+ }
+ return v;
+}
+
/**
* This is a constant expression. It assumes the ownership of the input constant.
*/
diff --git a/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp b/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp
index 74cd7dce343..60380c5438e 100644
--- a/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp
+++ b/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp
@@ -102,6 +102,35 @@ TEST_F(SBEBuiltinSetOpTest, ReturnsNothingSetUnion) {
runAndAssertNothing(compiledExpr.get());
}
+TEST_F(SBEBuiltinSetOpTest, AggSetUnion) {
+ value::OwnedValueAccessor aggAccessor, inputAccessor;
+ auto inputSlot = bindAccessor(&inputAccessor);
+ auto setUnionExpr =
+ stage_builder::makeFunction("aggSetUnion", stage_builder::makeVariable(inputSlot));
+ auto compiledExpr = compileAggExpression(*setUnionExpr, &aggAccessor);
+
+ auto [arrTag1, arrVal1] = makeArray(BSON_ARRAY(1 << 2));
+ inputAccessor.reset(arrTag1, arrVal1);
+ auto [resTag1, resVal1] = makeArraySet(BSON_ARRAY(1 << 2));
+ runAndAssertExpression(compiledExpr.get(), {resTag1, resVal1});
+ aggAccessor.reset(resTag1, resVal1);
+
+ auto [arrTag2, arrVal2] = makeArraySet(BSON_ARRAY(1 << 3 << 2 << 6));
+ inputAccessor.reset(arrTag2, arrVal2);
+ auto [resTag2, resVal2] = makeArraySet(BSON_ARRAY(1 << 2 << 3 << 6));
+ runAndAssertExpression(compiledExpr.get(), {resTag2, resVal2});
+ aggAccessor.reset(resTag2, resVal2);
+
+ auto [arrTag3, arrVal3] = makeArray(BSONArray{});
+ inputAccessor.reset(arrTag3, arrVal3);
+ auto [resTag3, resVal3] = makeArraySet(BSON_ARRAY(1 << 2 << 3 << 6));
+ runAndAssertExpression(compiledExpr.get(), {resTag3, resVal3});
+ aggAccessor.reset(resTag3, resVal3);
+
+ inputAccessor.reset(value::TypeTags::Nothing, 0);
+ runAndAssertNothing(compiledExpr.get());
+}
+
TEST_F(SBEBuiltinSetOpTest, ComputesSetIntersection) {
value::OwnedValueAccessor slotAccessor1, slotAccessor2;
auto arrSlot1 = bindAccessor(&slotAccessor1);
diff --git a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
index 603d71f08ea..b48ed496798 100644
--- a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
+++ b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
@@ -82,18 +82,23 @@ void HashAggStageTest::performHashAggWithSpillChecking(
auto makeStageFn = [this, collatorSlot, shouldUseCollator, shouldSpill](
value::SlotId scanSlot, std::unique_ptr<PlanStage> scanStage) {
auto countsSlot = generateSlotId();
+ auto spillSlot = generateSlotId();
auto hashAggStage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction("sum",
- makeE<EConstant>(value::TypeTags::NumberInt64,
- value::bitcastFrom<int64_t>(1)))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction("sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64,
+ value::bitcastFrom<int64_t>(1)))),
makeSV(),
true,
boost::optional<value::SlotId>{shouldUseCollator, collatorSlot},
shouldSpill,
+ makeSlotExprPairVec(
+ spillSlot,
+ stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))),
kEmptyPlanNodeId);
return std::make_pair(countsSlot, std::move(hashAggStage));
@@ -179,20 +184,21 @@ TEST_F(HashAggStageTest, HashAggMinMaxTest) {
auto hashAggStage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(),
- makeEM(minSlot,
- stage_builder::makeFunction("min", makeE<EVariable>(scanSlot)),
- maxSlot,
- stage_builder::makeFunction("max", makeE<EVariable>(scanSlot)),
- collMinSlot,
- stage_builder::makeFunction(
- "collMin", collExpr->clone(), makeE<EVariable>(scanSlot)),
- collMaxSlot,
- stage_builder::makeFunction(
- "collMax", collExpr->clone(), makeE<EVariable>(scanSlot))),
+ makeSlotExprPairVec(minSlot,
+ stage_builder::makeFunction("min", makeE<EVariable>(scanSlot)),
+ maxSlot,
+ stage_builder::makeFunction("max", makeE<EVariable>(scanSlot)),
+ collMinSlot,
+ stage_builder::makeFunction(
+ "collMin", collExpr->clone(), makeE<EVariable>(scanSlot)),
+ collMaxSlot,
+ stage_builder::makeFunction(
+ "collMax", collExpr->clone(), makeE<EVariable>(scanSlot))),
makeSV(),
true,
boost::none,
false /* allowDiskUse */,
+ makeSlotExprPairVec() /* mergingExprs */,
kEmptyPlanNodeId);
auto outSlot = generateSlotId();
@@ -243,13 +249,15 @@ TEST_F(HashAggStageTest, HashAggAddToSetTest) {
auto hashAggStage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(),
- makeEM(hashAggSlot,
- stage_builder::makeFunction(
- "collAddToSet", std::move(collExpr), makeE<EVariable>(scanSlot))),
+ makeSlotExprPairVec(hashAggSlot,
+ stage_builder::makeFunction("collAddToSet",
+ std::move(collExpr),
+ makeE<EVariable>(scanSlot))),
makeSV(),
true,
boost::none,
false /* allowDiskUse */,
+ makeSlotExprPairVec() /* mergingExprs */,
kEmptyPlanNodeId);
return std::make_pair(hashAggSlot, std::move(hashAggStage));
@@ -340,14 +348,16 @@ TEST_F(HashAggStageTest, HashAggSeekKeysTest) {
auto hashAggStage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction("sum",
- makeE<EConstant>(value::TypeTags::NumberInt64,
- value::bitcastFrom<int64_t>(1)))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction("sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64,
+ value::bitcastFrom<int64_t>(1)))),
makeSV(seekSlot),
true,
boost::none,
false /* allowDiskUse */,
+ makeSlotExprPairVec() /* mergingExprs */,
kEmptyPlanNodeId);
return std::make_pair(countsSlot, std::move(hashAggStage));
@@ -400,17 +410,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) {
// Build a HashAggStage, group by the scanSlot and compute a simple count.
auto countsSlot = generateSlotId();
+ auto spillSlot = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
makeSV(), // Seek slot
true,
boost::none,
true /* allowDiskUse */,
+ makeSlotExprPairVec(
+ spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
@@ -433,6 +447,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) {
// Check that the spilling behavior matches the expected.
auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
ASSERT_FALSE(stats->usedDisk);
+ ASSERT_EQ(0, stats->numSpills);
ASSERT_EQ(0, stats->spilledRecords);
stage->close();
@@ -441,7 +456,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) {
TEST_F(HashAggStageTest, HashAggBasicCountSpill) {
// We estimate the size of result row like {int64, int64} at 50B. Set the memory threshold to
// 64B so that exactly one row fits in memory.
- const int expectedRowsToFitInMemory = 1;
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64);
@@ -459,17 +473,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpill) {
// Build a HashAggStage, group by the scanSlot and compute a simple count.
auto countsSlot = generateSlotId();
+ auto spillSlot = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
makeSV(), // Seek slot
true,
boost::none,
true /* allowDiskUse */,
+ makeSlotExprPairVec(
+ spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
@@ -492,7 +510,14 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpill) {
// Check that the spilling behavior matches the expected.
auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
ASSERT_TRUE(stats->usedDisk);
- ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
+ // Memory usage is estimated at most once every two rows, and spilling only begins once the
+ // estimated memory usage exceeds the budget. These two factors result in fewer spills than
+ // there are input records, even though only one record fits in memory at a time.
+ ASSERT_EQ(stats->numSpills, 3);
+ // The input has one run of two consecutive values, so we expect to spill as many records as
+ // there are input values minus one.
+ ASSERT_EQ(stats->spilledRecords, 8);
stage->close();
}
@@ -526,17 +551,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) {
// Build a HashAggStage, group by the scanSlot and compute a simple count.
auto countsSlot = generateSlotId();
+ auto spillSlot = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
makeSV(), // Seek slot
true,
boost::none,
true /* allowDiskUse */,
+ makeSlotExprPairVec(
+ spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
@@ -559,6 +588,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) {
// Check that it did not spill.
auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
ASSERT_FALSE(stats->usedDisk);
+ ASSERT_EQ(0, stats->numSpills);
ASSERT_EQ(0, stats->spilledRecords);
stage->close();
@@ -567,7 +597,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) {
TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) {
// We estimate the size of result row like {double, int64} at 50B. Set the memory threshold to
// 64B so that exactly one row fits in memory.
- const int expectedRowsToFitInMemory = 1;
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64);
@@ -585,17 +614,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) {
// Build a HashAggStage, group by the scanSlot and compute a simple count.
auto countsSlot = generateSlotId();
+ auto spillSlot = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
makeSV(), // Seek slot
true,
boost::none,
true /* allowDiskUse */,
+ makeSlotExprPairVec(
+ spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
@@ -618,7 +651,14 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) {
// Check that the spilling behavior matches the expected.
auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
ASSERT_TRUE(stats->usedDisk);
- ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
+ // Memory usage is estimated at most once every two rows, and spilling only begins once the
+ // estimated memory usage exceeds the budget. These two factors result in fewer spills than
+ // there are input records, even though only one record fits in memory at a time.
+ ASSERT_EQ(stats->numSpills, 3);
+ // The input has one run of two consecutive values, so we expect to spill as many records as
+ // there are input values minus one.
+ ASSERT_EQ(stats->spilledRecords, 8);
stage->close();
}
@@ -640,17 +680,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) {
// Build a HashAggStage, with an empty group by slot and compute a simple count.
auto countsSlot = generateSlotId();
+ auto spillSlot = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
makeSV(), // Seek slot
true,
boost::none,
true /* allowDiskUse */,
+ makeSlotExprPairVec(
+ spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
@@ -671,6 +715,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) {
// Check that the spilling behavior matches the expected.
auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
ASSERT_FALSE(stats->usedDisk);
+ ASSERT_EQ(0, stats->numSpills);
ASSERT_EQ(0, stats->spilledRecords);
stage->close();
@@ -679,7 +724,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) {
TEST_F(HashAggStageTest, HashAggMultipleAccSpill) {
// We estimate the size of result row like {double, int64} at 59B. Set the memory threshold to
// 128B so that two rows fit in memory.
- const int expectedRowsToFitInMemory = 2;
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128);
@@ -698,19 +742,27 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) {
// Build a HashAggStage, group by the scanSlot and compute a simple count.
auto countsSlot = generateSlotId();
auto sumsSlot = generateSlotId();
+ auto spillSlot1 = generateSlotId();
+ auto spillSlot2 = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))),
- sumsSlot,
- stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))),
+ sumsSlot,
+ stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))),
makeSV(), // Seek slot
true,
boost::none,
true /* allowDiskUse */,
+ makeSlotExprPairVec(
+ spillSlot1,
+ stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot1)),
+ spillSlot2,
+ stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot2))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
@@ -740,7 +792,10 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) {
// Check that the spilling behavior matches the expected.
auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
ASSERT_TRUE(stats->usedDisk);
- ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
+ ASSERT_EQ(stats->numSpills, 3);
+ // The input has one run of two consecutive values, so we expect to spill as many records as
+ // there are input values minus one.
+ ASSERT_EQ(stats->spilledRecords, 8);
stage->close();
}
@@ -765,19 +820,27 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) {
// Build a HashAggStage, group by the scanSlot and compute a simple count.
auto countsSlot = generateSlotId();
auto sumsSlot = generateSlotId();
+ auto spillSlot1 = generateSlotId();
+ auto spillSlot2 = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))),
- sumsSlot,
- stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))),
+ sumsSlot,
+ stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))),
makeSV(), // Seek slot
true,
boost::none,
true, // allowDiskUse=true
+ makeSlotExprPairVec(
+ spillSlot1,
+ stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot1)),
+ spillSlot2,
+ stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot2))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
@@ -807,7 +870,9 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) {
// Check that the spilling behavior matches the expected.
auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
ASSERT_TRUE(stats->usedDisk);
- ASSERT_EQ(results.size(), stats->spilledRecords);
+ // We expect each incoming value to result in a spill of a single record.
+ ASSERT_EQ(stats->numSpills, 9);
+ ASSERT_EQ(stats->spilledRecords, 9);
stage->close();
}
@@ -844,14 +909,18 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) {
// Build a HashAggStage, group by the scanSlot and compute a sum for each group.
auto sumsSlot = generateSlotId();
+ auto spillSlot = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(sumsSlot, stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))),
+ makeSlotExprPairVec(sumsSlot,
+ stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))),
makeSV(), // Seek slot
true,
boost::none,
true, // allowDiskUse=true
+ makeSlotExprPairVec(
+ spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
@@ -885,17 +954,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) {
// Build a HashAggStage, group by the scanSlot and compute a simple count.
auto countsSlot = generateSlotId();
+ auto spillSlot = generateSlotId();
auto stage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
makeSV(), // Seek slot
true,
boost::none,
true, // allowDiskUse=true
+ makeSlotExprPairVec(
+ spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))),
kEmptyPlanNodeId);
// Prepare the tree and get the 'SlotAccessor' for the output slot.
diff --git a/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp b/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp
index 6446ab57845..ecd22c9df81 100644
--- a/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp
+++ b/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp
@@ -129,11 +129,12 @@ TEST_F(PlanSizeTest, Filter) {
TEST_F(PlanSizeTest, HashAgg) {
auto stage = makeS<HashAggStage>(mockS(),
mockSV(),
- makeEM(generateSlotId(), mockE()),
+ makeSlotExprPairVec(generateSlotId(), mockE()),
makeSV(),
true,
generateSlotId(),
false,
+ makeSlotExprPairVec(),
kEmptyPlanNodeId);
assertPlanSize(*stage);
}
diff --git a/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp b/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp
index 06097b3a3b2..40f9ca9a9e6 100644
--- a/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp
+++ b/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp
@@ -141,14 +141,16 @@ TEST_F(TrialRunTrackerTest, TrialEndsDuringOpenPhaseOfBlockingStage) {
auto hashAggStage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
- makeSV(), // Seek slot
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSV(), /* Seek slot */
true,
boost::none,
false /* allowDiskUse */,
+ makeSlotExprPairVec(), /* mergingExprs */
kEmptyPlanNodeId);
auto tracker = std::make_unique<TrialRunTracker>(numResultsLimit, size_t{0});
@@ -210,14 +212,16 @@ TEST_F(TrialRunTrackerTest, OnlyDeepestNestedBlockingStageHasTrialRunTracker) {
auto hashAggStage = makeS<HashAggStage>(
std::move(unionStage),
makeSV(unionSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
- makeSV(), // Seek slot
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSV(), /* Seek slot */
true,
boost::none,
false /* allowDiskUse */,
+ makeSlotExprPairVec(), /* mergingExprs */
kEmptyPlanNodeId);
hashAggStage->prepare(*ctx);
@@ -277,14 +281,16 @@ TEST_F(TrialRunTrackerTest, SiblingBlockingStagesBothGetTrialRunTracker) {
auto hashAggStage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction("sum",
- makeE<EConstant>(value::TypeTags::NumberInt64,
- value::bitcastFrom<int64_t>(1)))),
- makeSV(), // Seek slot
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction("sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64,
+ value::bitcastFrom<int64_t>(1)))),
+ makeSV(), /* Seek slot */
true,
boost::none,
false /* allowDiskUse */,
+ makeSlotExprPairVec(), /* mergingExprs */
kEmptyPlanNodeId);
return std::make_pair(countsSlot, std::move(hashAggStage));
@@ -407,14 +413,16 @@ TEST_F(TrialRunTrackerTest, DisablingTrackingForAChildStagePreventsEarlyExit) {
auto hashAggStage = makeS<HashAggStage>(
std::move(scanStage),
makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction("sum",
- makeE<EConstant>(value::TypeTags::NumberInt64,
- value::bitcastFrom<int64_t>(1)))),
- makeSV(), // Seek slot
+ makeSlotExprPairVec(
+ countsSlot,
+ stage_builder::makeFunction("sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64,
+ value::bitcastFrom<int64_t>(1)))),
+ makeSV(), /* Seek slot */
true,
boost::none,
false /* allowDiskUse */,
+ makeSlotExprPairVec(), /* mergingExprs */
kEmptyPlanNodeId);
return std::make_pair(countsSlot, std::move(hashAggStage));
diff --git a/src/mongo/db/exec/sbe/size_estimator.h b/src/mongo/db/exec/sbe/size_estimator.h
index fb6684eea52..bbb92328331 100644
--- a/src/mongo/db/exec/sbe/size_estimator.h
+++ b/src/mongo/db/exec/sbe/size_estimator.h
@@ -92,6 +92,11 @@ inline size_t estimate(const S& stats) {
return stats.estimateObjectSizeInBytes() - sizeof(S);
}
+template <typename A, typename B>
+inline size_t estimate(const std::pair<A, B>& pair) {
+ return estimate(pair.first) + estimate(pair.second);
+}
+
// Calculate the size of the inlined vector's elements.
template <typename T, size_t N, typename A>
size_t estimate(const absl::InlinedVector<T, N, A>& vector) {
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
index 74b7625ede6..67c1b46d8cb 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
@@ -43,32 +43,54 @@ namespace mongo {
namespace sbe {
HashAggStage::HashAggStage(std::unique_ptr<PlanStage> input,
value::SlotVector gbs,
- value::SlotMap<std::unique_ptr<EExpression>> aggs,
+ SlotExprPairVector aggs,
value::SlotVector seekKeysSlots,
bool optimizedClose,
boost::optional<value::SlotId> collatorSlot,
bool allowDiskUse,
+ SlotExprPairVector mergingExprs,
PlanNodeId planNodeId,
- bool participateInTrialRunTracking)
+ bool participateInTrialRunTracking,
+ bool forceIncreasedSpilling)
: PlanStage("group"_sd, planNodeId, participateInTrialRunTracking),
_gbs(std::move(gbs)),
_aggs(std::move(aggs)),
_collatorSlot(collatorSlot),
_allowDiskUse(allowDiskUse),
_seekKeysSlots(std::move(seekKeysSlots)),
- _optimizedClose(optimizedClose) {
+ _optimizedClose(optimizedClose),
+ _mergingExprs(std::move(mergingExprs)),
+ _forceIncreasedSpilling(forceIncreasedSpilling) {
_children.emplace_back(std::move(input));
invariant(_seekKeysSlots.empty() || _seekKeysSlots.size() == _gbs.size());
tassert(5843100,
"HashAgg stage was given optimizedClose=false and seek keys",
_seekKeysSlots.empty() || _optimizedClose);
+
+ if (_allowDiskUse) {
+ tassert(7039549,
+ "disk use enabled for HashAggStage but incorrect number of merging expresssions",
+ _aggs.size() == _mergingExprs.size());
+ }
+
+ if (_forceIncreasedSpilling) {
+ tassert(7039554, "'forceIncreasedSpilling' set but disk use not allowed", _allowDiskUse);
+ }
}
std::unique_ptr<PlanStage> HashAggStage::clone() const {
- value::SlotMap<std::unique_ptr<EExpression>> aggs;
+ SlotExprPairVector aggs;
+ aggs.reserve(_aggs.size());
for (auto& [k, v] : _aggs) {
- aggs.emplace(k, v->clone());
+ aggs.push_back({k, v->clone()});
}
+
+ SlotExprPairVector mergingExprsClone;
+ mergingExprsClone.reserve(_mergingExprs.size());
+ for (auto&& [k, v] : _mergingExprs) {
+ mergingExprsClone.push_back({k, v->clone()});
+ }
+
return std::make_unique<HashAggStage>(_children[0]->clone(),
_gbs,
std::move(aggs),
@@ -76,8 +98,10 @@ std::unique_ptr<PlanStage> HashAggStage::clone() const {
_optimizedClose,
_collatorSlot,
_allowDiskUse,
+ std::move(mergingExprsClone),
_commonStats.nodeId,
- _participateInTrialRunTracking);
+ _participateInTrialRunTracking,
+ _forceIncreasedSpilling);
}
void HashAggStage::doSaveState(bool relinquishCursor) {
@@ -122,34 +146,36 @@ void HashAggStage::prepare(CompileCtx& ctx) {
}
value::SlotSet dupCheck;
+ auto throwIfDupSlot = [&dupCheck](value::SlotId slot) {
+ auto [_, inserted] = dupCheck.emplace(slot);
+ tassert(7039551, "duplicate slot id", inserted);
+ };
+
size_t counter = 0;
// Process group by columns.
for (auto& slot : _gbs) {
- auto [it, inserted] = dupCheck.emplace(slot);
- uassert(4822827, str::stream() << "duplicate field: " << slot, inserted);
+ throwIfDupSlot(slot);
_inKeyAccessors.emplace_back(_children[0]->getAccessor(ctx, slot));
- // Construct accessors for the key to be processed from either the '_ht' or the
- // '_recordStore'. Before the memory limit is reached the '_outHashKeyAccessors' will carry
- // the group-by keys, otherwise the '_outRecordStoreKeyAccessors' will carry the group-by
- // keys.
+ // Construct accessors for obtaining the key values from either the hash table '_ht' or the
+ // '_recordStore'.
_outHashKeyAccessors.emplace_back(std::make_unique<HashKeyAccessor>(_htIt, counter));
_outRecordStoreKeyAccessors.emplace_back(
- std::make_unique<value::MaterializedSingleRowAccessor>(_aggKeyRecordStore, counter));
+ std::make_unique<value::MaterializedSingleRowAccessor>(_outKeyRowRecordStore, counter));
- counter++;
-
- // A SwitchAccessor is used to point the '_outKeyAccessors' to the key coming from the '_ht'
- // or the '_recordStore' when draining the HashAgg stage in getNext. The group-by key will
- // either be in the '_ht' or the '_recordStore' if the key lives in memory, or if the key
- // has been spilled to disk, respectively. The SwitchAccessor allows toggling between the
- // two so the parent stage can read it through the '_outAccessors'.
+ // A 'SwitchAccessor' is used to point the '_outKeyAccessors' to the key coming from the
+ // '_ht' or the '_recordStore' when draining the HashAgg stage in getNext(). If no spilling
+ // occurred, the keys will be obtained from the hash table. If spilling kicked in, then all
+ // of the data is written out to the record store, so the 'SwitchAccessor' is reconfigured
+ // to obtain all of the keys from the spill table.
_outKeyAccessors.emplace_back(
std::make_unique<value::SwitchAccessor>(std::vector<value::SlotAccessor*>{
_outHashKeyAccessors.back().get(), _outRecordStoreKeyAccessors.back().get()}));
_outAccessors[slot] = _outKeyAccessors.back().get();
+
+ ++counter;
}
// Process seek keys (if any). The keys must come from outside of the subtree (by definition) so
@@ -160,26 +186,18 @@ void HashAggStage::prepare(CompileCtx& ctx) {
counter = 0;
for (auto& [slot, expr] : _aggs) {
- auto [it, inserted] = dupCheck.emplace(slot);
- // Some compilers do not allow to capture local bindings by lambda functions (the one
- // is used implicitly in uassert below), so we need a local variable to construct an
- // error message.
- const auto slotId = slot;
- uassert(4822828, str::stream() << "duplicate field: " << slotId, inserted);
-
- // Construct accessors for the agg state to be processed from either the '_ht' or the
- // '_recordStore' by the SwitchAccessor owned by '_outAggAccessors' below.
+ throwIfDupSlot(slot);
+
+ // Just like with the output accessors for the keys, we construct output accessors for the
+ // aggregate values that read from either the hash table '_ht' or the '_recordStore'.
_outRecordStoreAggAccessors.emplace_back(
- std::make_unique<value::MaterializedSingleRowAccessor>(_aggValueRecordStore, counter));
+ std::make_unique<value::MaterializedSingleRowAccessor>(_outAggRowRecordStore, counter));
_outHashAggAccessors.emplace_back(std::make_unique<HashAggAccessor>(_htIt, counter));
- counter++;
-
- // A SwitchAccessor is used to toggle the '_outAggAccessors' between the '_ht' and the
- // '_recordStore' when updating the agg state via the bytecode. By compiling the agg
- // EExpressions with a SwitchAccessor we can load the agg value into the memory of
- // '_aggValueRecordStore' if the value comes from the '_recordStore' or we can use the
- // agg value referenced through '_htIt' and run the bytecode to mutate the value through the
- // SwitchAccessor.
+
+ // A 'SwitchAccessor' is used to toggle the '_outAggAccessors' between the '_ht' and the
+ // '_recordStore'. Just like the key values, the aggregate values are always obtained from
+ // the hash table if no spilling occurred and are always obtained from the record store if
+ // spilling occurred.
_outAggAccessors.emplace_back(
std::make_unique<value::SwitchAccessor>(std::vector<value::SlotAccessor*>{
_outHashAggAccessors.back().get(), _outRecordStoreAggAccessors.back().get()}));
@@ -189,10 +207,32 @@ void HashAggStage::prepare(CompileCtx& ctx) {
ctx.root = this;
ctx.aggExpression = true;
ctx.accumulator = _outAggAccessors.back().get();
-
_aggCodes.emplace_back(expr->compile(ctx));
ctx.aggExpression = false;
+
+ ++counter;
+ }
+
+ // If disk use is allowed, then we need to compile the merging expressions as well.
+ if (_allowDiskUse) {
+ counter = 0;
+ for (auto&& [spillSlot, mergingExpr] : _mergingExprs) {
+ throwIfDupSlot(spillSlot);
+
+ _spilledAggsAccessors.push_back(
+ std::make_unique<value::MaterializedSingleRowAccessor>(_spilledAggRow, counter));
+ _spilledAggsAccessorMap[spillSlot] = _spilledAggsAccessors.back().get();
+
+ ctx.root = this;
+ ctx.aggExpression = true;
+ ctx.accumulator = _outAggAccessors[counter].get();
+ _mergingExprCodes.emplace_back(mergingExpr->compile(ctx));
+ ctx.aggExpression = false;
+
+ ++counter;
+ }
}
+
_compiled = true;
}
@@ -202,6 +242,15 @@ value::SlotAccessor* HashAggStage::getAccessor(CompileCtx& ctx, value::SlotId sl
return it->second;
}
} else {
+ // The slots into which we read spilled partial aggregates, accessible via
+ // '_spilledAggsAccessors', should only be visible to this stage. They are used internally
+ // when merging spilled partial aggregates and should never be read by ancestor stages.
+ // Therefore, they are only made visible when this stage is in the process of compiling
+ // itself.
+ if (auto it = _spilledAggsAccessorMap.find(slot); it != _spilledAggsAccessorMap.end()) {
+ return it->second;
+ }
+
return _children[0]->getAccessor(ctx, slot);
}
@@ -227,89 +276,82 @@ void HashAggStage::spillRowToDisk(const value::MaterializedRow& key,
const value::MaterializedRow& val) {
KeyString::Builder kb{KeyString::Version::kLatestVersion};
key.serializeIntoKeyString(kb);
+ // Add a unique integer to the end of the key, since record ids must be unique. We want equal
+ // keys to be adjacent in the 'RecordStore' so that we can merge the partial aggregates with a
+ // single pass.
+ kb.appendNumberLong(_ridCounter++);
auto typeBits = kb.getTypeBits();
-
auto rid = RecordId(kb.getBuffer(), kb.getSize());
- boost::optional<value::MaterializedRow> valFromRs =
- readFromRecordStore(_opCtx, _recordStore->rs(), rid);
- tassert(6031100, "Spilling a row doesn't support updating it in the store.", !valFromRs);
- spillValueToDisk(rid, val, typeBits, false /*update*/);
+ upsertToRecordStore(_opCtx, _recordStore->rs(), rid, val, typeBits, false /*update*/);
+ _specificStats.spilledRecords++;
}
-void HashAggStage::spillValueToDisk(const RecordId& key,
- const value::MaterializedRow& val,
- const KeyString::TypeBits& typeBits,
- bool update) {
- auto nBytes = upsertToRecordStore(_opCtx, _recordStore->rs(), key, val, typeBits, update);
- if (!update) {
- _specificStats.spilledRecords++;
+void HashAggStage::spill(MemoryCheckData& mcd) {
+ uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
+ "Exceeded memory limit for $group, but didn't allow external spilling;"
+ " pass allowDiskUse:true to opt in",
+ _allowDiskUse);
+
+ // Since we flush the entire hash table to disk, we also clear any state related to estimating
+ // memory consumption.
+ mcd.reset();
+
+ if (!_recordStore) {
+ makeTemporaryRecordStore();
}
- _specificStats.lastSpilledRecordSize = nBytes;
+
+ for (auto&& it : *_ht) {
+ spillRowToDisk(it.first, it.second);
+ }
+ _ht->clear();
+
+ ++_specificStats.numSpills;
}
// Checks memory usage. Ideally, we'd want to know the exact size of already accumulated data, but
// we cannot, so we estimate it based on the last updated/inserted row, if we have one, or the first
// row in the '_ht' table. If the estimated memory usage exceeds the allowed, this method initiates
-// spilling (if haven't been done yet) and evicts some records from the '_ht' table into the temp
-// store to keep the memory usage under the limit.
+// spilling.
void HashAggStage::checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd) {
- // The '_ht' table might become empty in the degenerate case when all rows had to be evicted to
- // meet the memory constraint during a previous check -- we don't need to keep checking memory
- // usage in this case because the table will never get new rows.
- if (_ht->empty()) {
- return;
- }
+ invariant(!_ht->empty());
// If the group-by key is empty we will only ever aggregate into a single row so no sense in
- // spilling since we will just be moving a single row back and forth from disk to main memory.
+ // spilling.
if (_inKeyAccessors.size() == 0) {
return;
}
mcd.memoryCheckpointCounter++;
- if (mcd.memoryCheckpointCounter >= mcd.nextMemoryCheckpoint) {
- if (_htIt == _ht->end()) {
- _htIt = _ht->begin();
- }
- const long estimatedRowSize =
- _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter();
- long long estimatedTotalSize = _ht->size() * estimatedRowSize;
+ if (mcd.memoryCheckpointCounter < mcd.nextMemoryCheckpoint) {
+ // We haven't reached the next checkpoint at which we estimate memory usage and decide if we
+ // should spill.
+ return;
+ }
+
+ const long estimatedRowSize =
+ _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter();
+ long long estimatedTotalSize = _ht->size() * estimatedRowSize;
+
+ if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) {
+ spill(mcd);
+ } else {
+ // Calculate the next memory checkpoint. We estimate it based on the prior growth of the
+ // '_ht' and the remaining available memory. If 'estimatedGainPerChildAdvance' suggests that
+ // the hash table is growing, then the checkpoint is estimated as some configurable
+ // percentage of the number of additional input rows that we would have to process to
+ // consume the remaining memory. On the other hand, a value of 'estimatedGainPerChildAdvance'
+ // close to zero indicates a stable hash table size, in which case we can delay the next
+ // check progressively.
const double estimatedGainPerChildAdvance =
(static_cast<double>(estimatedTotalSize - mcd.lastEstimatedMemoryUsage) /
mcd.memoryCheckpointCounter);
- if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) {
- uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
- "Exceeded memory limit for $group, but didn't allow external spilling."
- " Pass allowDiskUse:true to opt in.",
- _allowDiskUse);
- if (!_recordStore) {
- makeTemporaryRecordStore();
- }
-
- // Evict enough rows into the temporary store to drop below the memory constraint.
- const long rowsToEvictCount =
- 1 + (estimatedTotalSize - _approxMemoryUseInBytesBeforeSpill) / estimatedRowSize;
- for (long i = 0; !_ht->empty() && i < rowsToEvictCount; i++) {
- spillRowToDisk(_htIt->first, _htIt->second);
- _ht->erase(_htIt);
- _htIt = _ht->begin();
- }
- estimatedTotalSize = _ht->size() * estimatedRowSize;
- }
-
- // Calculate the next memory checkpoint. We estimate it based on the prior growth of the
- // '_ht' and the remaining available memory. We have to keep doing this even after starting
- // to spill because some accumulators can grow in size inside '_ht' (with no bounds).
- // Value of 'estimatedGainPerChildAdvance' can be negative if the previous checkpoint
- // evicted any records. And a value close to zero indicates a stable size of '_ht' so can
- // delay the next check progressively.
const long nextCheckpointCandidate = (estimatedGainPerChildAdvance > 0.1)
? mcd.checkpointMargin * (_approxMemoryUseInBytesBeforeSpill - estimatedTotalSize) /
estimatedGainPerChildAdvance
- : (estimatedGainPerChildAdvance < -0.1) ? mcd.atMostCheckFrequency
- : mcd.nextMemoryCheckpoint * 2;
+ : mcd.nextMemoryCheckpoint * 2;
+
mcd.nextMemoryCheckpoint =
std::min<long>(mcd.memoryCheckFrequency,
std::max<long>(mcd.atMostCheckFrequency, nextCheckpointCandidate));
@@ -335,16 +377,28 @@ void HashAggStage::open(bool reOpen) {
5402503, "collatorSlot must be of collator type", tag == value::TypeTags::collator);
auto collatorView = value::getCollatorView(collatorVal);
const value::MaterializedRowHasher hasher(collatorView);
- const value::MaterializedRowEq equator(collatorView);
- _ht.emplace(0, hasher, equator);
+ _keyEq = value::MaterializedRowEq(collatorView);
+ _ht.emplace(0, hasher, _keyEq);
} else {
_ht.emplace();
}
_seekKeys.resize(_seekKeysAccessors.size());
- // A default value for spilling a key to the record store.
- value::MaterializedRow defaultVal{_outAggAccessors.size()};
+ // Reset state since this stage may have been previously opened.
+ for (auto&& accessor : _outKeyAccessors) {
+ accessor->setIndex(0);
+ }
+ for (auto&& accessor : _outAggAccessors) {
+ accessor->setIndex(0);
+ }
+ _rsCursor.reset();
+ _recordStore.reset();
+ _outKeyRowRecordStore = {0};
+ _outAggRowRecordStore = {0};
+ _spilledAggRow = {0};
+ _stashedNextRow = {0, 0};
+
MemoryCheckData memoryCheckData;
while (_children[0]->getNext() == PlanState::ADVANCED) {
@@ -356,53 +410,35 @@ void HashAggStage::open(bool reOpen) {
key.reset(idx++, false, tag, val);
}
+ bool newKey = false;
+ _htIt = _ht->find(key);
+ if (_htIt == _ht->end()) {
+ // The key is not present in the hash table yet, so we insert it and initialize the
+ // corresponding accumulator. Note that as a future optimization, we could avoid
+ // doing a lookup both in the 'find()' call and in 'emplace()'.
+ newKey = true;
+ key.makeOwned();
+ auto [it, _] = _ht->emplace(std::move(key), value::MaterializedRow{0});
+ it->second.resize(_outAggAccessors.size());
+ _htIt = it;
+ }
+
+ // Accumulate state in '_ht'.
+ for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) {
+ auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get());
+ _outHashAggAccessors[idx]->reset(owned, tag, val);
+ }
- if (_htIt = _ht->find(key); !_recordStore || _htIt != _ht->end()) {
- if (_htIt == _ht->end()) {
- // The memory limit hasn't been reached yet, insert a new key in '_ht' by
- // copying the key. Note as a future optimization, we should avoid the lookup in
- // the find() call and the emplace.
- key.makeOwned();
- auto [it, _] = _ht->emplace(std::move(key), value::MaterializedRow{0});
- // Initialize accumulators.
- it->second.resize(_outAggAccessors.size());
- _htIt = it;
- }
- // Accumulate state in '_ht' by pointing the '_outAggAccessors' the
- // '_outHashAggAccessors'.
- for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) {
- _outAggAccessors[idx]->setIndex(0);
- auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get());
- _outHashAggAccessors[idx]->reset(owned, tag, val);
- }
+ if (_forceIncreasedSpilling && !newKey) {
+ // If configured to spill more than usual, we spill after seeing the same key twice.
+ spill(memoryCheckData);
} else {
- // The memory limit has been reached and the key wasn't in the '_ht' so we need
- // to spill it to the '_recordStore'.
- KeyString::Builder kb{KeyString::Version::kLatestVersion};
- key.serializeIntoKeyString(kb);
- auto typeBits = kb.getTypeBits();
-
- auto rid = RecordId(kb.getBuffer(), kb.getSize());
-
- boost::optional<value::MaterializedRow> valFromRs =
- readFromRecordStore(_opCtx, _recordStore->rs(), rid);
- if (!valFromRs) {
- _aggValueRecordStore = defaultVal;
- } else {
- _aggValueRecordStore = *valFromRs;
- }
-
- for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) {
- _outAggAccessors[idx]->setIndex(1);
- auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get());
- _aggValueRecordStore.reset(idx, owned, tag, val);
- }
- spillValueToDisk(rid, _aggValueRecordStore, typeBits, valFromRs ? true : false);
+ // Estimates how much memory is being used. If we estimate that the hash table
+ // exceeds the allotted memory budget, its contents are spilled to the
+ // '_recordStore' and '_ht' is cleared.
+ checkMemoryUsageAndSpillIfNecessary(memoryCheckData);
}
- // Estimates how much memory is being used and might start spilling.
- checkMemoryUsageAndSpillIfNecessary(memoryCheckData);
-
if (_tracker && _tracker->trackProgress<TrialRunTracker::kNumResults>(1)) {
// During trial runs, we want to limit the amount of work done by opening a blocking
// stage, like this one. The blocking stage tracks the number of documents it has
@@ -418,8 +454,33 @@ void HashAggStage::open(bool reOpen) {
_children[0]->close();
_childOpened = false;
}
- }
+ // If we spilled at any point while consuming the input, then do one final spill to write
+ // any leftover contents of '_ht' to the record store. That way, when recovering the input
+ // from the record store and merging partial aggregates we don't have to worry about the
+ // possibility of some of the data being in the hash table and some being in the record
+ // store.
+ if (_recordStore) {
+ if (!_ht->empty()) {
+ spill(memoryCheckData);
+ }
+
+ _specificStats.spilledDataStorageSize = _recordStore->rs()->storageSize(_opCtx);
+
+ // Establish a cursor, positioned at the beginning of the record store.
+ _rsCursor = _recordStore->rs()->getCursor(_opCtx);
+
+ // Callers will be obtaining the results from the spill table, so set the
+ // 'SwitchAccessors' so that they refer to the rows recovered from the record store
+ // under the hood.
+ for (auto&& accessor : _outKeyAccessors) {
+ accessor->setIndex(1);
+ }
+ for (auto&& accessor : _outAggAccessors) {
+ accessor->setIndex(1);
+ }
+ }
+ }
if (!_seekKeysAccessors.empty()) {
// Copy keys in order to do the lookup.
@@ -431,20 +492,76 @@ void HashAggStage::open(bool reOpen) {
}
_htIt = _ht->end();
+}
- // Set the SwitchAccessors to point to the '_ht' so we can drain it first before draining the
- // '_recordStore' in getNext().
- for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) {
- _outAggAccessors[idx]->setIndex(0);
+HashAggStage::SpilledRow HashAggStage::deserializeSpilledRecord(const Record& record,
+ BufBuilder& keyBuffer) {
+ // Read the values and type bits out of the value part of the record.
+ BufReader valReader(record.data.data(), record.data.size());
+ auto val = value::MaterializedRow::deserializeForSorter(valReader, {});
+ auto typeBits = KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader);
+
+ keyBuffer.reset();
+ auto key = value::MaterializedRow::deserializeFromKeyString(
+ decodeKeyString(record.id, typeBits), &keyBuffer, _gbs.size() /*numPrefixValuesToRead*/);
+ return {std::move(key), std::move(val)};
+}
+
+PlanState HashAggStage::getNextSpilled() {
+ if (_stashedNextRow.first.isEmpty()) {
+ auto nextRecord = _rsCursor->next();
+ if (!nextRecord) {
+ return trackPlanState(PlanState::IS_EOF);
+ }
+
+ // We are just starting the process of merging the spilled file segments.
+ auto recoveredRow = deserializeSpilledRecord(*nextRecord, _outKeyRowRSBuffer);
+
+ _outKeyRowRecordStore = std::move(recoveredRow.first);
+ _outAggRowRecordStore = std::move(recoveredRow.second);
+ } else {
+ // We peeked at the next key last time around.
+ _outKeyRowRSBuffer = std::move(_stashedKeyBuffer);
+ _outKeyRowRecordStore = std::move(_stashedNextRow.first);
+ _outAggRowRecordStore = std::move(_stashedNextRow.second);
+ // Clear the stashed row.
+ _stashedNextRow = {0, 0};
}
- _drainingRecordStore = false;
+
+ // Find additional partial aggregates for the same key and merge them in order to compute the
+ // final output.
+ for (auto nextRecord = _rsCursor->next(); nextRecord; nextRecord = _rsCursor->next()) {
+ auto recoveredRow = deserializeSpilledRecord(*nextRecord, _stashedKeyBuffer);
+ if (!_keyEq(recoveredRow.first, _outKeyRowRecordStore)) {
+ // The newly recovered spilled row belongs to a new key, so we're done merging partial
+ // aggregates for the old key. Save the new row for later and return advanced.
+ _stashedNextRow = std::move(recoveredRow);
+ return trackPlanState(PlanState::ADVANCED);
+ }
+
+ // Merge in the new partial aggregate values.
+ _spilledAggRow = std::move(recoveredRow.second);
+ for (size_t idx = 0; idx < _mergingExprCodes.size(); ++idx) {
+ auto [owned, tag, val] = _bytecode.run(_mergingExprCodes[idx].get());
+ _outRecordStoreAggAccessors[idx]->reset(owned, tag, val);
+ }
+ }
+
+ return trackPlanState(PlanState::ADVANCED);
}
PlanState HashAggStage::getNext() {
auto optTimer(getOptTimer(_opCtx));
- if (_htIt == _ht->end() && !_drainingRecordStore) {
- // First invocation of getNext() after open() when not draining the '_recordStore'.
+ // If we've spilled, then we need to produce the output by merging the spilled segments from the
+ // spill file.
+ if (_recordStore) {
+ return getNextSpilled();
+ }
+
+ // We didn't spill. Obtain the next output row from the hash table.
+ if (_htIt == _ht->end()) {
+ // First invocation of getNext() after open().
if (!_seekKeysAccessors.empty()) {
_htIt = _ht->find(_seekKeys);
} else {
@@ -453,55 +570,13 @@ PlanState HashAggStage::getNext() {
} else if (!_seekKeysAccessors.empty()) {
// Subsequent invocation with seek keys. Return only 1 single row (if any).
_htIt = _ht->end();
- } else if (!_drainingRecordStore) {
- // Returning the results of the entire hash table first before draining the '_recordStore'.
+ } else {
++_htIt;
}
- if (_htIt == _ht->end() && !_recordStore) {
- // The hash table has been drained and nothing was spilled to disk.
+ if (_htIt == _ht->end()) {
+ // The hash table has been drained (and we never spilled to disk) so we're done.
return trackPlanState(PlanState::IS_EOF);
- } else if (_htIt != _ht->end()) {
- // Drain the '_ht' on the next 'getNext()' call.
- return trackPlanState(PlanState::ADVANCED);
- } else if (_seekKeysAccessors.empty()) {
- // A record store was created to spill to disk. Drain it then clean it up.
- if (!_rsCursor) {
- _rsCursor = _recordStore->rs()->getCursor(_opCtx);
- }
- auto nextRecord = _rsCursor->next();
- if (nextRecord) {
- // Point the out accessors to the recordStore accessors to allow parent stages to read
- // the agg state from the '_recordStore'.
- if (!_drainingRecordStore) {
- for (size_t i = 0; i < _outKeyAccessors.size(); ++i) {
- _outKeyAccessors[i]->setIndex(1);
- }
- for (size_t i = 0; i < _outAggAccessors.size(); ++i) {
- _outAggAccessors[i]->setIndex(1);
- }
- }
- _drainingRecordStore = true;
-
- // Read the agg state value from the '_recordStore' and Reconstruct the key from the
- // typeBits stored along side of the value.
- BufReader valReader(nextRecord->data.data(), nextRecord->data.size());
- auto val = value::MaterializedRow::deserializeForSorter(valReader, {});
- auto typeBits =
- KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader);
- _aggValueRecordStore = val;
-
- _aggKeyRSBuffer.reset();
- _aggKeyRecordStore = value::MaterializedRow::deserializeFromKeyString(
- decodeKeyString(nextRecord->id, typeBits), &_aggKeyRSBuffer);
- return trackPlanState(PlanState::ADVANCED);
- } else {
- _rsCursor.reset();
- _specificStats.spilledRecordEstimatedStorageSize =
- _recordStore->rs()->storageSize(_opCtx);
- _recordStore.reset();
- return trackPlanState(PlanState::IS_EOF);
- }
} else {
return trackPlanState(PlanState::ADVANCED);
}
@@ -521,13 +596,19 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co
childrenBob.append(str::stream() << slot, printer.print(expr->debugPrint()));
}
}
+
+ if (!_mergingExprs.empty()) {
+ BSONObjBuilder nestedBuilder{bob.subobjStart("mergingExprs")};
+ for (auto&& [slot, expr] : _mergingExprs) {
+ nestedBuilder.append(str::stream() << slot, printer.print(expr->debugPrint()));
+ }
+ }
+
// Spilling stats.
bob.appendBool("usedDisk", _specificStats.usedDisk);
+ bob.appendNumber("numSpills", _specificStats.numSpills);
bob.appendNumber("spilledRecords", _specificStats.spilledRecords);
- bob.appendNumber("spilledBytesApprox",
- _specificStats.lastSpilledRecordSize * _specificStats.spilledRecords);
- bob.appendNumber("spilledRecordEstimatedStorageSize",
- _specificStats.spilledRecordEstimatedStorageSize);
+ bob.appendNumber("spilledDataStorageSize", _specificStats.spilledDataStorageSize);
ret->debugInfo = bob.obj();
}
@@ -545,11 +626,12 @@ void HashAggStage::close() {
trackClose();
_ht = boost::none;
- if (_recordStore) {
- // A record store was created to spill to disk. Clean it up.
- _recordStore.reset();
- _drainingRecordStore = false;
- }
+ _rsCursor.reset();
+ _recordStore.reset();
+ _outKeyRowRecordStore = {0};
+ _outAggRowRecordStore = {0};
+ _spilledAggRow = {0};
+ _stashedNextRow = {0, 0};
if (_childOpened) {
_children[0]->close();
@@ -572,7 +654,7 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const {
ret.emplace_back(DebugPrinter::Block("[`"));
bool first = true;
- value::orderedSlotMapTraverse(_aggs, [&](auto slot, auto&& expr) {
+ for (auto&& [slot, expr] : _aggs) {
if (!first) {
ret.emplace_back(DebugPrinter::Block("`,"));
}
@@ -581,7 +663,7 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const {
ret.emplace_back("=");
DebugPrinter::addBlocks(ret, expr->debugPrint());
first = false;
- });
+ }
ret.emplace_back("`]");
if (!_seekKeysSlots.empty()) {
@@ -596,6 +678,28 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const {
ret.emplace_back("`]");
}
+ if (!_mergingExprs.empty()) {
+ ret.emplace_back("spillSlots[`");
+ for (size_t idx = 0; idx < _mergingExprs.size(); ++idx) {
+ if (idx) {
+ ret.emplace_back("`,");
+ }
+
+ DebugPrinter::addIdentifier(ret, _mergingExprs[idx].first);
+ }
+ ret.emplace_back("`]");
+
+ ret.emplace_back("mergingExprs[`");
+ for (size_t idx = 0; idx < _mergingExprs.size(); ++idx) {
+ if (idx) {
+ ret.emplace_back("`,");
+ }
+
+ DebugPrinter::addBlocks(ret, _mergingExprs[idx].second->debugPrint());
+ }
+ ret.emplace_back("`]");
+ }
+
if (!_optimizedClose) {
ret.emplace_back("reopen");
}
@@ -616,6 +720,7 @@ size_t HashAggStage::estimateCompileTimeSize() const {
size += size_estimator::estimate(_gbs);
size += size_estimator::estimate(_aggs);
size += size_estimator::estimate(_seekKeysSlots);
+ size += size_estimator::estimate(_mergingExprs);
return size;
}
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h
index d200c4b9c3d..86f7337b7f7 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.h
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.h
@@ -29,8 +29,6 @@
#pragma once
-#include <unordered_map>
-
#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/stages/stages.h"
#include "mongo/db/exec/sbe/vm/vm.h"
@@ -61,22 +59,39 @@ namespace sbe {
* determining whether two group-by keys are equal. For instance, the plan may require us to do a
* case-insensitive group on a string field.
*
+ * The 'allowDiskUse' flag controls whether this stage can spill. If false and the memory budget is
+ * exhausted, this stage throws a query-fatal error with code
+ * 'QueryExceededMemoryLimitNoDiskUseAllowed'. If true, then spilling is possible and the caller
+ * must provide a vector of 'mergingExprs'. This is a vector of (slot, expression) pairs which is
+ * symmetrical with 'aggs'. The slots are only visible internally and are used to store partial
+ * aggregate values that have been recovered from the spill table. Each of the expressions is an agg
+ * function which merges the partial aggregate value from this slot into the final aggregate value.
+ * In the debug string output, the internal slots used to house the partial aggregates are printed
+ * as a list of "spillSlots" and the expressions are printed as a parallel list of "mergingExprs".
+ *
+ * If 'forceIncreasedSpilling' is true, then this stage will spill frequently even if the memory
+ * limit is not reached. This is intended to be used in test contexts to exercise the otherwise
+ * infrequently used spilling logic.
+ *
* Debug string representation:
*
- * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] [<seek slots>]? reopen?
- * collatorSlot? childStage
+ * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] [<seek slots>]?
+ * spillSlots[slot_1, ..., slot_n] mergingExprs[expr_1, ..., expr_n] reopen? collatorSlot?
+ * childStage
*/
class HashAggStage final : public PlanStage {
public:
HashAggStage(std::unique_ptr<PlanStage> input,
value::SlotVector gbs,
- value::SlotMap<std::unique_ptr<EExpression>> aggs,
+ SlotExprPairVector aggs,
value::SlotVector seekKeysSlots,
bool optimizedClose,
boost::optional<value::SlotId> collatorSlot,
bool allowDiskUse,
+ SlotExprPairVector mergingExprs,
PlanNodeId planNodeId,
- bool participateInTrialRunTracking = true);
+ bool participateInTrialRunTracking = true,
+ bool forceIncreasedSpilling = false);
std::unique_ptr<PlanStage> clone() const final;
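To make the debug string format described above concrete, a group stage that computes one sum per key and is allowed to spill might print roughly as follows (the slot numbers and the child stage are hypothetical placeholders):

    group [s1] [s2 = sum(s3)] spillSlots[s4] mergingExprs[sum(s4)] <childStage>

Here s4 is the internal slot into which a recovered partial sum is loaded, and sum(s4) is the merging expression that folds it into the final value of s2.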
@@ -109,99 +124,138 @@ private:
using HashKeyAccessor = value::MaterializedRowKeyAccessor<TableType::iterator>;
using HashAggAccessor = value::MaterializedRowValueAccessor<TableType::iterator>;
- void makeTemporaryRecordStore();
-
- /**
- * Spills a key and value pair to the '_recordStore' where the semantics are insert or update
- * depending on the 'update' flag. When the 'update' flag is true this method already expects
- * the 'key' to be inserted into the '_recordStore', otherwise the 'key' and 'val' pair are
- * fresh.
- *
- * This method expects the key to be seralized into a KeyString::Value so that the key is
- * memcmp-able and lookups can be done to update the 'val' in the '_recordStore'. Note that the
- * 'typeBits' are needed to reconstruct the spilled 'key' when calling 'getNext' to deserialize
- * the 'key' to a MaterializedRow. Since the '_recordStore' only stores the memcmp-able part of
- * the KeyString we need to carry the 'typeBits' separately, and we do this by appending the
- * 'typeBits' to the end of the serialized 'val' buffer and store them at the leaves of the
- * backing B-tree of the '_recordStore'. used as the RecordId.
- */
- void spillValueToDisk(const RecordId& key,
- const value::MaterializedRow& val,
- const KeyString::TypeBits& typeBits,
- bool update);
- void spillRowToDisk(const value::MaterializedRow& key,
- const value::MaterializedRow& defaultVal);
+ using SpilledRow = std::pair<value::MaterializedRow, value::MaterializedRow>;
/**
     * We check the amount of used memory every T processed incoming records, where T is calculated
* based on the estimated used memory and its recent growth. When the memory limit is exceeded,
- * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' and might spill some of
- * the already accumulated data into it.
+ * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' (if it hasn't already been
+ * created) and spill the contents of the hash table into this record store.
*/
struct MemoryCheckData {
+ MemoryCheckData() {
+ reset();
+ }
+
+ void reset() {
+ memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency);
+ nextMemoryCheckpoint = 0;
+ memoryCheckpointCounter = 0;
+ lastEstimatedMemoryUsage = 0;
+ }
+
const double checkpointMargin = internalQuerySBEAggMemoryUseCheckMargin.load();
- const long atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load();
- const long atLeastMemoryCheckFrequency =
+ const int64_t atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load();
+ const int64_t atLeastMemoryCheckFrequency =
internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load();
        // The check frequency upper bound, which starts at 'atMost' and exponentially backs off
// to 'atLeast' as more data is accumulated. If 'atLeast' is less than 'atMost', the memory
// checks will be done every 'atLeast' incoming records.
- long memoryCheckFrequency = 1;
+ int64_t memoryCheckFrequency = 1;
// The number of incoming records to process before the next memory checkpoint.
- long nextMemoryCheckpoint = 0;
+ int64_t nextMemoryCheckpoint = 0;
// The counter of the incoming records between memory checkpoints.
- long memoryCheckpointCounter = 0;
+ int64_t memoryCheckpointCounter = 0;
- long long lastEstimatedMemoryUsage = 0;
-
- MemoryCheckData() {
- memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency);
- }
+ int64_t lastEstimatedMemoryUsage = 0;
};
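The exact recalculation of the check interval happens in 'checkMemoryUsageAndSpillIfNecessary()' and is not part of this hunk; the sketch below only illustrates the general shape described above, with hypothetical parameter values and a simple doubling rule standing in for the real growth-based formula: check memory frequently at first, then progressively less often, bounded by the 'atLeast' interval.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
        // Hypothetical values standing in for the server parameters.
        const int64_t atMostCheckFrequency = 2;      // start by checking every 2 rows
        const int64_t atLeastCheckFrequency = 1024;  // never check less often than this

        int64_t checkFrequency = std::min(atMostCheckFrequency, atLeastCheckFrequency);
        int64_t nextCheckpoint = checkFrequency;

        for (int64_t row = 1; row <= 5000; ++row) {
            if (row >= nextCheckpoint) {
                std::cout << "memory check after row " << row << "\n";
                // Stand-in for the real growth-based recalculation: back off
                // geometrically, clamped to the 'atLeast' interval.
                checkFrequency = std::min(checkFrequency * 2, atLeastCheckFrequency);
                nextCheckpoint = row + checkFrequency;
            }
        }
        return 0;
    }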
+
+ /**
+     * Inserts a key and value pair into the '_recordStore'. The key is serialized to a
+ * 'KeyString::Value' which becomes the 'RecordId'. This makes the keys memcmp-able and ensures
+ * that the record store ends up sorted by the group-by keys.
+ *
+ * Note that the 'typeBits' are needed to reconstruct the spilled 'key' to a 'MaterializedRow',
+ * but are not necessary for comparison purposes. Therefore, we carry the type bits separately
+ * from the record id, instead appending them to the end of the serialized 'val' buffer.
+ */
+ void spillRowToDisk(const value::MaterializedRow& key, const value::MaterializedRow& val);
+
void checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd);
+ void spill(MemoryCheckData& mcd);
+
+ /**
+ * Given a 'record' from the record store, decodes it into a pair of materialized rows (one for
+ * the group-by keys and another for the agg values).
+ *
+ * The given 'keyBuffer' is cleared, and then used to hold data (e.g. long strings and other
+ * values that can't be inlined) obtained by decoding the 'RecordId' keystring to a
+ * 'MaterializedRow'. The values in the resulting 'MaterializedRow' may be pointers into
+ * 'keyBuffer', so it is important that 'keyBuffer' outlive the row.
+ */
+ SpilledRow deserializeSpilledRecord(const Record& record, BufBuilder& keyBuffer);
+
+ PlanState getNextSpilled();
+
+ void makeTemporaryRecordStore();
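The buffer layout used by 'spillRowToDisk()' and 'deserializeSpilledRecord()' relies on the value serialization being self-delimiting, so trailing metadata (the KeyString type bits) can simply be appended to the same buffer and read back after the value. The toy program below, which is not MongoDB code, demonstrates only that append-then-recover pattern, with a length-prefixed string standing in for the serialized aggregate row.

    #include <cstdint>
    #include <cstring>
    #include <iostream>
    #include <string>

    // Toy self-delimiting encoding: a 4-byte length prefix followed by the payload.
    void appendValue(std::string& buf, const std::string& payload) {
        uint32_t len = static_cast<uint32_t>(payload.size());
        buf.append(reinterpret_cast<const char*>(&len), sizeof(len));
        buf.append(payload);
    }

    std::string readValue(const std::string& buf, size_t& offset) {
        uint32_t len = 0;
        std::memcpy(&len, buf.data() + offset, sizeof(len));
        offset += sizeof(len);
        std::string payload = buf.substr(offset, len);
        offset += len;
        return payload;
    }

    int main() {
        // "Spill": write the aggregate value first, then append the trailing
        // metadata (standing in for the KeyString type bits) to the same buffer.
        std::string record;
        appendValue(record, "partial-aggregate-state");
        record.append("TYPEBITS");

        // "Recover": decode the self-delimiting value; whatever remains in the
        // buffer is the trailing metadata.
        size_t offset = 0;
        std::string value = readValue(record, offset);
        std::string metadata = record.substr(offset);

        std::cout << "value: " << value << "\nmetadata: " << metadata << "\n";
        return 0;
    }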
const value::SlotVector _gbs;
- const value::SlotMap<std::unique_ptr<EExpression>> _aggs;
+ const SlotExprPairVector _aggs;
const boost::optional<value::SlotId> _collatorSlot;
const bool _allowDiskUse;
const value::SlotVector _seekKeysSlots;
// When this operator does not expect to be reopened (almost always) then it can close the child
// early.
const bool _optimizedClose{true};
+
+ // Expressions used to merge partial aggregates that have been spilled to disk and their
+ // corresponding input slots. For example, imagine that this list contains a pair (s12,
+ // sum(s12)). This means that the partial aggregate values will be read into slot s12 after
+ // being recovered from the spill table and can be merged using the 'sum()' agg function.
+ //
+ // When disk use is allowed, this vector must have the same length as '_aggs'.
+ const SlotExprPairVector _mergingExprs;
+
+ // When true, we spill frequently without reaching the memory limit. This allows us to exercise
+ // the spilling logic more often in test contexts.
+ const bool _forceIncreasedSpilling;
+
value::SlotAccessorMap _outAccessors;
+
+ // Accessors used to obtain the values of the group by slots when reading the input from the
+ // child.
std::vector<value::SlotAccessor*> _inKeyAccessors;
- // Accessors for the key stored in '_ht', a SwitchAccessor is used so we can produce the key
- // from either the '_ht' or the '_recordStore'.
+ // This buffer stores values for '_outKeyRowRecordStore'; values in the '_outKeyRowRecordStore'
+ // can be pointers that point to data in this buffer.
+ BufBuilder _outKeyRowRSBuffer;
+ // Accessors for the key slots provided as output by this stage. The keys can either come from
+ // the hash table or recovered from a temporary record store. We use a 'SwitchAccessor' to
+ // switch between these two cases.
std::vector<std::unique_ptr<HashKeyAccessor>> _outHashKeyAccessors;
+ // Row of key values to output used when recovering spilled data from the record store.
+ value::MaterializedRow _outKeyRowRecordStore{0};
+ std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreKeyAccessors;
std::vector<std::unique_ptr<value::SwitchAccessor>> _outKeyAccessors;
- // Accessor for the agg state value stored in the '_recordStore' when data is spilled to disk.
- value::MaterializedRow _aggKeyRecordStore{0};
- value::MaterializedRow _aggValueRecordStore{0};
- std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreKeyAccessors;
+ // Accessors for the output aggregate results. The aggregates can either come from the hash
+ // table or can be computed after merging partial aggregates spilled to a record store. We use a
+ // 'SwitchAccessor' to switch between these two cases.
+ std::vector<std::unique_ptr<HashAggAccessor>> _outHashAggAccessors;
+ // Row of agg values to output used when recovering spilled data from the record store.
+ value::MaterializedRow _outAggRowRecordStore{0};
std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreAggAccessors;
-
- // This buffer stores values for the spilled '_aggKeyRecordStore' that's loaded into memory from
- // the '_recordStore'. Values in the '_aggKeyRecordStore' row are pointers that point to data in
- // this buffer.
- BufBuilder _aggKeyRSBuffer;
+ std::vector<std::unique_ptr<value::SwitchAccessor>> _outAggAccessors;
std::vector<value::SlotAccessor*> _seekKeysAccessors;
value::MaterializedRow _seekKeys;
- // Accesors for the agg state in '_ht', a SwitchAccessor is used so we can produce the agg state
- // from either the '_ht' or the '_recordStore' when draining the HashAgg stage.
- std::vector<std::unique_ptr<value::SwitchAccessor>> _outAggAccessors;
- std::vector<std::unique_ptr<HashAggAccessor>> _outHashAggAccessors;
+ // Bytecode which gets executed to aggregate incoming rows into the hash table.
std::vector<std::unique_ptr<vm::CodeFragment>> _aggCodes;
+ // Bytecode for the merging expressions, executed if partial aggregates are spilled to a record
+ // store and need to be subsequently combined.
+ std::vector<std::unique_ptr<vm::CodeFragment>> _mergingExprCodes;
// Only set if collator slot provided on construction.
value::SlotAccessor* _collatorAccessor = nullptr;
+ // Function object which can be used to check whether two materialized rows of key values are
+ // equal. This comparison is collation-aware if the query has a non-simple collation.
+ value::MaterializedRowEq _keyEq;
+
boost::optional<TableType> _ht;
TableType::iterator _htIt;
@@ -213,10 +267,31 @@ private:
// Memory tracking and spilling to disk.
const long long _approxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
+
+ // A record store which is instantiated and written to in the case of spilling.
std::unique_ptr<TemporaryRecordStore> _recordStore;
- bool _drainingRecordStore{false};
std::unique_ptr<SeekableRecordCursor> _rsCursor;
+ // A monotonically increasing counter used to ensure uniqueness of 'RecordId' values. When
+ // spilling, the key is encoded into the 'RecordId' of the '_recordStore'. Record ids must be
+ // unique by definition, but we might end up spilling multiple partial aggregates for the same
+ // key. We ensure uniqueness by appending a unique integer to the end of this key, which is
+ // simply ignored during deserialization.
+ int64_t _ridCounter = 0;
+
+ // Partial aggregates that have been spilled are read into '_spilledAggRow' and read using
+ // '_spilledAggsAccessors' so that they can be merged to compute the final aggregate value.
+ value::MaterializedRow _spilledAggRow{0};
+ std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _spilledAggsAccessors;
+ value::SlotAccessorMap _spilledAggsAccessorMap;
+
+ // Buffer to hold data for the deserialized key values from '_stashedNextRow'.
+ BufBuilder _stashedKeyBuffer;
+ // Place to stash the next keys and values during the streaming phase. The record store cursor
+ // doesn't offer a "peek" API, so we need to hold onto the next row between getNext() calls when
+ // the key value advances.
+ SpilledRow _stashedNextRow;
+
HashAggStats _specificStats;
// If provided, used during a trial run to accumulate certain execution stats. Once the trial
diff --git a/src/mongo/db/exec/sbe/stages/plan_stats.h b/src/mongo/db/exec/sbe/stages/plan_stats.h
index 360731a13cb..2801c8abb07 100644
--- a/src/mongo/db/exec/sbe/stages/plan_stats.h
+++ b/src/mongo/db/exec/sbe/stages/plan_stats.h
@@ -319,9 +319,13 @@ struct HashAggStats : public SpecificStats {
}
bool usedDisk{false};
+ // The number of times that the entire hash table was spilled.
+ long long numSpills{0};
+ // The number of individual records spilled to disk.
long long spilledRecords{0};
- long long lastSpilledRecordSize{0};
- long long spilledRecordEstimatedStorageSize{0};
+ // An estimate, in bytes, of the size of the final spill table after all spill events have taken
+ // place.
+ long long spilledDataStorageSize{0};
};
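When 'includeDebugInfo' is requested, these counters surface in the hash agg stage's debug output (see the getStats() changes earlier in this patch) roughly as follows, with hypothetical numbers:

    usedDisk: true, numSpills: 3, spilledRecords: 12000, spilledDataStorageSize: 524288

meaning the full hash table was spilled three times, 12000 partial-aggregate records were written, and the spill table's final storage size is estimated at 512KB.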
struct HashLookupStats : public SpecificStats {
diff --git a/src/mongo/db/exec/sbe/util/spilling.cpp b/src/mongo/db/exec/sbe/util/spilling.cpp
index c54f3bfe956..6e8c027d7fe 100644
--- a/src/mongo/db/exec/sbe/util/spilling.cpp
+++ b/src/mongo/db/exec/sbe/util/spilling.cpp
@@ -85,7 +85,6 @@ int upsertToRecordStore(OperationContext* opCtx,
BufBuilder& buf,
const KeyString::TypeBits& typeBits, // recover type of value.
bool update) {
-
// Append the 'typeBits' to the end of the val's buffer so the 'key' can be reconstructed when
// draining HashAgg.
buf.appendBuf(typeBits.getBuffer(), typeBits.getSize());
diff --git a/src/mongo/db/exec/sbe/util/spilling.h b/src/mongo/db/exec/sbe/util/spilling.h
index 95f73e2b02e..0a562d7e864 100644
--- a/src/mongo/db/exec/sbe/util/spilling.h
+++ b/src/mongo/db/exec/sbe/util/spilling.h
@@ -55,9 +55,12 @@ boost::optional<value::MaterializedRow> readFromRecordStore(OperationContext* op
RecordStore* rs,
const RecordId& rid);
-/** Inserts or updates a key/value into 'rs'. The 'update' flag controls whether or not an update
+/**
+ * Inserts or updates a key/value into 'rs'. The 'update' flag controls whether or not an update
* will be performed. If a key/value pair is inserted into the 'rs' that already exists and
* 'update' is false, this function will tassert.
+ *
+ * Returns the size of the new record in bytes, including the record id and value portions.
*/
int upsertToRecordStore(OperationContext* opCtx,
RecordStore* rs,
@@ -65,7 +68,6 @@ int upsertToRecordStore(OperationContext* opCtx,
const value::MaterializedRow& val,
const KeyString::TypeBits& typeBits,
bool update);
-
int upsertToRecordStore(OperationContext* opCtx,
RecordStore* rs,
const RecordId& key,
diff --git a/src/mongo/db/exec/sbe/values/slot.cpp b/src/mongo/db/exec/sbe/values/slot.cpp
index e9f6e08f313..32a4275ff5f 100644
--- a/src/mongo/db/exec/sbe/values/slot.cpp
+++ b/src/mongo/db/exec/sbe/values/slot.cpp
@@ -565,8 +565,10 @@ void MaterializedRow::serializeIntoKeyString(KeyString::Builder& buf) const {
}
}
-MaterializedRow MaterializedRow::deserializeFromKeyString(const KeyString::Value& keyString,
- BufBuilder* valueBufferBuilder) {
+MaterializedRow MaterializedRow::deserializeFromKeyString(
+ const KeyString::Value& keyString,
+ BufBuilder* valueBufferBuilder,
+ boost::optional<size_t> numPrefixValsToRead) {
BufReader reader(keyString.getBuffer(), keyString.getSize());
KeyString::TypeBits typeBits(keyString.getTypeBits());
KeyString::TypeBits::Reader typeBitsReader(typeBits);
@@ -578,7 +580,8 @@ MaterializedRow MaterializedRow::deserializeFromKeyString(const KeyString::Value
&reader, &typeBitsReader, false /* inverted */, typeBits.version, &valBuilder);
} while (keepReading);
- MaterializedRow result{valBuilder.numValues()};
+ size_t sizeOfRow = numPrefixValsToRead ? *numPrefixValsToRead : valBuilder.numValues();
+ MaterializedRow result{sizeOfRow};
valBuilder.readValues(result);
return result;
diff --git a/src/mongo/db/exec/sbe/values/slot.h b/src/mongo/db/exec/sbe/values/slot.h
index efd92255c9f..49020874bf6 100644
--- a/src/mongo/db/exec/sbe/values/slot.h
+++ b/src/mongo/db/exec/sbe/values/slot.h
@@ -503,10 +503,16 @@ public:
* intended for spilling key values used in the HashAgg stage. The format is not guaranteed to
* be stable between versions, so it should not be used for long-term storage or communication
* between instances.
+ *
+ * If 'numPrefixValsToRead' is provided, then only the given number of values from 'keyString'
+ * are decoded into the resulting 'MaterializedRow'. The remaining suffix values in the
+ * 'keyString' are ignored.
*/
- static MaterializedRow deserializeFromKeyString(const KeyString::Value& keyString,
+ static MaterializedRow deserializeFromKeyString(
+ const KeyString::Value& keyString,
+ BufBuilder* valueBufferBuilder,
+ boost::optional<size_t> numPrefixValsToRead = boost::none);
- BufBuilder* valueBufferBuilder);
void serializeIntoKeyString(KeyString::Builder& builder) const;
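The prefix-decoding hook above exists because HashAggStage appends a uniqueness counter after the group-by key values when it builds the spill 'RecordId'; on recovery it asks for only the key prefix, so the counter is skipped. The standalone sketch below models just that idea, with an ordered list of strings standing in for a keystring; it is not the real KeyString API.

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    // Toy stand-in for a keystring: an ordered list of already-encoded values.
    using ToyKeyString = std::vector<std::string>;

    // Decode only the first 'numPrefixVals' values and ignore any trailing suffix,
    // mirroring the 'numPrefixValsToRead' parameter above.
    std::vector<std::string> decodePrefix(const ToyKeyString& ks, size_t numPrefixVals) {
        return std::vector<std::string>(ks.begin(), ks.begin() + numPrefixVals);
    }

    int main() {
        // A two-part group-by key plus a uniqueness counter appended by the
        // spilling code so that repeated spills of the same key get distinct ids.
        int64_t ridCounter = 7;
        ToyKeyString rid{"keyPart0", "keyPart1", std::to_string(ridCounter)};

        // On recovery only the key prefix is materialized; the counter is ignored.
        for (const auto& part : decodePrefix(rid, 2)) {
            std::cout << part << "\n";
        }
        return 0;
    }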
private:
@@ -597,9 +603,9 @@ private:
};
struct MaterializedRowEq {
- using ComparatorType = StringData::ComparatorInterface*;
+ using ComparatorType = StringData::ComparatorInterface;
- explicit MaterializedRowEq(const ComparatorType comparator = nullptr)
+ explicit MaterializedRowEq(const ComparatorType* comparator = nullptr)
: _comparator(comparator) {}
bool operator()(const MaterializedRow& lhs, const MaterializedRow& rhs) const {
@@ -617,7 +623,7 @@ struct MaterializedRowEq {
}
private:
- const ComparatorType _comparator = nullptr;
+ const ComparatorType* _comparator = nullptr;
};
struct MaterializedRowLess {
diff --git a/src/mongo/db/exec/sbe/values/value_builder.h b/src/mongo/db/exec/sbe/values/value_builder.h
index 9ad2b511242..a39432a4429 100644
--- a/src/mongo/db/exec/sbe/values/value_builder.h
+++ b/src/mongo/db/exec/sbe/values/value_builder.h
@@ -323,7 +323,10 @@ public:
auto bufferLen = _valueBufferBuilder->len();
size_t bufIdx = 0;
size_t rowIdx = 0;
- while (bufIdx < _numValues) {
+ // The 'row' output parameter might be smaller than the number of values owned by this
+ // builder. Be careful to read only as many values into 'row' as it has space for.
+ while (rowIdx < row.size()) {
invariant(rowIdx < row.size());
auto [_, tagNothing, valNothing] = getValue(bufIdx++, bufferLen);
tassert(6136200, "sbe tag must be 'Boolean'", tagNothing == TypeTags::Boolean);
diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp
index 6d0bc2f38e7..3a8e48daf0a 100644
--- a/src/mongo/db/exec/sbe/vm/vm.cpp
+++ b/src/mongo/db/exec/sbe/vm/vm.cpp
@@ -4219,6 +4219,41 @@ FastTuple<bool, value::TypeTags, value::Value> ByteCode::aggSetUnionCappedImpl(
return {ownAcc, tagAcc, valAcc};
}
+FastTuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggSetUnion(ArityType arity) {
+ auto [ownAcc, tagAcc, valAcc] = getFromStack(0);
+
+ if (tagAcc == value::TypeTags::Nothing) {
+ // Initialize the accumulator.
+ ownAcc = true;
+ std::tie(tagAcc, valAcc) = value::makeNewArraySet();
+ } else {
+ // Take ownership of the accumulator.
+ topStack(false, value::TypeTags::Nothing, 0);
+ }
+
+ tassert(7039552, "accumulator must be owned", ownAcc);
+ value::ValueGuard guardAcc{tagAcc, valAcc};
+ tassert(7039553, "accumulator must be of type ArraySet", tagAcc == value::TypeTags::ArraySet);
+ auto acc = value::getArraySetView(valAcc);
+
+ auto [tagNewSet, valNewSet] = moveOwnedFromStack(1);
+ value::ValueGuard guardNewSet{tagNewSet, valNewSet};
+ if (!value::isArray(tagNewSet)) {
+ return {false, value::TypeTags::Nothing, 0};
+ }
+
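+ // Copy every element of the incoming array into the accumulator. Because the
+ // accumulator is an 'ArraySet', an element that is already present is not added
+ // again, giving set-union semantics.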
+ auto i = value::ArrayEnumerator{tagNewSet, valNewSet};
+ while (!i.atEnd()) {
+ auto [elTag, elVal] = i.getViewOfValue();
+ auto [copyTag, copyVal] = value::copyValue(elTag, elVal);
+ acc->push_back(copyTag, copyVal);
+ i.advance();
+ }
+
+ guardAcc.reset();
+ return {ownAcc, tagAcc, valAcc};
+}
+
FastTuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggSetUnionCapped(ArityType arity) {
auto [tagNewElem, valNewElem] = moveOwnedFromStack(1);
value::ValueGuard guardNewElem{tagNewElem, valNewElem};
@@ -5354,6 +5389,8 @@ FastTuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builtin
return builtinConcatArrays(arity);
case Builtin::aggConcatArraysCapped:
return builtinAggConcatArraysCapped(arity);
+ case Builtin::aggSetUnion:
+ return builtinAggSetUnion(arity);
case Builtin::aggSetUnionCapped:
return builtinAggSetUnionCapped(arity);
case Builtin::aggCollSetUnionCapped:
@@ -5551,6 +5588,8 @@ std::string builtinToString(Builtin b) {
return "concatArrays";
case Builtin::aggConcatArraysCapped:
return "aggConcatArraysCapped";
+ case Builtin::aggSetUnion:
+ return "aggSetUnion";
case Builtin::aggSetUnionCapped:
return "aggSetUnionCapped";
case Builtin::aggCollSetUnionCapped:
diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h
index 0ed76cefad0..b6ee6d2dc75 100644
--- a/src/mongo/db/exec/sbe/vm/vm.h
+++ b/src/mongo/db/exec/sbe/vm/vm.h
@@ -669,6 +669,8 @@ enum class Builtin : uint8_t {
// specified size.
aggSetUnionCapped,
aggCollSetUnionCapped,
+ // Agg function for a simple set union (with no size cap or collation).
+ aggSetUnion,
acos,
acosh,
@@ -1289,6 +1291,7 @@ private:
FastTuple<bool, value::TypeTags, value::Value> builtinConcat(ArityType arity);
FastTuple<bool, value::TypeTags, value::Value> builtinConcatArrays(ArityType arity);
FastTuple<bool, value::TypeTags, value::Value> builtinAggConcatArraysCapped(ArityType arity);
+ FastTuple<bool, value::TypeTags, value::Value> builtinAggSetUnion(ArityType arity);
FastTuple<bool, value::TypeTags, value::Value> builtinAggSetUnionCapped(ArityType arity);
FastTuple<bool, value::TypeTags, value::Value> builtinAggCollSetUnionCapped(ArityType arity);
FastTuple<bool, value::TypeTags, value::Value> aggSetUnionCappedImpl(
diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp
index d48ee06aa50..186b877c5ec 100644
--- a/src/mongo/db/query/sbe_stage_builder.cpp
+++ b/src/mongo/db/query/sbe_stage_builder.cpp
@@ -2248,12 +2248,11 @@ std::tuple<sbe::value::SlotVector, EvalStage, std::unique_ptr<sbe::EExpression>>
return {sbe::value::SlotVector{slot}, std::move(stage), nullptr};
}
-sbe::value::SlotVector generateAccumulator(
- StageBuilderState& state,
- const AccumulationStatement& accStmt,
- const PlanStageSlots& outputs,
- sbe::value::SlotIdGenerator* slotIdGenerator,
- sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>>& accSlotToExprMap) {
+sbe::value::SlotVector generateAccumulator(StageBuilderState& state,
+ const AccumulationStatement& accStmt,
+ const PlanStageSlots& outputs,
+ sbe::value::SlotIdGenerator* slotIdGenerator,
+ sbe::SlotExprPairVector& accSlotExprPairs) {
auto rootSlot = outputs.getIfExists(PlanStageSlots::kResult);
auto argExpr = generateExpression(state, accStmt.expr.argument.get(), rootSlot, &outputs);
@@ -2268,12 +2267,48 @@ sbe::value::SlotVector generateAccumulator(
for (auto& accExpr : accExprs) {
auto slot = slotIdGenerator->generate();
aggSlots.push_back(slot);
- accSlotToExprMap.emplace(slot, std::move(accExpr));
+ accSlotExprPairs.push_back({slot, std::move(accExpr)});
}
return aggSlots;
}
+/**
+ * Generate a vector of (inputSlot, mergingExpression) pairs. The slot (whose id is allocated by
+ * this function) will be used to store spilled partial aggregate values that have been recovered
+ * from disk and deserialized. The merging expression is an agg function which combines these
+ * partial aggregates.
+ *
+ * Usually the returned vector will be of length 1, but in some cases the MQL accumulation statement
+ * is implemented by calculating multiple separate aggregates in the SBE plan, which are finalized
+ * by a subsequent project stage to produce the ultimate value.
+ */
+sbe::SlotExprPairVector generateMergingExpressions(StageBuilderState& state,
+ const AccumulationStatement& accStmt,
+ int numInputSlots) {
+ tassert(7039555, "'numInputSlots' must be positive", numInputSlots > 0);
+ auto slotIdGenerator = state.slotIdGenerator;
+ tassert(7039556, "expected non-null 'slotIdGenerator' pointer", slotIdGenerator);
+ auto frameIdGenerator = state.frameIdGenerator;
+ tassert(7039557, "expected non-null 'frameIdGenerator' pointer", frameIdGenerator);
+
+ auto spillSlots = slotIdGenerator->generateMultiple(numInputSlots);
+ auto collatorSlot = state.data->env->getSlotIfExists("collator"_sd);
+ auto mergingExprs =
+ buildCombinePartialAggregates(accStmt, spillSlots, collatorSlot, *frameIdGenerator);
+
+ // Zip the slot vector and expression vector into a vector of pairs.
+ tassert(7039550,
+ "expected same number of slots and input exprs",
+ spillSlots.size() == mergingExprs.size());
+ sbe::SlotExprPairVector result;
+ result.reserve(spillSlots.size());
+ for (size_t i = 0; i < spillSlots.size(); ++i) {
+ result.push_back({spillSlots[i], std::move(mergingExprs[i])});
+ }
+ return result;
+}
+
std::tuple<std::vector<std::string>, sbe::value::SlotVector, EvalStage> generateGroupFinalStage(
StageBuilderState& state,
EvalStage groupEvalStage,
@@ -2517,11 +2552,23 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
// Translates accumulators which are executed inside the group stage and gets slots for
// accumulators.
stage_builder::EvalStage currentStage = std::move(groupByEvalStage);
- sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> accSlotToExprMap;
+ sbe::SlotExprPairVector accSlotExprPairs;
std::vector<sbe::value::SlotVector> aggSlotsVec;
+ // Since partial accumulator state may be spilled to disk and then merged, we must construct not
+ // only the basic agg expressions for each accumulator, but also agg expressions that are used
+ // to combine partial aggregates that have been spilled to disk.
+ sbe::SlotExprPairVector mergingExprs;
for (const auto& accStmt : accStmts) {
- aggSlotsVec.emplace_back(generateAccumulator(
- _state, accStmt, childOutputs, &_slotIdGenerator, accSlotToExprMap));
+ sbe::value::SlotVector curAggSlots =
+ generateAccumulator(_state, accStmt, childOutputs, &_slotIdGenerator, accSlotExprPairs);
+
+ sbe::SlotExprPairVector curMergingExprs =
+ generateMergingExpressions(_state, accStmt, curAggSlots.size());
+
+ aggSlotsVec.emplace_back(std::move(curAggSlots));
+ mergingExprs.insert(mergingExprs.end(),
+ std::make_move_iterator(curMergingExprs.begin()),
+ std::make_move_iterator(curMergingExprs.end()));
}
// There might be duplicated expressions and slots. Dedup them before creating a HashAgg
@@ -2531,9 +2578,10 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
// Builds a group stage with accumulator expressions and group-by slot(s).
auto groupEvalStage = makeHashAgg(std::move(currentStage),
dedupedGroupBySlots,
- std::move(accSlotToExprMap),
+ std::move(accSlotExprPairs),
_state.data->env->getSlotIfExists("collator"_sd),
_cq.getExpCtx()->allowDiskUse,
+ std::move(mergingExprs),
nodeId);
tassert(
diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.cpp b/src/mongo/db/query/sbe_stage_builder_helpers.cpp
index 29da8e82b08..70e4693866c 100644
--- a/src/mongo/db/query/sbe_stage_builder_helpers.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_helpers.cpp
@@ -485,15 +485,20 @@ EvalStage makeUnion(std::vector<EvalStage> inputStages,
EvalStage makeHashAgg(EvalStage stage,
sbe::value::SlotVector gbs,
- sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs,
+ sbe::SlotExprPairVector aggs,
boost::optional<sbe::value::SlotId> collatorSlot,
bool allowDiskUse,
+ sbe::SlotExprPairVector mergingExprs,
PlanNodeId planNodeId) {
stage.setOutSlots(gbs);
for (auto& [slot, _] : aggs) {
stage.addOutSlot(slot);
}
+ // In debug builds, we artificially force frequent spilling. This makes sure that our tests
+ // exercise the spilling algorithm and the associated logic for merging partial aggregates,
+ // which would otherwise require large data sizes to trigger.
+ const bool forceIncreasedSpilling = kDebugBuild && allowDiskUse;
stage.setStage(sbe::makeS<sbe::HashAggStage>(stage.extractStage(planNodeId),
std::move(gbs),
std::move(aggs),
@@ -501,7 +506,10 @@ EvalStage makeHashAgg(EvalStage stage,
true /* optimized close */,
collatorSlot,
allowDiskUse,
- planNodeId));
+ std::move(mergingExprs),
+ planNodeId,
+ true /* participateInTrialRunTracking */,
+ forceIncreasedSpilling));
return stage;
}
diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.h b/src/mongo/db/query/sbe_stage_builder_helpers.h
index 06e67456b11..fdf0ea39a02 100644
--- a/src/mongo/db/query/sbe_stage_builder_helpers.h
+++ b/src/mongo/db/query/sbe_stage_builder_helpers.h
@@ -37,6 +37,7 @@
#include "mongo/db/exec/sbe/expressions/expression.h"
#include "mongo/db/exec/sbe/match_path.h"
#include "mongo/db/exec/sbe/stages/filter.h"
+#include "mongo/db/exec/sbe/stages/hash_agg.h"
#include "mongo/db/exec/sbe/stages/makeobj.h"
#include "mongo/db/exec/sbe/stages/project.h"
#include "mongo/db/pipeline/expression.h"
@@ -424,9 +425,10 @@ EvalStage makeUnion(std::vector<EvalStage> inputStages,
EvalStage makeHashAgg(EvalStage stage,
sbe::value::SlotVector gbs,
- sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs,
+ sbe::SlotExprPairVector aggs,
boost::optional<sbe::value::SlotId> collatorSlot,
bool allowDiskUse,
+ sbe::SlotExprPairVector mergingExprs,
PlanNodeId planNodeId);
EvalStage makeMkBsonObj(EvalStage stage,
diff --git a/src/mongo/db/query/sbe_stage_builder_lookup.cpp b/src/mongo/db/query/sbe_stage_builder_lookup.cpp
index 4a4a5d396b6..2c57fe6374f 100644
--- a/src/mongo/db/query/sbe_stage_builder_lookup.cpp
+++ b/src/mongo/db/query/sbe_stage_builder_lookup.cpp
@@ -364,12 +364,15 @@ std::pair<SlotId /* keyValuesSetSlot */, std::unique_ptr<sbe::PlanStage>> buildK
// Re-pack the individual key values into a set. We don't cap "addToSet" here because its size
// is bounded by the size of the record.
SlotId keyValuesSetSlot = slotIdGenerator.generate();
+ SlotId spillSlot = slotIdGenerator.generate();
EvalStage packedKeyValuesStage = makeHashAgg(
EvalStage{std::move(keyValuesStage), SlotVector{}},
makeSV(), /* groupBy slots - "none" means creating a single group */
- makeEM(keyValuesSetSlot, makeFunction("addToSet"_sd, makeVariable(keyValueSlot))),
+ makeSlotExprPairVec(keyValuesSetSlot,
+ makeFunction("addToSet"_sd, makeVariable(keyValueSlot))),
boost::none /* we group _all_ key values into a single set, so collator is irrelevant */,
allowDiskUse,
+ makeSlotExprPairVec(spillSlot, makeFunction("aggSetUnion"_sd, makeVariable(spillSlot))),
nodeId);
// The set in 'keyValuesSetSlot' might end up empty if the localField contained only missing and
@@ -411,15 +414,20 @@ std::pair<SlotId /* resultSlot */, std::unique_ptr<sbe::PlanStage>> buildForeign
// are no matches, return an empty array.
const int sizeCap = internalLookupStageIntermediateDocumentMaxSizeBytes.load();
SlotId accumulatorSlot = slotIdGenerator.generate();
+ SlotId spillSlot = slotIdGenerator.generate();
innerBranch = makeHashAgg(
std::move(innerBranch),
makeSV(), /* groupBy slots */
- makeEM(accumulatorSlot,
- makeFunction("addToArrayCapped"_sd,
- makeVariable(foreignRecordSlot),
- makeConstant(TypeTags::NumberInt32, sizeCap))),
+ makeSlotExprPairVec(accumulatorSlot,
+ makeFunction("addToArrayCapped"_sd,
+ makeVariable(foreignRecordSlot),
+ makeConstant(TypeTags::NumberInt32, sizeCap))),
{} /* collatorSlot, no collation here because we want to return all matches "as is" */,
allowDiskUse,
+ makeSlotExprPairVec(spillSlot,
+ makeFunction("aggConcatArraysCapped",
+ makeVariable(spillSlot),
+ makeConstant(TypeTags::NumberInt32, sizeCap))),
nodeId);
// 'accumulatorSlot' is either Nothing or contains an array of size two, where the front element