| author | David Storch <david.storch@mongodb.com> | 2023-01-30 22:54:10 +0000 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-31 01:27:54 +0000 |
| commit | 355aadd8ff9d5599da2983990f87c7ec300a972d (patch) | |
| tree | b75d91aea6c48c0e47edcff1c3a9ededde8c3354 /src/mongo | |
| parent | 40c93f028e36f78c06756f4bfd358d240bdd9b34 (diff) | |
| download | mongo-355aadd8ff9d5599da2983990f87c7ec300a972d.tar.gz | |
SERVER-70395 Change spilling for SBE HashAggStage to use a more efficient algorithm
The new algorithm spills the entire hash table to a
RecordStore whenever the memory budget is exceeded. Once all
the input is consumed, it switches to a streaming approach,
merging the partial aggregates recovered from disk.
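
For illustration only, below is a minimal, self-contained sketch of the spill-then-merge strategy described above. It is not the SBE implementation: the hash table, the crude memory estimate, and the "record store" (simulated with a sorted `std::multimap` so that equal keys stay adjacent) are simplified stand-ins, and names such as `SpillingCounter` and `kApproxBytesPerEntry` are invented for this example.

```cpp
// Sketch of spill-the-whole-table-then-stream-merge aggregation (assumed, simplified).
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

class SpillingCounter {
public:
    explicit SpillingCounter(size_t memoryBudget) : _memoryBudget(memoryBudget) {}

    void consume(const std::string& key) {
        ++_ht[key];
        // Crude memory estimate: once the budget is exceeded, spill the entire table.
        if (_ht.size() * kApproxBytesPerEntry > _memoryBudget) {
            spillAll();
        }
    }

    // After all input is consumed, merge any in-memory leftovers with the spilled
    // partial aggregates and stream out the final (key, count) pairs.
    std::vector<std::pair<std::string, int64_t>> finalize() {
        if (!_spilled.empty() && !_ht.empty()) {
            spillAll();  // Final flush so that all partial aggregates live in one place.
        }
        std::vector<std::pair<std::string, int64_t>> out;
        if (_spilled.empty()) {
            out.assign(_ht.begin(), _ht.end());  // Never spilled: drain the hash table.
            return out;
        }
        // The spill store keeps equal keys adjacent, so one streaming pass suffices:
        // merge each run of partial counts for the same key.
        for (auto it = _spilled.begin(); it != _spilled.end();) {
            auto run = _spilled.equal_range(it->first);
            int64_t merged = 0;
            for (auto r = run.first; r != run.second; ++r) merged += r->second;
            out.emplace_back(it->first, merged);
            it = run.second;
        }
        return out;
    }

private:
    void spillAll() {
        for (auto& [k, v] : _ht) _spilled.emplace(k, v);  // Write partial aggregates out.
        _ht.clear();                                      // Release the in-memory table.
    }

    static constexpr size_t kApproxBytesPerEntry = 64;
    size_t _memoryBudget;
    std::unordered_map<std::string, int64_t> _ht;   // In-memory partial aggregates.
    std::multimap<std::string, int64_t> _spilled;   // Stand-in for the RecordStore.
};

int main() {
    SpillingCounter agg(128);  // Tiny budget so the example actually spills.
    for (const char* k : {"a", "b", "a", "c", "b", "a"}) agg.consume(k);
    for (auto& [key, count] : agg.finalize()) std::cout << key << ": " << count << "\n";
}
```

The property this sketch preserves is the one the commit relies on: spilled entries are kept sorted by group key, so all partial aggregates for a group are adjacent and a single streaming pass over the spill data can merge them without rebuilding a hash table.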
Diffstat (limited to 'src/mongo')
22 files changed, 847 insertions(+), 397 deletions(-)
diff --git a/src/mongo/db/exec/sbe/abt/abt_lower.cpp b/src/mongo/db/exec/sbe/abt/abt_lower.cpp index 17b13b26db1..72206e75512 100644 --- a/src/mongo/db/exec/sbe/abt/abt_lower.cpp +++ b/src/mongo/db/exec/sbe/abt/abt_lower.cpp @@ -683,14 +683,15 @@ std::unique_ptr<sbe::PlanStage> SBENodeLowering::walk(const GroupByNode& n, tassert(6624227, "refs expected", refsAgg); auto& exprs = refsAgg->nodes(); - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs; + sbe::SlotExprPairVector aggs; + aggs.reserve(exprs.size()); for (size_t idx = 0; idx < exprs.size(); ++idx) { auto expr = lowerExpression(exprs[idx]); auto slot = _slotIdGenerator.generate(); mapProjToSlot(names[idx], slot); - aggs.emplace(slot, std::move(expr)); + aggs.push_back({slot, std::move(expr)}); } boost::optional<sbe::value::SlotId> collatorSlot = _namedSlots.getSlotIfExists("collator"_sd); @@ -705,6 +706,11 @@ std::unique_ptr<sbe::PlanStage> SBENodeLowering::walk(const GroupByNode& n, true /*optimizedClose*/, collatorSlot, false /*allowDiskUse*/, + // Since we are always disallowing disk use for this stage, + // we need not provide merging expressions. Once spilling + // is permitted here, we will need to generate merging + // expressions during lowering. + sbe::makeSlotExprPairVec() /*mergingExprs*/, planNodeId); } diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp index 77f69e8cbca..a96a559cd2d 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.cpp +++ b/src/mongo/db/exec/sbe/expressions/expression.cpp @@ -701,6 +701,7 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = { BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::collSetDifference, false}}, {"collSetEquals", BuiltinFn{[](size_t n) { return n >= 3; }, vm::Builtin::collSetEquals, false}}, + {"aggSetUnion", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggSetUnion, true}}, {"aggSetUnionCapped", BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggSetUnionCapped, true}}, {"aggCollSetUnionCapped", diff --git a/src/mongo/db/exec/sbe/expressions/expression.h b/src/mongo/db/exec/sbe/expressions/expression.h index ee0f997445f..433aa995146 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.h +++ b/src/mongo/db/exec/sbe/expressions/expression.h @@ -127,6 +127,8 @@ protected: } }; +using SlotExprPairVector = std::vector<std::pair<value::SlotId, std::unique_ptr<EExpression>>>; + template <typename T, typename... Args> inline std::unique_ptr<EExpression> makeE(Args&&... args) { return std::make_unique<T>(std::forward<Args>(args)...); @@ -143,20 +145,30 @@ inline auto makeEs(Ts&&... pack) { namespace detail { // base case -inline void makeEM_unwind(value::SlotMap<std::unique_ptr<EExpression>>& result, - value::SlotId slot, - std::unique_ptr<EExpression> expr) { - result.emplace(slot, std::move(expr)); +template <typename R> +inline void makeSlotExprPairHelper(R& result, + value::SlotId slot, + std::unique_ptr<EExpression> expr) { + if constexpr (std::is_same_v<R, value::SlotMap<std::unique_ptr<EExpression>>>) { + result.emplace(slot, std::move(expr)); + } else { + result.push_back({slot, std::move(expr)}); + } } // recursive case -template <typename... Ts> -inline void makeEM_unwind(value::SlotMap<std::unique_ptr<EExpression>>& result, - value::SlotId slot, - std::unique_ptr<EExpression> expr, - Ts&&... rest) { - result.emplace(slot, std::move(expr)); - makeEM_unwind(result, std::forward<Ts>(rest)...); +template <typename R, typename... 
Ts> +inline void makeSlotExprPairHelper(R& result, + value::SlotId slot, + std::unique_ptr<EExpression> expr, + Ts&&... rest) { + if constexpr (std::is_same_v<R, value::SlotMap<std::unique_ptr<EExpression>>>) { + result.emplace(slot, std::move(expr)); + } else { + static_assert(std::is_same_v<R, SlotExprPairVector>); + result.push_back({slot, std::move(expr)}); + } + makeSlotExprPairHelper(result, std::forward<Ts>(rest)...); } } // namespace detail @@ -165,7 +177,7 @@ auto makeEM(Ts&&... pack) { value::SlotMap<std::unique_ptr<EExpression>> result; if constexpr (sizeof...(pack) > 0) { result.reserve(sizeof...(Ts) / 2); - detail::makeEM_unwind(result, std::forward<Ts>(pack)...); + detail::makeSlotExprPairHelper(result, std::forward<Ts>(pack)...); } return result; } @@ -178,6 +190,16 @@ auto makeSV(Args&&... args) { return v; } +template <typename... Ts> +auto makeSlotExprPairVec(Ts&&... pack) { + SlotExprPairVector v; + if constexpr (sizeof...(pack) > 0) { + v.reserve(sizeof...(Ts) / 2); + detail::makeSlotExprPairHelper(v, std::forward<Ts>(pack)...); + } + return v; +} + /** * This is a constant expression. It assumes the ownership of the input constant. */ diff --git a/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp b/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp index 74cd7dce343..60380c5438e 100644 --- a/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp +++ b/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp @@ -102,6 +102,35 @@ TEST_F(SBEBuiltinSetOpTest, ReturnsNothingSetUnion) { runAndAssertNothing(compiledExpr.get()); } +TEST_F(SBEBuiltinSetOpTest, AggSetUnion) { + value::OwnedValueAccessor aggAccessor, inputAccessor; + auto inputSlot = bindAccessor(&inputAccessor); + auto setUnionExpr = + stage_builder::makeFunction("aggSetUnion", stage_builder::makeVariable(inputSlot)); + auto compiledExpr = compileAggExpression(*setUnionExpr, &aggAccessor); + + auto [arrTag1, arrVal1] = makeArray(BSON_ARRAY(1 << 2)); + inputAccessor.reset(arrTag1, arrVal1); + auto [resTag1, resVal1] = makeArraySet(BSON_ARRAY(1 << 2)); + runAndAssertExpression(compiledExpr.get(), {resTag1, resVal1}); + aggAccessor.reset(resTag1, resVal1); + + auto [arrTag2, arrVal2] = makeArraySet(BSON_ARRAY(1 << 3 << 2 << 6)); + inputAccessor.reset(arrTag2, arrVal2); + auto [resTag2, resVal2] = makeArraySet(BSON_ARRAY(1 << 2 << 3 << 6)); + runAndAssertExpression(compiledExpr.get(), {resTag2, resVal2}); + aggAccessor.reset(resTag2, resVal2); + + auto [arrTag3, arrVal3] = makeArray(BSONArray{}); + inputAccessor.reset(arrTag3, arrVal3); + auto [resTag3, resVal3] = makeArraySet(BSON_ARRAY(1 << 2 << 3 << 6)); + runAndAssertExpression(compiledExpr.get(), {resTag3, resVal3}); + aggAccessor.reset(resTag3, resVal3); + + inputAccessor.reset(value::TypeTags::Nothing, 0); + runAndAssertNothing(compiledExpr.get()); +} + TEST_F(SBEBuiltinSetOpTest, ComputesSetIntersection) { value::OwnedValueAccessor slotAccessor1, slotAccessor2; auto arrSlot1 = bindAccessor(&slotAccessor1); diff --git a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp index 603d71f08ea..b48ed496798 100644 --- a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp @@ -82,18 +82,23 @@ void HashAggStageTest::performHashAggWithSpillChecking( auto makeStageFn = [this, collatorSlot, shouldUseCollator, shouldSpill]( value::SlotId scanSlot, std::unique_ptr<PlanStage> scanStage) { auto countsSlot = generateSlotId(); + auto spillSlot = 
generateSlotId(); auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction("sum", - makeE<EConstant>(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), makeSV(), true, boost::optional<value::SlotId>{shouldUseCollator, collatorSlot}, shouldSpill, + makeSlotExprPairVec( + spillSlot, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); return std::make_pair(countsSlot, std::move(hashAggStage)); @@ -179,20 +184,21 @@ TEST_F(HashAggStageTest, HashAggMinMaxTest) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(), - makeEM(minSlot, - stage_builder::makeFunction("min", makeE<EVariable>(scanSlot)), - maxSlot, - stage_builder::makeFunction("max", makeE<EVariable>(scanSlot)), - collMinSlot, - stage_builder::makeFunction( - "collMin", collExpr->clone(), makeE<EVariable>(scanSlot)), - collMaxSlot, - stage_builder::makeFunction( - "collMax", collExpr->clone(), makeE<EVariable>(scanSlot))), + makeSlotExprPairVec(minSlot, + stage_builder::makeFunction("min", makeE<EVariable>(scanSlot)), + maxSlot, + stage_builder::makeFunction("max", makeE<EVariable>(scanSlot)), + collMinSlot, + stage_builder::makeFunction( + "collMin", collExpr->clone(), makeE<EVariable>(scanSlot)), + collMaxSlot, + stage_builder::makeFunction( + "collMax", collExpr->clone(), makeE<EVariable>(scanSlot))), makeSV(), true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec() /* mergingExprs */, kEmptyPlanNodeId); auto outSlot = generateSlotId(); @@ -243,13 +249,15 @@ TEST_F(HashAggStageTest, HashAggAddToSetTest) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(), - makeEM(hashAggSlot, - stage_builder::makeFunction( - "collAddToSet", std::move(collExpr), makeE<EVariable>(scanSlot))), + makeSlotExprPairVec(hashAggSlot, + stage_builder::makeFunction("collAddToSet", + std::move(collExpr), + makeE<EVariable>(scanSlot))), makeSV(), true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec() /* mergingExprs */, kEmptyPlanNodeId); return std::make_pair(hashAggSlot, std::move(hashAggStage)); @@ -340,14 +348,16 @@ TEST_F(HashAggStageTest, HashAggSeekKeysTest) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction("sum", - makeE<EConstant>(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), makeSV(seekSlot), true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec() /* mergingExprs */, kEmptyPlanNodeId); return std::make_pair(countsSlot, std::move(hashAggStage)); @@ -400,17 +410,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) { // Build a HashAggStage, group by the scanSlot and compute a simple count. 
auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -433,6 +447,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_FALSE(stats->usedDisk); + ASSERT_EQ(0, stats->numSpills); ASSERT_EQ(0, stats->spilledRecords); stage->close(); @@ -441,7 +456,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) { TEST_F(HashAggStageTest, HashAggBasicCountSpill) { // We estimate the size of result row like {int64, int64} at 50B. Set the memory threshold to // 64B so that exactly one row fits in memory. - const int expectedRowsToFitInMemory = 1; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64); @@ -459,17 +473,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpill) { // Build a HashAggStage, group by the scanSlot and compute a simple count. auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -492,7 +510,14 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpill) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_TRUE(stats->usedDisk); - ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); + // Memory usage is estimated only every two rows at the most frequent. Also, we only start + // spilling after estimating that the memory budget is exceeded. These two factors result in + // fewer expected spills than there are input records, even though only one record fits in + // memory at a time. + ASSERT_EQ(stats->numSpills, 3); + // The input has one run of two consecutive values, so we expect to spill as many records as + // there are input values minus one. + ASSERT_EQ(stats->spilledRecords, 8); stage->close(); } @@ -526,17 +551,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) { // Build a HashAggStage, group by the scanSlot and compute a simple count. 
auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -559,6 +588,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) { // Check that it did not spill. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_FALSE(stats->usedDisk); + ASSERT_EQ(0, stats->numSpills); ASSERT_EQ(0, stats->spilledRecords); stage->close(); @@ -567,7 +597,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) { TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) { // We estimate the size of result row like {double, int64} at 50B. Set the memory threshold to // 64B so that exactly one row fits in memory. - const int expectedRowsToFitInMemory = 1; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64); @@ -585,17 +614,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) { // Build a HashAggStage, group by the scanSlot and compute a simple count. auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -618,7 +651,14 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_TRUE(stats->usedDisk); - ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); + // Memory usage is estimated only every two rows at the most frequent. Also, we only start + // spilling after estimating that the memory budget is exceeded. These two factors result in + // fewer expected spills than there are input records, even though only one record fits in + // memory at a time. + ASSERT_EQ(stats->numSpills, 3); + // The input has one run of two consecutive values, so we expect to spill as many records as + // there are input values minus one. + ASSERT_EQ(stats->spilledRecords, 8); stage->close(); } @@ -640,17 +680,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) { // Build a HashAggStage, with an empty group by slot and compute a simple count. 
auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -671,6 +715,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_FALSE(stats->usedDisk); + ASSERT_EQ(0, stats->numSpills); ASSERT_EQ(0, stats->spilledRecords); stage->close(); @@ -679,7 +724,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) { TEST_F(HashAggStageTest, HashAggMultipleAccSpill) { // We estimate the size of result row like {double, int64} at 59B. Set the memory threshold to // 128B so that two rows fit in memory. - const int expectedRowsToFitInMemory = 2; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128); @@ -698,19 +742,27 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) { // Build a HashAggStage, group by the scanSlot and compute a simple count. auto countsSlot = generateSlotId(); auto sumsSlot = generateSlotId(); + auto spillSlot1 = generateSlotId(); + auto spillSlot2 = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))), - sumsSlot, - stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))), + sumsSlot, + stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot1, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot1)), + spillSlot2, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot2))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -740,7 +792,10 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_TRUE(stats->usedDisk); - ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); + ASSERT_EQ(stats->numSpills, 3); + // The input has one run of two consecutive values, so we expect to spill as many records as + // there are input values minus one. + ASSERT_EQ(stats->spilledRecords, 8); stage->close(); } @@ -765,19 +820,27 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) { // Build a HashAggStage, group by the scanSlot and compute a simple count. 
auto countsSlot = generateSlotId(); auto sumsSlot = generateSlotId(); + auto spillSlot1 = generateSlotId(); + auto spillSlot2 = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))), - sumsSlot, - stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))), + sumsSlot, + stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), makeSV(), // Seek slot true, boost::none, true, // allowDiskUse=true + makeSlotExprPairVec( + spillSlot1, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot1)), + spillSlot2, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot2))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -807,7 +870,9 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_TRUE(stats->usedDisk); - ASSERT_EQ(results.size(), stats->spilledRecords); + // We expect each incoming value to result in a spill of a single record. + ASSERT_EQ(stats->numSpills, 9); + ASSERT_EQ(stats->spilledRecords, 9); stage->close(); } @@ -844,14 +909,18 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) { // Build a HashAggStage, group by the scanSlot and compute a sum for each group. auto sumsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(sumsSlot, stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), + makeSlotExprPairVec(sumsSlot, + stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), makeSV(), // Seek slot true, boost::none, true, // allowDiskUse=true + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -885,17 +954,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) { // Build a HashAggStage, group by the scanSlot and compute a simple count. auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true, // allowDiskUse=true + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. 
diff --git a/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp b/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp index 6446ab57845..ecd22c9df81 100644 --- a/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp @@ -129,11 +129,12 @@ TEST_F(PlanSizeTest, Filter) { TEST_F(PlanSizeTest, HashAgg) { auto stage = makeS<HashAggStage>(mockS(), mockSV(), - makeEM(generateSlotId(), mockE()), + makeSlotExprPairVec(generateSlotId(), mockE()), makeSV(), true, generateSlotId(), false, + makeSlotExprPairVec(), kEmptyPlanNodeId); assertPlanSize(*stage); } diff --git a/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp b/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp index 06097b3a3b2..40f9ca9a9e6 100644 --- a/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp @@ -141,14 +141,16 @@ TEST_F(TrialRunTrackerTest, TrialEndsDuringOpenPhaseOfBlockingStage) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), - makeSV(), // Seek slot + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSV(), /* Seek slot */ true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec(), /* mergingExprs */ kEmptyPlanNodeId); auto tracker = std::make_unique<TrialRunTracker>(numResultsLimit, size_t{0}); @@ -210,14 +212,16 @@ TEST_F(TrialRunTrackerTest, OnlyDeepestNestedBlockingStageHasTrialRunTracker) { auto hashAggStage = makeS<HashAggStage>( std::move(unionStage), makeSV(unionSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), - makeSV(), // Seek slot + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSV(), /* Seek slot */ true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec(), /* mergingExprs */ kEmptyPlanNodeId); hashAggStage->prepare(*ctx); @@ -277,14 +281,16 @@ TEST_F(TrialRunTrackerTest, SiblingBlockingStagesBothGetTrialRunTracker) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction("sum", - makeE<EConstant>(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(1)))), - makeSV(), // Seek slot + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), + makeSV(), /* Seek slot */ true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec(), /* mergingExprs */ kEmptyPlanNodeId); return std::make_pair(countsSlot, std::move(hashAggStage)); @@ -407,14 +413,16 @@ TEST_F(TrialRunTrackerTest, DisablingTrackingForAChildStagePreventsEarlyExit) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction("sum", - makeE<EConstant>(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(1)))), - makeSV(), // Seek slot + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), + makeSV(), /* Seek slot */ true, boost::none, false /* allowDiskUse */, + 
makeSlotExprPairVec(), /* mergingExprs */ kEmptyPlanNodeId); return std::make_pair(countsSlot, std::move(hashAggStage)); diff --git a/src/mongo/db/exec/sbe/size_estimator.h b/src/mongo/db/exec/sbe/size_estimator.h index fb6684eea52..bbb92328331 100644 --- a/src/mongo/db/exec/sbe/size_estimator.h +++ b/src/mongo/db/exec/sbe/size_estimator.h @@ -92,6 +92,11 @@ inline size_t estimate(const S& stats) { return stats.estimateObjectSizeInBytes() - sizeof(S); } +template <typename A, typename B> +inline size_t estimate(const std::pair<A, B>& pair) { + return estimate(pair.first) + estimate(pair.second); +} + // Calculate the size of the inlined vector's elements. template <typename T, size_t N, typename A> size_t estimate(const absl::InlinedVector<T, N, A>& vector) { diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp index 74b7625ede6..67c1b46d8cb 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp @@ -43,32 +43,54 @@ namespace mongo { namespace sbe { HashAggStage::HashAggStage(std::unique_ptr<PlanStage> input, value::SlotVector gbs, - value::SlotMap<std::unique_ptr<EExpression>> aggs, + SlotExprPairVector aggs, value::SlotVector seekKeysSlots, bool optimizedClose, boost::optional<value::SlotId> collatorSlot, bool allowDiskUse, + SlotExprPairVector mergingExprs, PlanNodeId planNodeId, - bool participateInTrialRunTracking) + bool participateInTrialRunTracking, + bool forceIncreasedSpilling) : PlanStage("group"_sd, planNodeId, participateInTrialRunTracking), _gbs(std::move(gbs)), _aggs(std::move(aggs)), _collatorSlot(collatorSlot), _allowDiskUse(allowDiskUse), _seekKeysSlots(std::move(seekKeysSlots)), - _optimizedClose(optimizedClose) { + _optimizedClose(optimizedClose), + _mergingExprs(std::move(mergingExprs)), + _forceIncreasedSpilling(forceIncreasedSpilling) { _children.emplace_back(std::move(input)); invariant(_seekKeysSlots.empty() || _seekKeysSlots.size() == _gbs.size()); tassert(5843100, "HashAgg stage was given optimizedClose=false and seek keys", _seekKeysSlots.empty() || _optimizedClose); + + if (_allowDiskUse) { + tassert(7039549, + "disk use enabled for HashAggStage but incorrect number of merging expresssions", + _aggs.size() == _mergingExprs.size()); + } + + if (_forceIncreasedSpilling) { + tassert(7039554, "'forceIncreasedSpilling' set but disk use not allowed", _allowDiskUse); + } } std::unique_ptr<PlanStage> HashAggStage::clone() const { - value::SlotMap<std::unique_ptr<EExpression>> aggs; + SlotExprPairVector aggs; + aggs.reserve(_aggs.size()); for (auto& [k, v] : _aggs) { - aggs.emplace(k, v->clone()); + aggs.push_back({k, v->clone()}); } + + SlotExprPairVector mergingExprsClone; + mergingExprsClone.reserve(_mergingExprs.size()); + for (auto&& [k, v] : _mergingExprs) { + mergingExprsClone.push_back({k, v->clone()}); + } + return std::make_unique<HashAggStage>(_children[0]->clone(), _gbs, std::move(aggs), @@ -76,8 +98,10 @@ std::unique_ptr<PlanStage> HashAggStage::clone() const { _optimizedClose, _collatorSlot, _allowDiskUse, + std::move(mergingExprsClone), _commonStats.nodeId, - _participateInTrialRunTracking); + _participateInTrialRunTracking, + _forceIncreasedSpilling); } void HashAggStage::doSaveState(bool relinquishCursor) { @@ -122,34 +146,36 @@ void HashAggStage::prepare(CompileCtx& ctx) { } value::SlotSet dupCheck; + auto throwIfDupSlot = [&dupCheck](value::SlotId slot) { + auto [_, inserted] = dupCheck.emplace(slot); + tassert(7039551, "duplicate slot id", inserted); + }; + 
size_t counter = 0; // Process group by columns. for (auto& slot : _gbs) { - auto [it, inserted] = dupCheck.emplace(slot); - uassert(4822827, str::stream() << "duplicate field: " << slot, inserted); + throwIfDupSlot(slot); _inKeyAccessors.emplace_back(_children[0]->getAccessor(ctx, slot)); - // Construct accessors for the key to be processed from either the '_ht' or the - // '_recordStore'. Before the memory limit is reached the '_outHashKeyAccessors' will carry - // the group-by keys, otherwise the '_outRecordStoreKeyAccessors' will carry the group-by - // keys. + // Construct accessors for obtaining the key values from either the hash table '_ht' or the + // '_recordStore'. _outHashKeyAccessors.emplace_back(std::make_unique<HashKeyAccessor>(_htIt, counter)); _outRecordStoreKeyAccessors.emplace_back( - std::make_unique<value::MaterializedSingleRowAccessor>(_aggKeyRecordStore, counter)); + std::make_unique<value::MaterializedSingleRowAccessor>(_outKeyRowRecordStore, counter)); - counter++; - - // A SwitchAccessor is used to point the '_outKeyAccessors' to the key coming from the '_ht' - // or the '_recordStore' when draining the HashAgg stage in getNext. The group-by key will - // either be in the '_ht' or the '_recordStore' if the key lives in memory, or if the key - // has been spilled to disk, respectively. The SwitchAccessor allows toggling between the - // two so the parent stage can read it through the '_outAccessors'. + // A 'SwitchAccessor' is used to point the '_outKeyAccessors' to the key coming from the + // '_ht' or the '_recordStore' when draining the HashAgg stage in getNext(). If no spilling + // occurred, the keys will be obtained from the hash table. If spilling kicked in, then all + // of the data is written out to the record store, so the 'SwitchAccessor' is reconfigured + // to obtain all of the keys from the spill table. _outKeyAccessors.emplace_back( std::make_unique<value::SwitchAccessor>(std::vector<value::SlotAccessor*>{ _outHashKeyAccessors.back().get(), _outRecordStoreKeyAccessors.back().get()})); _outAccessors[slot] = _outKeyAccessors.back().get(); + + ++counter; } // Process seek keys (if any). The keys must come from outside of the subtree (by definition) so @@ -160,26 +186,18 @@ void HashAggStage::prepare(CompileCtx& ctx) { counter = 0; for (auto& [slot, expr] : _aggs) { - auto [it, inserted] = dupCheck.emplace(slot); - // Some compilers do not allow to capture local bindings by lambda functions (the one - // is used implicitly in uassert below), so we need a local variable to construct an - // error message. - const auto slotId = slot; - uassert(4822828, str::stream() << "duplicate field: " << slotId, inserted); - - // Construct accessors for the agg state to be processed from either the '_ht' or the - // '_recordStore' by the SwitchAccessor owned by '_outAggAccessors' below. + throwIfDupSlot(slot); + + // Just like with the output accessors for the keys, we construct output accessors for the + // aggregate values that read from either the hash table '_ht' or the '_recordStore'. _outRecordStoreAggAccessors.emplace_back( - std::make_unique<value::MaterializedSingleRowAccessor>(_aggValueRecordStore, counter)); + std::make_unique<value::MaterializedSingleRowAccessor>(_outAggRowRecordStore, counter)); _outHashAggAccessors.emplace_back(std::make_unique<HashAggAccessor>(_htIt, counter)); - counter++; - - // A SwitchAccessor is used to toggle the '_outAggAccessors' between the '_ht' and the - // '_recordStore' when updating the agg state via the bytecode. 
By compiling the agg - // EExpressions with a SwitchAccessor we can load the agg value into the memory of - // '_aggValueRecordStore' if the value comes from the '_recordStore' or we can use the - // agg value referenced through '_htIt' and run the bytecode to mutate the value through the - // SwitchAccessor. + + // A 'SwitchAccessor' is used to toggle the '_outAggAccessors' between the '_ht' and the + // '_recordStore'. Just like the key values, the aggregate values are always obtained from + // the hash table if no spilling occurred and are always obtained from the record store if + // spilling occurred. _outAggAccessors.emplace_back( std::make_unique<value::SwitchAccessor>(std::vector<value::SlotAccessor*>{ _outHashAggAccessors.back().get(), _outRecordStoreAggAccessors.back().get()})); @@ -189,10 +207,32 @@ void HashAggStage::prepare(CompileCtx& ctx) { ctx.root = this; ctx.aggExpression = true; ctx.accumulator = _outAggAccessors.back().get(); - _aggCodes.emplace_back(expr->compile(ctx)); ctx.aggExpression = false; + + ++counter; + } + + // If disk use is allowed, then we need to compile the merging expressions as well. + if (_allowDiskUse) { + counter = 0; + for (auto&& [spillSlot, mergingExpr] : _mergingExprs) { + throwIfDupSlot(spillSlot); + + _spilledAggsAccessors.push_back( + std::make_unique<value::MaterializedSingleRowAccessor>(_spilledAggRow, counter)); + _spilledAggsAccessorMap[spillSlot] = _spilledAggsAccessors.back().get(); + + ctx.root = this; + ctx.aggExpression = true; + ctx.accumulator = _outAggAccessors[counter].get(); + _mergingExprCodes.emplace_back(mergingExpr->compile(ctx)); + ctx.aggExpression = false; + + ++counter; + } } + _compiled = true; } @@ -202,6 +242,15 @@ value::SlotAccessor* HashAggStage::getAccessor(CompileCtx& ctx, value::SlotId sl return it->second; } } else { + // The slots into which we read spilled partial aggregates, accessible via + // '_spilledAggsAccessors', should only be visible to this stage. They are used internally + // when merging spilled partial aggregates and should never be read by ancestor stages. + // Therefore, they are only made visible when this stage is in the process of compiling + // itself. + if (auto it = _spilledAggsAccessorMap.find(slot); it != _spilledAggsAccessorMap.end()) { + return it->second; + } + return _children[0]->getAccessor(ctx, slot); } @@ -227,89 +276,82 @@ void HashAggStage::spillRowToDisk(const value::MaterializedRow& key, const value::MaterializedRow& val) { KeyString::Builder kb{KeyString::Version::kLatestVersion}; key.serializeIntoKeyString(kb); + // Add a unique integer to the end of the key, since record ids must be unique. We want equal + // keys to be adjacent in the 'RecordStore' so that we can merge the partial aggregates with a + // single pass. 
+ kb.appendNumberLong(_ridCounter++); auto typeBits = kb.getTypeBits(); - auto rid = RecordId(kb.getBuffer(), kb.getSize()); - boost::optional<value::MaterializedRow> valFromRs = - readFromRecordStore(_opCtx, _recordStore->rs(), rid); - tassert(6031100, "Spilling a row doesn't support updating it in the store.", !valFromRs); - spillValueToDisk(rid, val, typeBits, false /*update*/); + upsertToRecordStore(_opCtx, _recordStore->rs(), rid, val, typeBits, false /*update*/); + _specificStats.spilledRecords++; } -void HashAggStage::spillValueToDisk(const RecordId& key, - const value::MaterializedRow& val, - const KeyString::TypeBits& typeBits, - bool update) { - auto nBytes = upsertToRecordStore(_opCtx, _recordStore->rs(), key, val, typeBits, update); - if (!update) { - _specificStats.spilledRecords++; +void HashAggStage::spill(MemoryCheckData& mcd) { + uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, + "Exceeded memory limit for $group, but didn't allow external spilling;" + " pass allowDiskUse:true to opt in", + _allowDiskUse); + + // Since we flush the entire hash table to disk, we also clear any state related to estimating + // memory consumption. + mcd.reset(); + + if (!_recordStore) { + makeTemporaryRecordStore(); } - _specificStats.lastSpilledRecordSize = nBytes; + + for (auto&& it : *_ht) { + spillRowToDisk(it.first, it.second); + } + _ht->clear(); + + ++_specificStats.numSpills; } // Checks memory usage. Ideally, we'd want to know the exact size of already accumulated data, but // we cannot, so we estimate it based on the last updated/inserted row, if we have one, or the first // row in the '_ht' table. If the estimated memory usage exceeds the allowed, this method initiates -// spilling (if haven't been done yet) and evicts some records from the '_ht' table into the temp -// store to keep the memory usage under the limit. +// spilling. void HashAggStage::checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd) { - // The '_ht' table might become empty in the degenerate case when all rows had to be evicted to - // meet the memory constraint during a previous check -- we don't need to keep checking memory - // usage in this case because the table will never get new rows. - if (_ht->empty()) { - return; - } + invariant(!_ht->empty()); // If the group-by key is empty we will only ever aggregate into a single row so no sense in - // spilling since we will just be moving a single row back and forth from disk to main memory. + // spilling. if (_inKeyAccessors.size() == 0) { return; } mcd.memoryCheckpointCounter++; - if (mcd.memoryCheckpointCounter >= mcd.nextMemoryCheckpoint) { - if (_htIt == _ht->end()) { - _htIt = _ht->begin(); - } - const long estimatedRowSize = - _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter(); - long long estimatedTotalSize = _ht->size() * estimatedRowSize; + if (mcd.memoryCheckpointCounter < mcd.nextMemoryCheckpoint) { + // We haven't reached the next checkpoint at which we estimate memory usage and decide if we + // should spill. + return; + } + + const long estimatedRowSize = + _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter(); + long long estimatedTotalSize = _ht->size() * estimatedRowSize; + + if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) { + spill(mcd); + } else { + // Calculate the next memory checkpoint. We estimate it based on the prior growth of the + // '_ht' and the remaining available memory. 
If 'estimatedGainPerChildAdvance' suggests that + // the hash table is growing, then the checkpoint is estimated as some configurable + // percentage of the number of additional input rows that we would have to process to + // consume the remaining memory. On the other hand, a value of 'estimtedGainPerChildAdvance' + // close to zero indicates a stable hash stable size, in which case we can delay the next + // check progressively. const double estimatedGainPerChildAdvance = (static_cast<double>(estimatedTotalSize - mcd.lastEstimatedMemoryUsage) / mcd.memoryCheckpointCounter); - if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) { - uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, - "Exceeded memory limit for $group, but didn't allow external spilling." - " Pass allowDiskUse:true to opt in.", - _allowDiskUse); - if (!_recordStore) { - makeTemporaryRecordStore(); - } - - // Evict enough rows into the temporary store to drop below the memory constraint. - const long rowsToEvictCount = - 1 + (estimatedTotalSize - _approxMemoryUseInBytesBeforeSpill) / estimatedRowSize; - for (long i = 0; !_ht->empty() && i < rowsToEvictCount; i++) { - spillRowToDisk(_htIt->first, _htIt->second); - _ht->erase(_htIt); - _htIt = _ht->begin(); - } - estimatedTotalSize = _ht->size() * estimatedRowSize; - } - - // Calculate the next memory checkpoint. We estimate it based on the prior growth of the - // '_ht' and the remaining available memory. We have to keep doing this even after starting - // to spill because some accumulators can grow in size inside '_ht' (with no bounds). - // Value of 'estimatedGainPerChildAdvance' can be negative if the previous checkpoint - // evicted any records. And a value close to zero indicates a stable size of '_ht' so can - // delay the next check progressively. const long nextCheckpointCandidate = (estimatedGainPerChildAdvance > 0.1) ? mcd.checkpointMargin * (_approxMemoryUseInBytesBeforeSpill - estimatedTotalSize) / estimatedGainPerChildAdvance - : (estimatedGainPerChildAdvance < -0.1) ? mcd.atMostCheckFrequency - : mcd.nextMemoryCheckpoint * 2; + : mcd.nextMemoryCheckpoint * 2; + mcd.nextMemoryCheckpoint = std::min<long>(mcd.memoryCheckFrequency, std::max<long>(mcd.atMostCheckFrequency, nextCheckpointCandidate)); @@ -335,16 +377,28 @@ void HashAggStage::open(bool reOpen) { 5402503, "collatorSlot must be of collator type", tag == value::TypeTags::collator); auto collatorView = value::getCollatorView(collatorVal); const value::MaterializedRowHasher hasher(collatorView); - const value::MaterializedRowEq equator(collatorView); - _ht.emplace(0, hasher, equator); + _keyEq = value::MaterializedRowEq(collatorView); + _ht.emplace(0, hasher, _keyEq); } else { _ht.emplace(); } _seekKeys.resize(_seekKeysAccessors.size()); - // A default value for spilling a key to the record store. - value::MaterializedRow defaultVal{_outAggAccessors.size()}; + // Reset state since this stage may have been previously opened. 
+ for (auto&& accessor : _outKeyAccessors) { + accessor->setIndex(0); + } + for (auto&& accessor : _outAggAccessors) { + accessor->setIndex(0); + } + _rsCursor.reset(); + _recordStore.reset(); + _outKeyRowRecordStore = {0}; + _outAggRowRecordStore = {0}; + _spilledAggRow = {0}; + _stashedNextRow = {0, 0}; + MemoryCheckData memoryCheckData; while (_children[0]->getNext() == PlanState::ADVANCED) { @@ -356,53 +410,35 @@ void HashAggStage::open(bool reOpen) { key.reset(idx++, false, tag, val); } + bool newKey = false; + _htIt = _ht->find(key); + if (_htIt == _ht->end()) { + // The key is not present in the hash table yet, so we insert it and initialize the + // corresponding accumulator. Note that as a future optimization, we could avoid + // doing a lookup both in the 'find()' call and in 'emplace()'. + newKey = true; + key.makeOwned(); + auto [it, _] = _ht->emplace(std::move(key), value::MaterializedRow{0}); + it->second.resize(_outAggAccessors.size()); + _htIt = it; + } + + // Accumulate state in '_ht'. + for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { + auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get()); + _outHashAggAccessors[idx]->reset(owned, tag, val); + } - if (_htIt = _ht->find(key); !_recordStore || _htIt != _ht->end()) { - if (_htIt == _ht->end()) { - // The memory limit hasn't been reached yet, insert a new key in '_ht' by - // copying the key. Note as a future optimization, we should avoid the lookup in - // the find() call and the emplace. - key.makeOwned(); - auto [it, _] = _ht->emplace(std::move(key), value::MaterializedRow{0}); - // Initialize accumulators. - it->second.resize(_outAggAccessors.size()); - _htIt = it; - } - // Accumulate state in '_ht' by pointing the '_outAggAccessors' the - // '_outHashAggAccessors'. - for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { - _outAggAccessors[idx]->setIndex(0); - auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get()); - _outHashAggAccessors[idx]->reset(owned, tag, val); - } + if (_forceIncreasedSpilling && !newKey) { + // If configured to spill more than usual, we spill after seeing the same key twice. + spill(memoryCheckData); } else { - // The memory limit has been reached and the key wasn't in the '_ht' so we need - // to spill it to the '_recordStore'. - KeyString::Builder kb{KeyString::Version::kLatestVersion}; - key.serializeIntoKeyString(kb); - auto typeBits = kb.getTypeBits(); - - auto rid = RecordId(kb.getBuffer(), kb.getSize()); - - boost::optional<value::MaterializedRow> valFromRs = - readFromRecordStore(_opCtx, _recordStore->rs(), rid); - if (!valFromRs) { - _aggValueRecordStore = defaultVal; - } else { - _aggValueRecordStore = *valFromRs; - } - - for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { - _outAggAccessors[idx]->setIndex(1); - auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get()); - _aggValueRecordStore.reset(idx, owned, tag, val); - } - spillValueToDisk(rid, _aggValueRecordStore, typeBits, valFromRs ? true : false); + // Estimates how much memory is being used. If we estimate that the hash table + // exceeds the allotted memory budget, its contents are spilled to the + // '_recordStore' and '_ht' is cleared. + checkMemoryUsageAndSpillIfNecessary(memoryCheckData); } - // Estimates how much memory is being used and might start spilling. 
- checkMemoryUsageAndSpillIfNecessary(memoryCheckData); - if (_tracker && _tracker->trackProgress<TrialRunTracker::kNumResults>(1)) { // During trial runs, we want to limit the amount of work done by opening a blocking // stage, like this one. The blocking stage tracks the number of documents it has @@ -418,8 +454,33 @@ void HashAggStage::open(bool reOpen) { _children[0]->close(); _childOpened = false; } - } + // If we spilled at any point while consuming the input, then do one final spill to write + // any leftover contents of '_ht' to the record store. That way, when recovering the input + // from the record store and merging partial aggregates we don't have to worry about the + // possibility of some of the data being in the hash table and some being in the record + // store. + if (_recordStore) { + if (!_ht->empty()) { + spill(memoryCheckData); + } + + _specificStats.spilledDataStorageSize = _recordStore->rs()->storageSize(_opCtx); + + // Establish a cursor, positioned at the beginning of the record store. + _rsCursor = _recordStore->rs()->getCursor(_opCtx); + + // Callers will be obtaining the results from the spill table, so set the + // 'SwitchAccessors' so that they refer to the rows recovered from the record store + // under the hood. + for (auto&& accessor : _outKeyAccessors) { + accessor->setIndex(1); + } + for (auto&& accessor : _outAggAccessors) { + accessor->setIndex(1); + } + } + } if (!_seekKeysAccessors.empty()) { // Copy keys in order to do the lookup. @@ -431,20 +492,76 @@ void HashAggStage::open(bool reOpen) { } _htIt = _ht->end(); +} - // Set the SwitchAccessors to point to the '_ht' so we can drain it first before draining the - // '_recordStore' in getNext(). - for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { - _outAggAccessors[idx]->setIndex(0); +HashAggStage::SpilledRow HashAggStage::deserializeSpilledRecord(const Record& record, + BufBuilder& keyBuffer) { + // Read the values and type bits out of the value part of the record. + BufReader valReader(record.data.data(), record.data.size()); + auto val = value::MaterializedRow::deserializeForSorter(valReader, {}); + auto typeBits = KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader); + + keyBuffer.reset(); + auto key = value::MaterializedRow::deserializeFromKeyString( + decodeKeyString(record.id, typeBits), &keyBuffer, _gbs.size() /*numPrefixValuesToRead*/); + return {std::move(key), std::move(val)}; +} + +PlanState HashAggStage::getNextSpilled() { + if (_stashedNextRow.first.isEmpty()) { + auto nextRecord = _rsCursor->next(); + if (!nextRecord) { + return trackPlanState(PlanState::IS_EOF); + } + + // We are just starting the process of merging the spilled file segments. + auto recoveredRow = deserializeSpilledRecord(*nextRecord, _outKeyRowRSBuffer); + + _outKeyRowRecordStore = std::move(recoveredRow.first); + _outAggRowRecordStore = std::move(recoveredRow.second); + } else { + // We peeked at the next key last time around. + _outKeyRowRSBuffer = std::move(_stashedKeyBuffer); + _outKeyRowRecordStore = std::move(_stashedNextRow.first); + _outAggRowRecordStore = std::move(_stashedNextRow.second); + // Clear the stashed row. + _stashedNextRow = {0, 0}; } - _drainingRecordStore = false; + + // Find additional partial aggregates for the same key and merge them in order to compute the + // final output. 
+ for (auto nextRecord = _rsCursor->next(); nextRecord; nextRecord = _rsCursor->next()) { + auto recoveredRow = deserializeSpilledRecord(*nextRecord, _stashedKeyBuffer); + if (!_keyEq(recoveredRow.first, _outKeyRowRecordStore)) { + // The newly recovered spilled row belongs to a new key, so we're done merging partial + // aggregates for the old key. Save the new row for later and return advanced. + _stashedNextRow = std::move(recoveredRow); + return trackPlanState(PlanState::ADVANCED); + } + + // Merge in the new partial aggregate values. + _spilledAggRow = std::move(recoveredRow.second); + for (size_t idx = 0; idx < _mergingExprCodes.size(); ++idx) { + auto [owned, tag, val] = _bytecode.run(_mergingExprCodes[idx].get()); + _outRecordStoreAggAccessors[idx]->reset(owned, tag, val); + } + } + + return trackPlanState(PlanState::ADVANCED); } PlanState HashAggStage::getNext() { auto optTimer(getOptTimer(_opCtx)); - if (_htIt == _ht->end() && !_drainingRecordStore) { - // First invocation of getNext() after open() when not draining the '_recordStore'. + // If we've spilled, then we need to produce the output by merging the spilled segments from the + // spill file. + if (_recordStore) { + return getNextSpilled(); + } + + // We didn't spill. Obtain the next output row from the hash table. + if (_htIt == _ht->end()) { + // First invocation of getNext() after open(). if (!_seekKeysAccessors.empty()) { _htIt = _ht->find(_seekKeys); } else { @@ -453,55 +570,13 @@ PlanState HashAggStage::getNext() { } else if (!_seekKeysAccessors.empty()) { // Subsequent invocation with seek keys. Return only 1 single row (if any). _htIt = _ht->end(); - } else if (!_drainingRecordStore) { - // Returning the results of the entire hash table first before draining the '_recordStore'. + } else { ++_htIt; } - if (_htIt == _ht->end() && !_recordStore) { - // The hash table has been drained and nothing was spilled to disk. + if (_htIt == _ht->end()) { + // The hash table has been drained (and we never spilled to disk) so we're done. return trackPlanState(PlanState::IS_EOF); - } else if (_htIt != _ht->end()) { - // Drain the '_ht' on the next 'getNext()' call. - return trackPlanState(PlanState::ADVANCED); - } else if (_seekKeysAccessors.empty()) { - // A record store was created to spill to disk. Drain it then clean it up. - if (!_rsCursor) { - _rsCursor = _recordStore->rs()->getCursor(_opCtx); - } - auto nextRecord = _rsCursor->next(); - if (nextRecord) { - // Point the out accessors to the recordStore accessors to allow parent stages to read - // the agg state from the '_recordStore'. - if (!_drainingRecordStore) { - for (size_t i = 0; i < _outKeyAccessors.size(); ++i) { - _outKeyAccessors[i]->setIndex(1); - } - for (size_t i = 0; i < _outAggAccessors.size(); ++i) { - _outAggAccessors[i]->setIndex(1); - } - } - _drainingRecordStore = true; - - // Read the agg state value from the '_recordStore' and Reconstruct the key from the - // typeBits stored along side of the value. 
- BufReader valReader(nextRecord->data.data(), nextRecord->data.size()); - auto val = value::MaterializedRow::deserializeForSorter(valReader, {}); - auto typeBits = - KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader); - _aggValueRecordStore = val; - - _aggKeyRSBuffer.reset(); - _aggKeyRecordStore = value::MaterializedRow::deserializeFromKeyString( - decodeKeyString(nextRecord->id, typeBits), &_aggKeyRSBuffer); - return trackPlanState(PlanState::ADVANCED); - } else { - _rsCursor.reset(); - _specificStats.spilledRecordEstimatedStorageSize = - _recordStore->rs()->storageSize(_opCtx); - _recordStore.reset(); - return trackPlanState(PlanState::IS_EOF); - } } else { return trackPlanState(PlanState::ADVANCED); } @@ -521,13 +596,19 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co childrenBob.append(str::stream() << slot, printer.print(expr->debugPrint())); } } + + if (!_mergingExprs.empty()) { + BSONObjBuilder nestedBuilder{bob.subobjStart("mergingExprs")}; + for (auto&& [slot, expr] : _mergingExprs) { + nestedBuilder.append(str::stream() << slot, printer.print(expr->debugPrint())); + } + } + // Spilling stats. bob.appendBool("usedDisk", _specificStats.usedDisk); + bob.appendNumber("numSpills", _specificStats.numSpills); bob.appendNumber("spilledRecords", _specificStats.spilledRecords); - bob.appendNumber("spilledBytesApprox", - _specificStats.lastSpilledRecordSize * _specificStats.spilledRecords); - bob.appendNumber("spilledRecordEstimatedStorageSize", - _specificStats.spilledRecordEstimatedStorageSize); + bob.appendNumber("spilledDataStorageSize", _specificStats.spilledDataStorageSize); ret->debugInfo = bob.obj(); } @@ -545,11 +626,12 @@ void HashAggStage::close() { trackClose(); _ht = boost::none; - if (_recordStore) { - // A record store was created to spill to disk. Clean it up. 
- _recordStore.reset(); - _drainingRecordStore = false; - } + _rsCursor.reset(); + _recordStore.reset(); + _outKeyRowRecordStore = {0}; + _outAggRowRecordStore = {0}; + _spilledAggRow = {0}; + _stashedNextRow = {0, 0}; if (_childOpened) { _children[0]->close(); @@ -572,7 +654,7 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const { ret.emplace_back(DebugPrinter::Block("[`")); bool first = true; - value::orderedSlotMapTraverse(_aggs, [&](auto slot, auto&& expr) { + for (auto&& [slot, expr] : _aggs) { if (!first) { ret.emplace_back(DebugPrinter::Block("`,")); } @@ -581,7 +663,7 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const { ret.emplace_back("="); DebugPrinter::addBlocks(ret, expr->debugPrint()); first = false; - }); + } ret.emplace_back("`]"); if (!_seekKeysSlots.empty()) { @@ -596,6 +678,28 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const { ret.emplace_back("`]"); } + if (!_mergingExprs.empty()) { + ret.emplace_back("spillSlots[`"); + for (size_t idx = 0; idx < _mergingExprs.size(); ++idx) { + if (idx) { + ret.emplace_back("`,"); + } + + DebugPrinter::addIdentifier(ret, _mergingExprs[idx].first); + } + ret.emplace_back("`]"); + + ret.emplace_back("mergingExprs[`"); + for (size_t idx = 0; idx < _mergingExprs.size(); ++idx) { + if (idx) { + ret.emplace_back("`,"); + } + + DebugPrinter::addBlocks(ret, _mergingExprs[idx].second->debugPrint()); + } + ret.emplace_back("`]"); + } + if (!_optimizedClose) { ret.emplace_back("reopen"); } @@ -616,6 +720,7 @@ size_t HashAggStage::estimateCompileTimeSize() const { size += size_estimator::estimate(_gbs); size += size_estimator::estimate(_aggs); size += size_estimator::estimate(_seekKeysSlots); + size += size_estimator::estimate(_mergingExprs); return size; } diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h index d200c4b9c3d..86f7337b7f7 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.h +++ b/src/mongo/db/exec/sbe/stages/hash_agg.h @@ -29,8 +29,6 @@ #pragma once -#include <unordered_map> - #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/stages/stages.h" #include "mongo/db/exec/sbe/vm/vm.h" @@ -61,22 +59,39 @@ namespace sbe { * determining whether two group-by keys are equal. For instance, the plan may require us to do a * case-insensitive group on a string field. * + * The 'allowDiskUse' flag controls whether this stage can spill. If false and the memory budget is + * exhausted, this stage throws a query-fatal error with code + * 'QueryExceededMemoryLimitNoDiskUseAllowed'. If true, then spilling is possible and the caller + * must provide a vector of 'mergingExprs'. This is a vector of (slot, expression) pairs which is + * symmetrical with 'aggs'. The slots are only visible internally and are used to store partial + * aggregate values that have been recovered from the spill table. Each of the expressions is an agg + * function which merges the partial aggregate value from this slot into the final aggregate value. + * In the debug string output, the internal slots used to house the partial aggregates are printed + * as a list of "spillSlots" and the expressions are printed as a parallel list of "mergingExprs". + * + * If 'forcedIncreasedSpilling' is true, then this stage will spill frequently even if the memory + * limit is not reached. This is intended to be used in test contexts to exercise the otherwise + * infrequently used spilling logic. 
+ * * Debug string representation: * - * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] [<seek slots>]? reopen? - * collatorSlot? childStage + * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] [<seek slots>]? + * spillSlots[slot_1, ..., slot_n] mergingExprs[expr_1, ..., expr_n] reopen? collatorSlot? + * childStage */ class HashAggStage final : public PlanStage { public: HashAggStage(std::unique_ptr<PlanStage> input, value::SlotVector gbs, - value::SlotMap<std::unique_ptr<EExpression>> aggs, + SlotExprPairVector aggs, value::SlotVector seekKeysSlots, bool optimizedClose, boost::optional<value::SlotId> collatorSlot, bool allowDiskUse, + SlotExprPairVector mergingExprs, PlanNodeId planNodeId, - bool participateInTrialRunTracking = true); + bool participateInTrialRunTracking = true, + bool forceIncreasedSpilling = false); std::unique_ptr<PlanStage> clone() const final; @@ -109,99 +124,138 @@ private: using HashKeyAccessor = value::MaterializedRowKeyAccessor<TableType::iterator>; using HashAggAccessor = value::MaterializedRowValueAccessor<TableType::iterator>; - void makeTemporaryRecordStore(); - - /** - * Spills a key and value pair to the '_recordStore' where the semantics are insert or update - * depending on the 'update' flag. When the 'update' flag is true this method already expects - * the 'key' to be inserted into the '_recordStore', otherwise the 'key' and 'val' pair are - * fresh. - * - * This method expects the key to be seralized into a KeyString::Value so that the key is - * memcmp-able and lookups can be done to update the 'val' in the '_recordStore'. Note that the - * 'typeBits' are needed to reconstruct the spilled 'key' when calling 'getNext' to deserialize - * the 'key' to a MaterializedRow. Since the '_recordStore' only stores the memcmp-able part of - * the KeyString we need to carry the 'typeBits' separately, and we do this by appending the - * 'typeBits' to the end of the serialized 'val' buffer and store them at the leaves of the - * backing B-tree of the '_recordStore'. used as the RecordId. - */ - void spillValueToDisk(const RecordId& key, - const value::MaterializedRow& val, - const KeyString::TypeBits& typeBits, - bool update); - void spillRowToDisk(const value::MaterializedRow& key, - const value::MaterializedRow& defaultVal); + using SpilledRow = std::pair<value::MaterializedRow, value::MaterializedRow>; /** * We check amount of used memory every T processed incoming records, where T is calculated * based on the estimated used memory and its recent growth. When the memory limit is exceeded, - * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' and might spill some of - * the already accumulated data into it. + * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' (if it hasn't already been + * created) and spill the contents of the hash table into this record store. 
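+     *
+     * A rough sketch of the resulting flow (illustrative only; 'spill()' and 'getNextSpilled()'
+     * hold the authoritative logic): rows are aggregated into '_ht' while input is consumed; when
+     * the memory estimate exceeds the budget, every entry of '_ht' is written to '_recordStore';
+     * once the input is exhausted, spilled partial aggregates are streamed back in key order and
+     * combined via the merging expressions.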
     */
    struct MemoryCheckData {
+        MemoryCheckData() {
+            reset();
+        }
+
+        void reset() {
+            memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency);
+            nextMemoryCheckpoint = 0;
+            memoryCheckpointCounter = 0;
+            lastEstimatedMemoryUsage = 0;
+        }
+
        const double checkpointMargin = internalQuerySBEAggMemoryUseCheckMargin.load();
-        const long atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load();
-        const long atLeastMemoryCheckFrequency =
+        const int64_t atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load();
+        const int64_t atLeastMemoryCheckFrequency =
            internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load();
        // The check frequency upper bound, which starts at 'atMost' and exponentially backs off
        // to 'atLeast' as more data is accumulated. If 'atLeast' is less than 'atMost', the memory
        // checks will be done every 'atLeast' incoming records.
-        long memoryCheckFrequency = 1;
+        int64_t memoryCheckFrequency = 1;
        // The number of incoming records to process before the next memory checkpoint.
-        long nextMemoryCheckpoint = 0;
+        int64_t nextMemoryCheckpoint = 0;
        // The counter of the incoming records between memory checkpoints.
-        long memoryCheckpointCounter = 0;
+        int64_t memoryCheckpointCounter = 0;
-        long long lastEstimatedMemoryUsage = 0;
-
-        MemoryCheckData() {
-            memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency);
-        }
+        int64_t lastEstimatedMemoryUsage = 0;
    };
+
+    /**
+     * Inserts a key and value pair to the '_recordStore'. The key is serialized to a
+     * 'KeyString::Value' which becomes the 'RecordId'. This makes the keys memcmp-able and ensures
+     * that the record store ends up sorted by the group-by keys.
+     *
+     * Note that the 'typeBits' are needed to reconstruct the spilled 'key' to a 'MaterializedRow',
+     * but are not necessary for comparison purposes. Therefore, we carry the type bits separately
+     * from the record id, instead appending them to the end of the serialized 'val' buffer.
+     */
+    void spillRowToDisk(const value::MaterializedRow& key, const value::MaterializedRow& val);
+
+    void checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd);
+    void spill(MemoryCheckData& mcd);
+
+    /**
+     * Given a 'record' from the record store, decodes it into a pair of materialized rows (one for
+     * the group-by keys and another for the agg values).
+     *
+     * The given 'keyBuffer' is cleared, and then used to hold data (e.g. long strings and other
+     * values that can't be inlined) obtained by decoding the 'RecordId' keystring to a
+     * 'MaterializedRow'. The values in the resulting 'MaterializedRow' may be pointers into
+     * 'keyBuffer', so it is important that 'keyBuffer' outlive the row.
+     */
+    SpilledRow deserializeSpilledRecord(const Record& record, BufBuilder& keyBuffer);
+
+    PlanState getNextSpilled();
+
+    void makeTemporaryRecordStore();
    const value::SlotVector _gbs;
-    const value::SlotMap<std::unique_ptr<EExpression>> _aggs;
+    const SlotExprPairVector _aggs;
    const boost::optional<value::SlotId> _collatorSlot;
    const bool _allowDiskUse;
    const value::SlotVector _seekKeysSlots;
    // When this operator does not expect to be reopened (almost always) then it can close the child
    // early.
    const bool _optimizedClose{true};
+
+    // Expressions used to merge partial aggregates that have been spilled to disk and their
+    // corresponding input slots. For example, imagine that this list contains a pair (s12,
+    // sum(s12)). This means that the partial aggregate values will be read into slot s12 after
+    // being recovered from the spill table and can be merged using the 'sum()' agg function.
+    //
+    // When disk use is allowed, this vector must have the same length as '_aggs'.
+    const SlotExprPairVector _mergingExprs;
+
+    // When true, we spill frequently without reaching the memory limit. This allows us to exercise
+    // the spilling logic more often in test contexts.
+    const bool _forceIncreasedSpilling;
+
    value::SlotAccessorMap _outAccessors;
+
+    // Accessors used to obtain the values of the group by slots when reading the input from the
+    // child.
    std::vector<value::SlotAccessor*> _inKeyAccessors;
-    // Accessors for the key stored in '_ht', a SwitchAccessor is used so we can produce the key
-    // from either the '_ht' or the '_recordStore'.
+    // This buffer stores values for '_outKeyRowRecordStore'; values in the '_outKeyRowRecordStore'
+    // can be pointers that point to data in this buffer.
+    BufBuilder _outKeyRowRSBuffer;
+    // Accessors for the key slots provided as output by this stage. The keys can either come from
+    // the hash table or be recovered from a temporary record store. We use a 'SwitchAccessor' to
+    // switch between these two cases.
    std::vector<std::unique_ptr<HashKeyAccessor>> _outHashKeyAccessors;
+    // Row of key values to output, used when recovering spilled data from the record store.
+    value::MaterializedRow _outKeyRowRecordStore{0};
+    std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreKeyAccessors;
    std::vector<std::unique_ptr<value::SwitchAccessor>> _outKeyAccessors;
-    // Accessor for the agg state value stored in the '_recordStore' when data is spilled to disk.
-    value::MaterializedRow _aggKeyRecordStore{0};
-    value::MaterializedRow _aggValueRecordStore{0};
-    std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreKeyAccessors;
+    // Accessors for the output aggregate results. The aggregates can either come from the hash
+    // table or can be computed after merging partial aggregates spilled to a record store. We use a
+    // 'SwitchAccessor' to switch between these two cases.
+    std::vector<std::unique_ptr<HashAggAccessor>> _outHashAggAccessors;
+    // Row of agg values to output, used when recovering spilled data from the record store.
+    value::MaterializedRow _outAggRowRecordStore{0};
    std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreAggAccessors;
-
-    // This buffer stores values for the spilled '_aggKeyRecordStore' that's loaded into memory from
-    // the '_recordStore'. Values in the '_aggKeyRecordStore' row are pointers that point to data in
-    // this buffer.
-    BufBuilder _aggKeyRSBuffer;
+    std::vector<std::unique_ptr<value::SwitchAccessor>> _outAggAccessors;
    std::vector<value::SlotAccessor*> _seekKeysAccessors;
    value::MaterializedRow _seekKeys;
-    // Accesors for the agg state in '_ht', a SwitchAccessor is used so we can produce the agg state
-    // from either the '_ht' or the '_recordStore' when draining the HashAgg stage.
-    std::vector<std::unique_ptr<value::SwitchAccessor>> _outAggAccessors;
-    std::vector<std::unique_ptr<HashAggAccessor>> _outHashAggAccessors;
+    // Bytecode which gets executed to aggregate incoming rows into the hash table.
    std::vector<std::unique_ptr<vm::CodeFragment>> _aggCodes;
+    // Bytecode for the merging expressions, executed if partial aggregates are spilled to a record
+    // store and need to be subsequently combined.
+    std::vector<std::unique_ptr<vm::CodeFragment>> _mergingExprCodes;
    // Only set if collator slot provided on construction.
    value::SlotAccessor* _collatorAccessor = nullptr;
+    // Function object which can be used to check whether two materialized rows of key values are
+    // equal. This comparison is collation-aware if the query has a non-simple collation.
+    value::MaterializedRowEq _keyEq;
+
    boost::optional<TableType> _ht;
    TableType::iterator _htIt;
@@ -213,10 +267,31 @@ private:
    // Memory tracking and spilling to disk.
    const long long _approxMemoryUseInBytesBeforeSpill =
        internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
+
+    // A record store which is instantiated and written to in the case of spilling.
    std::unique_ptr<TemporaryRecordStore> _recordStore;
-    bool _drainingRecordStore{false};
    std::unique_ptr<SeekableRecordCursor> _rsCursor;
+    // A monotonically increasing counter used to ensure uniqueness of 'RecordId' values. When
+    // spilling, the key is encoded into the 'RecordId' of the '_recordStore'. Record ids must be
+    // unique by definition, but we might end up spilling multiple partial aggregates for the same
+    // key. We ensure uniqueness by appending a unique integer to the end of this key, which is
+    // simply ignored during deserialization.
+    int64_t _ridCounter = 0;
+
+    // Partial aggregates that have been spilled are read into '_spilledAggRow' and accessed using
+    // '_spilledAggsAccessors' so that they can be merged to compute the final aggregate value.
+    value::MaterializedRow _spilledAggRow{0};
+    std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _spilledAggsAccessors;
+    value::SlotAccessorMap _spilledAggsAccessorMap;
+
+    // Buffer to hold data for the deserialized key values from '_stashedNextRow'.
+    BufBuilder _stashedKeyBuffer;
+    // Place to stash the next keys and values during the streaming phase. The record store cursor
+    // doesn't offer a "peek" API, so we need to hold onto the next row between getNext() calls when
+    // the key value advances.
+    SpilledRow _stashedNextRow;
+
    HashAggStats _specificStats;
    // If provided, used during a trial run to accumulate certain execution stats. Once the trial
diff --git a/src/mongo/db/exec/sbe/stages/plan_stats.h b/src/mongo/db/exec/sbe/stages/plan_stats.h
index 360731a13cb..2801c8abb07 100644
--- a/src/mongo/db/exec/sbe/stages/plan_stats.h
+++ b/src/mongo/db/exec/sbe/stages/plan_stats.h
@@ -319,9 +319,13 @@ struct HashAggStats : public SpecificStats {
    }
    bool usedDisk{false};
+    // The number of times that the entire hash table was spilled.
+    long long numSpills{0};
+    // The number of individual records spilled to disk.
    long long spilledRecords{0};
-    long long lastSpilledRecordSize{0};
-    long long spilledRecordEstimatedStorageSize{0};
+    // An estimate, in bytes, of the size of the final spill table after all spill events have taken
+    // place.
+    long long spilledDataStorageSize{0};
};
struct HashLookupStats : public SpecificStats {
diff --git a/src/mongo/db/exec/sbe/util/spilling.cpp b/src/mongo/db/exec/sbe/util/spilling.cpp
index c54f3bfe956..6e8c027d7fe 100644
--- a/src/mongo/db/exec/sbe/util/spilling.cpp
+++ b/src/mongo/db/exec/sbe/util/spilling.cpp
@@ -85,7 +85,6 @@ int upsertToRecordStore(OperationContext* opCtx,
                        BufBuilder& buf,
                        const KeyString::TypeBits& typeBits,  // recover type of value.
                        bool update) {
-
    // Append the 'typeBits' to the end of the val's buffer so the 'key' can be reconstructed when
    // draining HashAgg.
buf.appendBuf(typeBits.getBuffer(), typeBits.getSize()); diff --git a/src/mongo/db/exec/sbe/util/spilling.h b/src/mongo/db/exec/sbe/util/spilling.h index 95f73e2b02e..0a562d7e864 100644 --- a/src/mongo/db/exec/sbe/util/spilling.h +++ b/src/mongo/db/exec/sbe/util/spilling.h @@ -55,9 +55,12 @@ boost::optional<value::MaterializedRow> readFromRecordStore(OperationContext* op RecordStore* rs, const RecordId& rid); -/** Inserts or updates a key/value into 'rs'. The 'update' flag controls whether or not an update +/** + * Inserts or updates a key/value into 'rs'. The 'update' flag controls whether or not an update * will be performed. If a key/value pair is inserted into the 'rs' that already exists and * 'update' is false, this function will tassert. + * + * Returns the size of the new record in bytes, including the record id and value portions. */ int upsertToRecordStore(OperationContext* opCtx, RecordStore* rs, @@ -65,7 +68,6 @@ int upsertToRecordStore(OperationContext* opCtx, const value::MaterializedRow& val, const KeyString::TypeBits& typeBits, bool update); - int upsertToRecordStore(OperationContext* opCtx, RecordStore* rs, const RecordId& key, diff --git a/src/mongo/db/exec/sbe/values/slot.cpp b/src/mongo/db/exec/sbe/values/slot.cpp index e9f6e08f313..32a4275ff5f 100644 --- a/src/mongo/db/exec/sbe/values/slot.cpp +++ b/src/mongo/db/exec/sbe/values/slot.cpp @@ -565,8 +565,10 @@ void MaterializedRow::serializeIntoKeyString(KeyString::Builder& buf) const { } } -MaterializedRow MaterializedRow::deserializeFromKeyString(const KeyString::Value& keyString, - BufBuilder* valueBufferBuilder) { +MaterializedRow MaterializedRow::deserializeFromKeyString( + const KeyString::Value& keyString, + BufBuilder* valueBufferBuilder, + boost::optional<size_t> numPrefixValsToRead) { BufReader reader(keyString.getBuffer(), keyString.getSize()); KeyString::TypeBits typeBits(keyString.getTypeBits()); KeyString::TypeBits::Reader typeBitsReader(typeBits); @@ -578,7 +580,8 @@ MaterializedRow MaterializedRow::deserializeFromKeyString(const KeyString::Value &reader, &typeBitsReader, false /* inverted */, typeBits.version, &valBuilder); } while (keepReading); - MaterializedRow result{valBuilder.numValues()}; + size_t sizeOfRow = numPrefixValsToRead ? *numPrefixValsToRead : valBuilder.numValues(); + MaterializedRow result{sizeOfRow}; valBuilder.readValues(result); return result; diff --git a/src/mongo/db/exec/sbe/values/slot.h b/src/mongo/db/exec/sbe/values/slot.h index efd92255c9f..49020874bf6 100644 --- a/src/mongo/db/exec/sbe/values/slot.h +++ b/src/mongo/db/exec/sbe/values/slot.h @@ -503,10 +503,16 @@ public: * intended for spilling key values used in the HashAgg stage. The format is not guaranteed to * be stable between versions, so it should not be used for long-term storage or communication * between instances. + * + * If 'numPrefixValsToRead' is provided, then only the given number of values from 'keyString' + * are decoded into the resulting 'MaterializedRow'. The remaining suffix values in the + * 'keyString' are ignored. 
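+     *
+     * For example (illustrative only): the HashAgg spill path appends a uniqueness counter after
+     * the group-by values when building the keystring, so a caller that serialized two key values
+     * can recover just the key portion by passing 'numPrefixValsToRead' = 2 and letting the
+     * trailing counter value be ignored.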
*/ - static MaterializedRow deserializeFromKeyString(const KeyString::Value& keyString, + static MaterializedRow deserializeFromKeyString( + const KeyString::Value& keyString, + BufBuilder* valueBufferBuilder, + boost::optional<size_t> numPrefixValsToRead = boost::none); - BufBuilder* valueBufferBuilder); void serializeIntoKeyString(KeyString::Builder& builder) const; private: @@ -597,9 +603,9 @@ private: }; struct MaterializedRowEq { - using ComparatorType = StringData::ComparatorInterface*; + using ComparatorType = StringData::ComparatorInterface; - explicit MaterializedRowEq(const ComparatorType comparator = nullptr) + explicit MaterializedRowEq(const ComparatorType* comparator = nullptr) : _comparator(comparator) {} bool operator()(const MaterializedRow& lhs, const MaterializedRow& rhs) const { @@ -617,7 +623,7 @@ struct MaterializedRowEq { } private: - const ComparatorType _comparator = nullptr; + const ComparatorType* _comparator = nullptr; }; struct MaterializedRowLess { diff --git a/src/mongo/db/exec/sbe/values/value_builder.h b/src/mongo/db/exec/sbe/values/value_builder.h index 9ad2b511242..a39432a4429 100644 --- a/src/mongo/db/exec/sbe/values/value_builder.h +++ b/src/mongo/db/exec/sbe/values/value_builder.h @@ -323,7 +323,10 @@ public: auto bufferLen = _valueBufferBuilder->len(); size_t bufIdx = 0; size_t rowIdx = 0; - while (bufIdx < _numValues) { + // The 'row' output parameter might be smaller than the number of values owned by this + // builder. Be careful to only read as many values into 'row' as this output 'row' has space + // for. + while (rowIdx < row.size()) { invariant(rowIdx < row.size()); auto [_, tagNothing, valNothing] = getValue(bufIdx++, bufferLen); tassert(6136200, "sbe tag must be 'Boolean'", tagNothing == TypeTags::Boolean); diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp index 6d0bc2f38e7..3a8e48daf0a 100644 --- a/src/mongo/db/exec/sbe/vm/vm.cpp +++ b/src/mongo/db/exec/sbe/vm/vm.cpp @@ -4219,6 +4219,41 @@ FastTuple<bool, value::TypeTags, value::Value> ByteCode::aggSetUnionCappedImpl( return {ownAcc, tagAcc, valAcc}; } +FastTuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggSetUnion(ArityType arity) { + auto [ownAcc, tagAcc, valAcc] = getFromStack(0); + + if (tagAcc == value::TypeTags::Nothing) { + // Initialize the accumulator. + ownAcc = true; + std::tie(tagAcc, valAcc) = value::makeNewArraySet(); + } else { + // Take ownership of the accumulator. 
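+        // (Assumption about the VM stack conventions: replacing the stack entry with a non-owned
+        // Nothing transfers ownership of the accumulator to this function, so the value is not
+        // freed twice.)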
+ topStack(false, value::TypeTags::Nothing, 0); + } + + tassert(7039552, "accumulator must be owned", ownAcc); + value::ValueGuard guardAcc{tagAcc, valAcc}; + tassert(7039553, "accumulator must be of type ArraySet", tagAcc == value::TypeTags::ArraySet); + auto acc = value::getArraySetView(valAcc); + + auto [tagNewSet, valNewSet] = moveOwnedFromStack(1); + value::ValueGuard guardNewSet{tagNewSet, valNewSet}; + if (!value::isArray(tagNewSet)) { + return {false, value::TypeTags::Nothing, 0}; + } + + auto i = value::ArrayEnumerator{tagNewSet, valNewSet}; + while (!i.atEnd()) { + auto [elTag, elVal] = i.getViewOfValue(); + auto [copyTag, copyVal] = value::copyValue(elTag, elVal); + acc->push_back(copyTag, copyVal); + i.advance(); + } + + guardAcc.reset(); + return {ownAcc, tagAcc, valAcc}; +} + FastTuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggSetUnionCapped(ArityType arity) { auto [tagNewElem, valNewElem] = moveOwnedFromStack(1); value::ValueGuard guardNewElem{tagNewElem, valNewElem}; @@ -5354,6 +5389,8 @@ FastTuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builtin return builtinConcatArrays(arity); case Builtin::aggConcatArraysCapped: return builtinAggConcatArraysCapped(arity); + case Builtin::aggSetUnion: + return builtinAggSetUnion(arity); case Builtin::aggSetUnionCapped: return builtinAggSetUnionCapped(arity); case Builtin::aggCollSetUnionCapped: @@ -5551,6 +5588,8 @@ std::string builtinToString(Builtin b) { return "concatArrays"; case Builtin::aggConcatArraysCapped: return "aggConcatArraysCapped"; + case Builtin::aggSetUnion: + return "aggSetUnion"; case Builtin::aggSetUnionCapped: return "aggSetUnionCapped"; case Builtin::aggCollSetUnionCapped: diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h index 0ed76cefad0..b6ee6d2dc75 100644 --- a/src/mongo/db/exec/sbe/vm/vm.h +++ b/src/mongo/db/exec/sbe/vm/vm.h @@ -669,6 +669,8 @@ enum class Builtin : uint8_t { // specified size. aggSetUnionCapped, aggCollSetUnionCapped, + // Agg function for a simple set union (with no size cap or collation). 
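+    // For instance, it can serve as the merging expression when a HashAgg stage spills an
+    // 'addToSet'-style accumulator: aggSetUnion(spillSlot) unions each recovered partial set.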
+ aggSetUnion, acos, acosh, @@ -1289,6 +1291,7 @@ private: FastTuple<bool, value::TypeTags, value::Value> builtinConcat(ArityType arity); FastTuple<bool, value::TypeTags, value::Value> builtinConcatArrays(ArityType arity); FastTuple<bool, value::TypeTags, value::Value> builtinAggConcatArraysCapped(ArityType arity); + FastTuple<bool, value::TypeTags, value::Value> builtinAggSetUnion(ArityType arity); FastTuple<bool, value::TypeTags, value::Value> builtinAggSetUnionCapped(ArityType arity); FastTuple<bool, value::TypeTags, value::Value> builtinAggCollSetUnionCapped(ArityType arity); FastTuple<bool, value::TypeTags, value::Value> aggSetUnionCappedImpl( diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp index d48ee06aa50..186b877c5ec 100644 --- a/src/mongo/db/query/sbe_stage_builder.cpp +++ b/src/mongo/db/query/sbe_stage_builder.cpp @@ -2248,12 +2248,11 @@ std::tuple<sbe::value::SlotVector, EvalStage, std::unique_ptr<sbe::EExpression>> return {sbe::value::SlotVector{slot}, std::move(stage), nullptr}; } -sbe::value::SlotVector generateAccumulator( - StageBuilderState& state, - const AccumulationStatement& accStmt, - const PlanStageSlots& outputs, - sbe::value::SlotIdGenerator* slotIdGenerator, - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>>& accSlotToExprMap) { +sbe::value::SlotVector generateAccumulator(StageBuilderState& state, + const AccumulationStatement& accStmt, + const PlanStageSlots& outputs, + sbe::value::SlotIdGenerator* slotIdGenerator, + sbe::SlotExprPairVector& accSlotExprPairs) { auto rootSlot = outputs.getIfExists(PlanStageSlots::kResult); auto argExpr = generateExpression(state, accStmt.expr.argument.get(), rootSlot, &outputs); @@ -2268,12 +2267,48 @@ sbe::value::SlotVector generateAccumulator( for (auto& accExpr : accExprs) { auto slot = slotIdGenerator->generate(); aggSlots.push_back(slot); - accSlotToExprMap.emplace(slot, std::move(accExpr)); + accSlotExprPairs.push_back({slot, std::move(accExpr)}); } return aggSlots; } +/** + * Generate a vector of (inputSlot, mergingExpression) pairs. The slot (whose id is allocated by + * this function) will be used to store spilled partial aggregate values that have been recovered + * from disk and deserialized. The merging expression is an agg function which combines these + * partial aggregates. + * + * Usually the returned vector will be of length 1, but in some cases the MQL accumulation statement + * is implemented by calculating multiple separate aggregates in the SBE plan, which are finalized + * by a subsequent project stage to produce the ultimate value. + */ +sbe::SlotExprPairVector generateMergingExpressions(StageBuilderState& state, + const AccumulationStatement& accStmt, + int numInputSlots) { + tassert(7039555, "'numInputSlots' must be positive", numInputSlots > 0); + auto slotIdGenerator = state.slotIdGenerator; + tassert(7039556, "expected non-null 'slotIdGenerator' pointer", slotIdGenerator); + auto frameIdGenerator = state.frameIdGenerator; + tassert(7039557, "expected non-null 'frameIdGenerator' pointer", frameIdGenerator); + + auto spillSlots = slotIdGenerator->generateMultiple(numInputSlots); + auto collatorSlot = state.data->env->getSlotIfExists("collator"_sd); + auto mergingExprs = + buildCombinePartialAggregates(accStmt, spillSlots, collatorSlot, *frameIdGenerator); + + // Zip the slot vector and expression vector into a vector of pairs. 
+ tassert(7039550, + "expected same number of slots and input exprs", + spillSlots.size() == mergingExprs.size()); + sbe::SlotExprPairVector result; + result.reserve(spillSlots.size()); + for (size_t i = 0; i < spillSlots.size(); ++i) { + result.push_back({spillSlots[i], std::move(mergingExprs[i])}); + } + return result; +} + std::tuple<std::vector<std::string>, sbe::value::SlotVector, EvalStage> generateGroupFinalStage( StageBuilderState& state, EvalStage groupEvalStage, @@ -2517,11 +2552,23 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder // Translates accumulators which are executed inside the group stage and gets slots for // accumulators. stage_builder::EvalStage currentStage = std::move(groupByEvalStage); - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> accSlotToExprMap; + sbe::SlotExprPairVector accSlotExprPairs; std::vector<sbe::value::SlotVector> aggSlotsVec; + // Since partial accumulator state may be spilled to disk and then merged, we must construct not + // only the basic agg expressions for each accumulator, but also agg expressions that are used + // to combine partial aggregates that have been spilled to disk. + sbe::SlotExprPairVector mergingExprs; for (const auto& accStmt : accStmts) { - aggSlotsVec.emplace_back(generateAccumulator( - _state, accStmt, childOutputs, &_slotIdGenerator, accSlotToExprMap)); + sbe::value::SlotVector curAggSlots = + generateAccumulator(_state, accStmt, childOutputs, &_slotIdGenerator, accSlotExprPairs); + + sbe::SlotExprPairVector curMergingExprs = + generateMergingExpressions(_state, accStmt, curAggSlots.size()); + + aggSlotsVec.emplace_back(std::move(curAggSlots)); + mergingExprs.insert(mergingExprs.end(), + std::make_move_iterator(curMergingExprs.begin()), + std::make_move_iterator(curMergingExprs.end())); } // There might be duplicated expressions and slots. Dedup them before creating a HashAgg @@ -2531,9 +2578,10 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder // Builds a group stage with accumulator expressions and group-by slot(s). auto groupEvalStage = makeHashAgg(std::move(currentStage), dedupedGroupBySlots, - std::move(accSlotToExprMap), + std::move(accSlotExprPairs), _state.data->env->getSlotIfExists("collator"_sd), _cq.getExpCtx()->allowDiskUse, + std::move(mergingExprs), nodeId); tassert( diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.cpp b/src/mongo/db/query/sbe_stage_builder_helpers.cpp index 29da8e82b08..70e4693866c 100644 --- a/src/mongo/db/query/sbe_stage_builder_helpers.cpp +++ b/src/mongo/db/query/sbe_stage_builder_helpers.cpp @@ -485,15 +485,20 @@ EvalStage makeUnion(std::vector<EvalStage> inputStages, EvalStage makeHashAgg(EvalStage stage, sbe::value::SlotVector gbs, - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs, + sbe::SlotExprPairVector aggs, boost::optional<sbe::value::SlotId> collatorSlot, bool allowDiskUse, + sbe::SlotExprPairVector mergingExprs, PlanNodeId planNodeId) { stage.setOutSlots(gbs); for (auto& [slot, _] : aggs) { stage.addOutSlot(slot); } + // In debug builds, we artificially force frequent spilling. This makes sure that our tests + // exercise the spilling algorithm and the associated logic for merging partial aggregates which + // otherwise would require large data sizes to exercise. 
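+    // ('kDebugBuild' is false in release builds, so this forcing applies only to debug/testing
+    // binaries and does not change spilling behavior for production query plans.)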
+ const bool forceIncreasedSpilling = kDebugBuild && allowDiskUse; stage.setStage(sbe::makeS<sbe::HashAggStage>(stage.extractStage(planNodeId), std::move(gbs), std::move(aggs), @@ -501,7 +506,10 @@ EvalStage makeHashAgg(EvalStage stage, true /* optimized close */, collatorSlot, allowDiskUse, - planNodeId)); + std::move(mergingExprs), + planNodeId, + true /* participateInTrialRunTracking */, + forceIncreasedSpilling)); return stage; } diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.h b/src/mongo/db/query/sbe_stage_builder_helpers.h index 06e67456b11..fdf0ea39a02 100644 --- a/src/mongo/db/query/sbe_stage_builder_helpers.h +++ b/src/mongo/db/query/sbe_stage_builder_helpers.h @@ -37,6 +37,7 @@ #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/match_path.h" #include "mongo/db/exec/sbe/stages/filter.h" +#include "mongo/db/exec/sbe/stages/hash_agg.h" #include "mongo/db/exec/sbe/stages/makeobj.h" #include "mongo/db/exec/sbe/stages/project.h" #include "mongo/db/pipeline/expression.h" @@ -424,9 +425,10 @@ EvalStage makeUnion(std::vector<EvalStage> inputStages, EvalStage makeHashAgg(EvalStage stage, sbe::value::SlotVector gbs, - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs, + sbe::SlotExprPairVector aggs, boost::optional<sbe::value::SlotId> collatorSlot, bool allowDiskUse, + sbe::SlotExprPairVector mergingExprs, PlanNodeId planNodeId); EvalStage makeMkBsonObj(EvalStage stage, diff --git a/src/mongo/db/query/sbe_stage_builder_lookup.cpp b/src/mongo/db/query/sbe_stage_builder_lookup.cpp index 4a4a5d396b6..2c57fe6374f 100644 --- a/src/mongo/db/query/sbe_stage_builder_lookup.cpp +++ b/src/mongo/db/query/sbe_stage_builder_lookup.cpp @@ -364,12 +364,15 @@ std::pair<SlotId /* keyValuesSetSlot */, std::unique_ptr<sbe::PlanStage>> buildK // Re-pack the individual key values into a set. We don't cap "addToSet" here because its size // is bounded by the size of the record. SlotId keyValuesSetSlot = slotIdGenerator.generate(); + SlotId spillSlot = slotIdGenerator.generate(); EvalStage packedKeyValuesStage = makeHashAgg( EvalStage{std::move(keyValuesStage), SlotVector{}}, makeSV(), /* groupBy slots - "none" means creating a single group */ - makeEM(keyValuesSetSlot, makeFunction("addToSet"_sd, makeVariable(keyValueSlot))), + makeSlotExprPairVec(keyValuesSetSlot, + makeFunction("addToSet"_sd, makeVariable(keyValueSlot))), boost::none /* we group _all_ key values into a single set, so collator is irrelevant */, allowDiskUse, + makeSlotExprPairVec(spillSlot, makeFunction("aggSetUnion"_sd, makeVariable(spillSlot))), nodeId); // The set in 'keyValuesSetSlot' might end up empty if the localField contained only missing and @@ -411,15 +414,20 @@ std::pair<SlotId /* resultSlot */, std::unique_ptr<sbe::PlanStage>> buildForeign // are no matches, return an empty array. 
const int sizeCap = internalLookupStageIntermediateDocumentMaxSizeBytes.load(); SlotId accumulatorSlot = slotIdGenerator.generate(); + SlotId spillSlot = slotIdGenerator.generate(); innerBranch = makeHashAgg( std::move(innerBranch), makeSV(), /* groupBy slots */ - makeEM(accumulatorSlot, - makeFunction("addToArrayCapped"_sd, - makeVariable(foreignRecordSlot), - makeConstant(TypeTags::NumberInt32, sizeCap))), + makeSlotExprPairVec(accumulatorSlot, + makeFunction("addToArrayCapped"_sd, + makeVariable(foreignRecordSlot), + makeConstant(TypeTags::NumberInt32, sizeCap))), {} /* collatorSlot, no collation here because we want to return all matches "as is" */, allowDiskUse, + makeSlotExprPairVec(spillSlot, + makeFunction("aggConcatArraysCapped", + makeVariable(spillSlot), + makeConstant(TypeTags::NumberInt32, sizeCap))), nodeId); // 'accumulatorSlot' is either Nothing or contains an array of size two, where the front element |