Diffstat (limited to 'src/mongo/db')
23 files changed, 849 insertions, 396 deletions
diff --git a/src/mongo/db/exec/sbe/abt/abt_lower.cpp b/src/mongo/db/exec/sbe/abt/abt_lower.cpp index 7da6a8e2cef..522ed220e1a 100644 --- a/src/mongo/db/exec/sbe/abt/abt_lower.cpp +++ b/src/mongo/db/exec/sbe/abt/abt_lower.cpp @@ -586,14 +586,15 @@ std::unique_ptr<sbe::PlanStage> SBENodeLowering::walk(const GroupByNode& n, auto& names = binderAgg->names(); auto& exprs = refsAgg->nodes(); - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs; + sbe::SlotExprPairVector aggs; + aggs.reserve(exprs.size()); for (size_t idx = 0; idx < exprs.size(); ++idx) { auto expr = SBEExpressionLowering{_env, _slotMap}.optimize(exprs[idx]); auto slot = _slotIdGenerator.generate(); _slotMap.emplace(names[idx], slot); - aggs.emplace(slot, std::move(expr)); + aggs.push_back({slot, std::move(expr)}); } // TODO: use collator slot. @@ -609,6 +610,11 @@ std::unique_ptr<sbe::PlanStage> SBENodeLowering::walk(const GroupByNode& n, true /*optimizedClose*/, collatorSlot, false /*allowDiskUse*/, + // Since we are always disallowing disk use for this stage, + // we need not provide merging expressions. Once spilling + // is permitted here, we will need to generate merging + // expressions during lowering. + sbe::makeSlotExprPairVec() /*mergingExprs*/, planNodeId); } diff --git a/src/mongo/db/exec/sbe/expressions/expression.cpp b/src/mongo/db/exec/sbe/expressions/expression.cpp index 96d5d175fa9..f0d10cf2cd2 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.cpp +++ b/src/mongo/db/exec/sbe/expressions/expression.cpp @@ -492,6 +492,7 @@ static stdx::unordered_map<std::string, BuiltinFn> kBuiltinFunctions = { BuiltinFn{[](size_t n) { return n >= 1; }, vm::Builtin::collSetIntersection, false}}, {"collSetDifference", BuiltinFn{[](size_t n) { return n == 3; }, vm::Builtin::collSetDifference, false}}, + {"aggSetUnion", BuiltinFn{[](size_t n) { return n == 1; }, vm::Builtin::aggSetUnion, true}}, {"aggSetUnionCapped", BuiltinFn{[](size_t n) { return n == 2; }, vm::Builtin::aggSetUnionCapped, true}}, {"aggCollSetUnionCapped", diff --git a/src/mongo/db/exec/sbe/expressions/expression.h b/src/mongo/db/exec/sbe/expressions/expression.h index 2b9032f257b..d5a2c0fdf0e 100644 --- a/src/mongo/db/exec/sbe/expressions/expression.h +++ b/src/mongo/db/exec/sbe/expressions/expression.h @@ -348,6 +348,8 @@ private: std::string toString() const; }; +using SlotExprPairVector = std::vector<std::pair<value::SlotId, std::unique_ptr<EExpression>>>; + template <typename T, typename... Args> inline std::unique_ptr<EExpression> makeE(Args&&... args) { return std::make_unique<T>(std::forward<Args>(args)...); @@ -364,20 +366,30 @@ inline auto makeEs(Ts&&... pack) { namespace detail { // base case -inline void makeEM_unwind(value::SlotMap<std::unique_ptr<EExpression>>& result, - value::SlotId slot, - std::unique_ptr<EExpression> expr) { - result.emplace(slot, std::move(expr)); +template <typename R> +inline void makeSlotExprPairHelper(R& result, + value::SlotId slot, + std::unique_ptr<EExpression> expr) { + if constexpr (std::is_same_v<R, value::SlotMap<std::unique_ptr<EExpression>>>) { + result.emplace(slot, std::move(expr)); + } else { + result.push_back({slot, std::move(expr)}); + } } // recursive case -template <typename... Ts> -inline void makeEM_unwind(value::SlotMap<std::unique_ptr<EExpression>>& result, - value::SlotId slot, - std::unique_ptr<EExpression> expr, - Ts&&... rest) { - result.emplace(slot, std::move(expr)); - makeEM_unwind(result, std::forward<Ts>(rest)...); +template <typename R, typename... 
Ts> +inline void makeSlotExprPairHelper(R& result, + value::SlotId slot, + std::unique_ptr<EExpression> expr, + Ts&&... rest) { + if constexpr (std::is_same_v<R, value::SlotMap<std::unique_ptr<EExpression>>>) { + result.emplace(slot, std::move(expr)); + } else { + static_assert(std::is_same_v<R, SlotExprPairVector>); + result.push_back({slot, std::move(expr)}); + } + makeSlotExprPairHelper(result, std::forward<Ts>(rest)...); } } // namespace detail @@ -386,7 +398,7 @@ auto makeEM(Ts&&... pack) { value::SlotMap<std::unique_ptr<EExpression>> result; if constexpr (sizeof...(pack) > 0) { result.reserve(sizeof...(Ts) / 2); - detail::makeEM_unwind(result, std::forward<Ts>(pack)...); + detail::makeSlotExprPairHelper(result, std::forward<Ts>(pack)...); } return result; } @@ -399,6 +411,16 @@ auto makeSV(Args&&... args) { return v; } +template <typename... Ts> +auto makeSlotExprPairVec(Ts&&... pack) { + SlotExprPairVector v; + if constexpr (sizeof...(pack) > 0) { + v.reserve(sizeof...(Ts) / 2); + detail::makeSlotExprPairHelper(v, std::forward<Ts>(pack)...); + } + return v; +} + /** * This is a constant expression. It assumes the ownership of the input constant. */ diff --git a/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp b/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp index 30eb61a7b2d..279f7a287b4 100644 --- a/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp +++ b/src/mongo/db/exec/sbe/expressions/sbe_set_expressions_test.cpp @@ -28,6 +28,7 @@ */ #include "mongo/db/exec/sbe/expression_test_base.h" +#include "mongo/db/query/sbe_stage_builder_helpers.h" namespace mongo::sbe { @@ -93,6 +94,35 @@ TEST_F(SBEBuiltinSetOpTest, ReturnsNothingSetUnion) { runAndAssertNothing(compiledExpr.get()); } +TEST_F(SBEBuiltinSetOpTest, AggSetUnion) { + value::OwnedValueAccessor aggAccessor, inputAccessor; + auto inputSlot = bindAccessor(&inputAccessor); + auto setUnionExpr = + stage_builder::makeFunction("aggSetUnion", stage_builder::makeVariable(inputSlot)); + auto compiledExpr = compileAggExpression(*setUnionExpr, &aggAccessor); + + auto [arrTag1, arrVal1] = makeArray(BSON_ARRAY(1 << 2)); + inputAccessor.reset(arrTag1, arrVal1); + auto [resTag1, resVal1] = makeArraySet(BSON_ARRAY(1 << 2)); + runAndAssertExpression(compiledExpr.get(), {resTag1, resVal1}); + aggAccessor.reset(resTag1, resVal1); + + auto [arrTag2, arrVal2] = makeArraySet(BSON_ARRAY(1 << 3 << 2 << 6)); + inputAccessor.reset(arrTag2, arrVal2); + auto [resTag2, resVal2] = makeArraySet(BSON_ARRAY(1 << 2 << 3 << 6)); + runAndAssertExpression(compiledExpr.get(), {resTag2, resVal2}); + aggAccessor.reset(resTag2, resVal2); + + auto [arrTag3, arrVal3] = makeArray(BSONArray{}); + inputAccessor.reset(arrTag3, arrVal3); + auto [resTag3, resVal3] = makeArraySet(BSON_ARRAY(1 << 2 << 3 << 6)); + runAndAssertExpression(compiledExpr.get(), {resTag3, resVal3}); + aggAccessor.reset(resTag3, resVal3); + + inputAccessor.reset(value::TypeTags::Nothing, 0); + runAndAssertNothing(compiledExpr.get()); +} + TEST_F(SBEBuiltinSetOpTest, ComputesSetIntersection) { value::OwnedValueAccessor slotAccessor1, slotAccessor2; auto arrSlot1 = bindAccessor(&slotAccessor1); diff --git a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp index 87fc8dbd1f2..89244fd4589 100644 --- a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp @@ -69,18 +69,23 @@ void HashAggStageTest::performHashAggWithSpillChecking( auto makeStageFn = [this, collatorSlot, 
shouldUseCollator, shouldSpill]( value::SlotId scanSlot, std::unique_ptr<PlanStage> scanStage) { auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction("sum", - makeE<EConstant>(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), makeSV(), true, boost::optional<value::SlotId>{shouldUseCollator, collatorSlot}, shouldSpill, + makeSlotExprPairVec( + spillSlot, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); return std::make_pair(countsSlot, std::move(hashAggStage)); @@ -166,20 +171,21 @@ TEST_F(HashAggStageTest, HashAggMinMaxTest) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(), - makeEM(minSlot, - stage_builder::makeFunction("min", makeE<EVariable>(scanSlot)), - maxSlot, - stage_builder::makeFunction("max", makeE<EVariable>(scanSlot)), - collMinSlot, - stage_builder::makeFunction( - "collMin", collExpr->clone(), makeE<EVariable>(scanSlot)), - collMaxSlot, - stage_builder::makeFunction( - "collMax", collExpr->clone(), makeE<EVariable>(scanSlot))), + makeSlotExprPairVec(minSlot, + stage_builder::makeFunction("min", makeE<EVariable>(scanSlot)), + maxSlot, + stage_builder::makeFunction("max", makeE<EVariable>(scanSlot)), + collMinSlot, + stage_builder::makeFunction( + "collMin", collExpr->clone(), makeE<EVariable>(scanSlot)), + collMaxSlot, + stage_builder::makeFunction( + "collMax", collExpr->clone(), makeE<EVariable>(scanSlot))), makeSV(), true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec() /* mergingExprs */, kEmptyPlanNodeId); auto outSlot = generateSlotId(); @@ -230,13 +236,15 @@ TEST_F(HashAggStageTest, HashAggAddToSetTest) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(), - makeEM(hashAggSlot, - stage_builder::makeFunction( - "collAddToSet", std::move(collExpr), makeE<EVariable>(scanSlot))), + makeSlotExprPairVec(hashAggSlot, + stage_builder::makeFunction("collAddToSet", + std::move(collExpr), + makeE<EVariable>(scanSlot))), makeSV(), true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec() /* mergingExprs */, kEmptyPlanNodeId); return std::make_pair(hashAggSlot, std::move(hashAggStage)); @@ -327,14 +335,16 @@ TEST_F(HashAggStageTest, HashAggSeekKeysTest) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction("sum", - makeE<EConstant>(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), makeSV(seekSlot), true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec() /* mergingExprs */, kEmptyPlanNodeId); return std::make_pair(countsSlot, std::move(hashAggStage)); @@ -387,17 +397,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) { // Build a HashAggStage, group by the scanSlot and compute a simple count. 
auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -420,6 +434,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_FALSE(stats->usedDisk); + ASSERT_EQ(0, stats->numSpills); ASSERT_EQ(0, stats->spilledRecords); stage->close(); @@ -428,7 +443,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) { TEST_F(HashAggStageTest, HashAggBasicCountSpill) { // We estimate the size of result row like {int64, int64} at 50B. Set the memory threshold to // 64B so that exactly one row fits in memory. - const int expectedRowsToFitInMemory = 1; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64); @@ -446,17 +460,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpill) { // Build a HashAggStage, group by the scanSlot and compute a simple count. auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -479,7 +497,14 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpill) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_TRUE(stats->usedDisk); - ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); + // Memory usage is estimated only every two rows at the most frequent. Also, we only start + // spilling after estimating that the memory budget is exceeded. These two factors result in + // fewer expected spills than there are input records, even though only one record fits in + // memory at a time. + ASSERT_EQ(stats->numSpills, 3); + // The input has one run of two consecutive values, so we expect to spill as many records as + // there are input values minus one. + ASSERT_EQ(stats->spilledRecords, 8); stage->close(); } @@ -513,17 +538,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) { // Build a HashAggStage, group by the scanSlot and compute a simple count. 
auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -546,6 +575,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) { // Check that it did not spill. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_FALSE(stats->usedDisk); + ASSERT_EQ(0, stats->numSpills); ASSERT_EQ(0, stats->spilledRecords); stage->close(); @@ -554,7 +584,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) { TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) { // We estimate the size of result row like {double, int64} at 50B. Set the memory threshold to // 64B so that exactly one row fits in memory. - const int expectedRowsToFitInMemory = 1; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64); @@ -572,17 +601,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) { // Build a HashAggStage, group by the scanSlot and compute a simple count. auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -605,7 +638,14 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_TRUE(stats->usedDisk); - ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); + // Memory usage is estimated only every two rows at the most frequent. Also, we only start + // spilling after estimating that the memory budget is exceeded. These two factors result in + // fewer expected spills than there are input records, even though only one record fits in + // memory at a time. + ASSERT_EQ(stats->numSpills, 3); + // The input has one run of two consecutive values, so we expect to spill as many records as + // there are input values minus one. + ASSERT_EQ(stats->spilledRecords, 8); stage->close(); } @@ -627,17 +667,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) { // Build a HashAggStage, with an empty group by slot and compute a simple count. 
auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -658,6 +702,7 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_FALSE(stats->usedDisk); + ASSERT_EQ(0, stats->numSpills); ASSERT_EQ(0, stats->spilledRecords); stage->close(); @@ -666,7 +711,6 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillWithNoGroupByDouble) { TEST_F(HashAggStageTest, HashAggMultipleAccSpill) { // We estimate the size of result row like {double, int64} at 59B. Set the memory threshold to // 128B so that two rows fit in memory. - const int expectedRowsToFitInMemory = 2; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128); @@ -685,19 +729,27 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) { // Build a HashAggStage, group by the scanSlot and compute a simple count. auto countsSlot = generateSlotId(); auto sumsSlot = generateSlotId(); + auto spillSlot1 = generateSlotId(); + auto spillSlot2 = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))), - sumsSlot, - stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))), + sumsSlot, + stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), makeSV(), // Seek slot true, boost::none, true /* allowDiskUse */, + makeSlotExprPairVec( + spillSlot1, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot1)), + spillSlot2, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot2))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -727,7 +779,10 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_TRUE(stats->usedDisk); - ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); + ASSERT_EQ(stats->numSpills, 3); + // The input has one run of two consecutive values, so we expect to spill as many records as + // there are input values minus one. + ASSERT_EQ(stats->spilledRecords, 8); stage->close(); } @@ -752,19 +807,27 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) { // Build a HashAggStage, group by the scanSlot and compute a simple count. 
auto countsSlot = generateSlotId(); auto sumsSlot = generateSlotId(); + auto spillSlot1 = generateSlotId(); + auto spillSlot2 = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))), - sumsSlot, - stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1))), + sumsSlot, + stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), makeSV(), // Seek slot true, boost::none, true, // allowDiskUse=true + makeSlotExprPairVec( + spillSlot1, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot1)), + spillSlot2, + stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot2))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -794,7 +857,9 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) { // Check that the spilling behavior matches the expected. auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); ASSERT_TRUE(stats->usedDisk); - ASSERT_EQ(results.size(), stats->spilledRecords); + // We expect each incoming value to result in a spill of a single record. + ASSERT_EQ(stats->numSpills, 9); + ASSERT_EQ(stats->spilledRecords, 9); stage->close(); } @@ -831,14 +896,18 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) { // Build a HashAggStage, group by the scanSlot and compute a sum for each group. auto sumsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(sumsSlot, stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), + makeSlotExprPairVec(sumsSlot, + stage_builder::makeFunction("sum", makeE<EVariable>(scanSlot))), makeSV(), // Seek slot true, boost::none, true, // allowDiskUse=true + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. @@ -872,17 +941,21 @@ TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) { // Build a HashAggStage, group by the scanSlot and compute a simple count. auto countsSlot = generateSlotId(); + auto spillSlot = generateSlotId(); auto stage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), makeSV(), // Seek slot true, boost::none, true, // allowDiskUse=true + makeSlotExprPairVec( + spillSlot, stage_builder::makeFunction("sum", stage_builder::makeVariable(spillSlot))), kEmptyPlanNodeId); // Prepare the tree and get the 'SlotAccessor' for the output slot. 
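The spilling tests above pair each accumulator (for example, sum(1) into countsSlot) with a merging expression over a dedicated spill slot (for example, sum(spillSlot)), so that partial aggregates written to the record store can be recombined per group key. Below is a minimal standalone sketch of that behavior in plain C++ (illustrative names only, not the SBE API): partial per-key counts are flushed to a spill buffer whenever the in-memory table exceeds a budget, and the final counts are produced by summing runs of equal keys in the sorted spill data.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

int main() {
    const std::vector<std::string> input = {"a", "a", "b", "c", "b", "a", "c"};
    const std::size_t memoryLimit = 2;  // max distinct keys kept in memory before spilling

    std::map<std::string, std::int64_t> ht;                     // in-memory partial counts
    std::vector<std::pair<std::string, std::int64_t>> spilled;  // stand-in for the record store

    for (const auto& key : input) {
        ++ht[key];
        if (ht.size() > memoryLimit) {
            // Budget exceeded: flush the whole table, analogous to HashAggStage::spill().
            for (auto& kv : ht) spilled.push_back(kv);
            ht.clear();
        }
    }
    // Final flush so all partial aggregates live in one place (mirrors the last spill in open()).
    for (auto& kv : ht) spilled.push_back(kv);
    ht.clear();

    // The real record store is keyed by a KeyString-encoded group key, so equal keys end up
    // adjacent; sorting the vector plays that role here.
    std::sort(spilled.begin(), spilled.end());

    // Merge runs of equal keys with the "merging expression" -- for a count, a plain sum.
    for (std::size_t i = 0; i < spilled.size();) {
        const std::string key = spilled[i].first;
        std::int64_t total = 0;
        for (; i < spilled.size() && spilled[i].first == key; ++i) {
            total += spilled[i].second;
        }
        std::cout << key << ": " << total << "\n";  // prints a: 3, b: 2, c: 2
    }
    return 0;
}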
diff --git a/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp b/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp index 191f0ffe562..25bb0e74cb2 100644 --- a/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_plan_size_test.cpp @@ -139,11 +139,12 @@ TEST_F(PlanSizeTest, Filter) { TEST_F(PlanSizeTest, HashAgg) { auto stage = makeS<HashAggStage>(mockS(), mockSV(), - makeEM(generateSlotId(), mockE()), + makeSlotExprPairVec(generateSlotId(), mockE()), makeSV(), true, generateSlotId(), false, + makeSlotExprPairVec(), kEmptyPlanNodeId); assertPlanSize(*stage); } diff --git a/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp b/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp index 54fadbe1f15..57d3ff4e05f 100644 --- a/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_trial_run_tracker_test.cpp @@ -141,14 +141,16 @@ TEST_F(TrialRunTrackerTest, TrialEndsDuringOpenPhaseOfBlockingStage) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), - makeSV(), // Seek slot + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSV(), /* Seek slot */ true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec(), /* mergingExprs */ kEmptyPlanNodeId); auto tracker = std::make_unique<TrialRunTracker>(numResultsLimit, size_t{0}); @@ -210,14 +212,16 @@ TEST_F(TrialRunTrackerTest, OnlyDeepestNestedBlockingStageHasTrialRunTracker) { auto hashAggStage = makeS<HashAggStage>( std::move(unionStage), makeSV(unionSlot), - makeEM(countsSlot, - stage_builder::makeFunction( - "sum", - makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), - makeSV(), // Seek slot + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSV(), /* Seek slot */ true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec(), /* mergingExprs */ kEmptyPlanNodeId); hashAggStage->prepare(*ctx); @@ -277,14 +281,16 @@ TEST_F(TrialRunTrackerTest, SiblingBlockingStagesBothGetTrialRunTracker) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction("sum", - makeE<EConstant>(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(1)))), - makeSV(), // Seek slot + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), + makeSV(), /* Seek slot */ true, boost::none, false /* allowDiskUse */, + makeSlotExprPairVec(), /* mergingExprs */ kEmptyPlanNodeId); return std::make_pair(countsSlot, std::move(hashAggStage)); @@ -407,14 +413,16 @@ TEST_F(TrialRunTrackerTest, DisablingTrackingForAChildStagePreventsEarlyExit) { auto hashAggStage = makeS<HashAggStage>( std::move(scanStage), makeSV(scanSlot), - makeEM(countsSlot, - stage_builder::makeFunction("sum", - makeE<EConstant>(value::TypeTags::NumberInt64, - value::bitcastFrom<int64_t>(1)))), - makeSV(), // Seek slot + makeSlotExprPairVec( + countsSlot, + stage_builder::makeFunction("sum", + makeE<EConstant>(value::TypeTags::NumberInt64, + value::bitcastFrom<int64_t>(1)))), + makeSV(), /* Seek slot */ true, boost::none, false /* allowDiskUse */, + 
makeSlotExprPairVec(), /* mergingExprs */ kEmptyPlanNodeId); return std::make_pair(countsSlot, std::move(hashAggStage)); diff --git a/src/mongo/db/exec/sbe/size_estimator.h b/src/mongo/db/exec/sbe/size_estimator.h index fb6684eea52..bbb92328331 100644 --- a/src/mongo/db/exec/sbe/size_estimator.h +++ b/src/mongo/db/exec/sbe/size_estimator.h @@ -92,6 +92,11 @@ inline size_t estimate(const S& stats) { return stats.estimateObjectSizeInBytes() - sizeof(S); } +template <typename A, typename B> +inline size_t estimate(const std::pair<A, B>& pair) { + return estimate(pair.first) + estimate(pair.second); +} + // Calculate the size of the inlined vector's elements. template <typename T, size_t N, typename A> size_t estimate(const absl::InlinedVector<T, N, A>& vector) { diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp index 1dcbc500ec8..f5e67981bcc 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp @@ -42,31 +42,53 @@ namespace mongo { namespace sbe { HashAggStage::HashAggStage(std::unique_ptr<PlanStage> input, value::SlotVector gbs, - value::SlotMap<std::unique_ptr<EExpression>> aggs, + SlotExprPairVector aggs, value::SlotVector seekKeysSlots, bool optimizedClose, boost::optional<value::SlotId> collatorSlot, bool allowDiskUse, - PlanNodeId planNodeId) + SlotExprPairVector mergingExprs, + PlanNodeId planNodeId, + bool forceIncreasedSpilling) : PlanStage("group"_sd, planNodeId), _gbs(std::move(gbs)), _aggs(std::move(aggs)), _collatorSlot(collatorSlot), _allowDiskUse(allowDiskUse), _seekKeysSlots(std::move(seekKeysSlots)), - _optimizedClose(optimizedClose) { + _optimizedClose(optimizedClose), + _mergingExprs(std::move(mergingExprs)), + _forceIncreasedSpilling(forceIncreasedSpilling) { _children.emplace_back(std::move(input)); invariant(_seekKeysSlots.empty() || _seekKeysSlots.size() == _gbs.size()); tassert(5843100, "HashAgg stage was given optimizedClose=false and seek keys", _seekKeysSlots.empty() || _optimizedClose); + + if (_allowDiskUse) { + tassert(7039549, + "disk use enabled for HashAggStage but incorrect number of merging expresssions", + _aggs.size() == _mergingExprs.size()); + } + + if (_forceIncreasedSpilling) { + tassert(7039554, "'forceIncreasedSpilling' set but disk use not allowed", _allowDiskUse); + } } std::unique_ptr<PlanStage> HashAggStage::clone() const { - value::SlotMap<std::unique_ptr<EExpression>> aggs; + SlotExprPairVector aggs; + aggs.reserve(_aggs.size()); for (auto& [k, v] : _aggs) { - aggs.emplace(k, v->clone()); + aggs.push_back({k, v->clone()}); } + + SlotExprPairVector mergingExprsClone; + mergingExprsClone.reserve(_mergingExprs.size()); + for (auto&& [k, v] : _mergingExprs) { + mergingExprsClone.push_back({k, v->clone()}); + } + return std::make_unique<HashAggStage>(_children[0]->clone(), _gbs, std::move(aggs), @@ -74,7 +96,9 @@ std::unique_ptr<PlanStage> HashAggStage::clone() const { _optimizedClose, _collatorSlot, _allowDiskUse, - _commonStats.nodeId); + std::move(mergingExprsClone), + _commonStats.nodeId, + _forceIncreasedSpilling); } void HashAggStage::doSaveState(bool relinquishCursor) { @@ -119,34 +143,36 @@ void HashAggStage::prepare(CompileCtx& ctx) { } value::SlotSet dupCheck; + auto throwIfDupSlot = [&dupCheck](value::SlotId slot) { + auto [_, inserted] = dupCheck.emplace(slot); + tassert(7039551, "duplicate slot id", inserted); + }; + size_t counter = 0; // Process group by columns. 
for (auto& slot : _gbs) { - auto [it, inserted] = dupCheck.emplace(slot); - uassert(4822827, str::stream() << "duplicate field: " << slot, inserted); + throwIfDupSlot(slot); _inKeyAccessors.emplace_back(_children[0]->getAccessor(ctx, slot)); - // Construct accessors for the key to be processed from either the '_ht' or the - // '_recordStore'. Before the memory limit is reached the '_outHashKeyAccessors' will carry - // the group-by keys, otherwise the '_outRecordStoreKeyAccessors' will carry the group-by - // keys. + // Construct accessors for obtaining the key values from either the hash table '_ht' or the + // '_recordStore'. _outHashKeyAccessors.emplace_back(std::make_unique<HashKeyAccessor>(_htIt, counter)); _outRecordStoreKeyAccessors.emplace_back( - std::make_unique<value::MaterializedSingleRowAccessor>(_aggKeyRecordStore, counter)); + std::make_unique<value::MaterializedSingleRowAccessor>(_outKeyRowRecordStore, counter)); - counter++; - - // A SwitchAccessor is used to point the '_outKeyAccessors' to the key coming from the '_ht' - // or the '_recordStore' when draining the HashAgg stage in getNext. The group-by key will - // either be in the '_ht' or the '_recordStore' if the key lives in memory, or if the key - // has been spilled to disk, respectively. The SwitchAccessor allows toggling between the - // two so the parent stage can read it through the '_outAccessors'. + // A 'SwitchAccessor' is used to point the '_outKeyAccessors' to the key coming from the + // '_ht' or the '_recordStore' when draining the HashAgg stage in getNext(). If no spilling + // occurred, the keys will be obtained from the hash table. If spilling kicked in, then all + // of the data is written out to the record store, so the 'SwitchAccessor' is reconfigured + // to obtain all of the keys from the spill table. _outKeyAccessors.emplace_back( std::make_unique<value::SwitchAccessor>(std::vector<value::SlotAccessor*>{ _outHashKeyAccessors.back().get(), _outRecordStoreKeyAccessors.back().get()})); _outAccessors[slot] = _outKeyAccessors.back().get(); + + ++counter; } // Process seek keys (if any). The keys must come from outside of the subtree (by definition) so @@ -157,26 +183,18 @@ void HashAggStage::prepare(CompileCtx& ctx) { counter = 0; for (auto& [slot, expr] : _aggs) { - auto [it, inserted] = dupCheck.emplace(slot); - // Some compilers do not allow to capture local bindings by lambda functions (the one - // is used implicitly in uassert below), so we need a local variable to construct an - // error message. - const auto slotId = slot; - uassert(4822828, str::stream() << "duplicate field: " << slotId, inserted); - - // Construct accessors for the agg state to be processed from either the '_ht' or the - // '_recordStore' by the SwitchAccessor owned by '_outAggAccessors' below. + throwIfDupSlot(slot); + + // Just like with the output accessors for the keys, we construct output accessors for the + // aggregate values that read from either the hash table '_ht' or the '_recordStore'. _outRecordStoreAggAccessors.emplace_back( - std::make_unique<value::MaterializedSingleRowAccessor>(_aggValueRecordStore, counter)); + std::make_unique<value::MaterializedSingleRowAccessor>(_outAggRowRecordStore, counter)); _outHashAggAccessors.emplace_back(std::make_unique<HashAggAccessor>(_htIt, counter)); - counter++; - - // A SwitchAccessor is used to toggle the '_outAggAccessors' between the '_ht' and the - // '_recordStore' when updating the agg state via the bytecode. 
By compiling the agg - // EExpressions with a SwitchAccessor we can load the agg value into the memory of - // '_aggValueRecordStore' if the value comes from the '_recordStore' or we can use the - // agg value referenced through '_htIt' and run the bytecode to mutate the value through the - // SwitchAccessor. + + // A 'SwitchAccessor' is used to toggle the '_outAggAccessors' between the '_ht' and the + // '_recordStore'. Just like the key values, the aggregate values are always obtained from + // the hash table if no spilling occurred and are always obtained from the record store if + // spilling occurred. _outAggAccessors.emplace_back( std::make_unique<value::SwitchAccessor>(std::vector<value::SlotAccessor*>{ _outHashAggAccessors.back().get(), _outRecordStoreAggAccessors.back().get()})); @@ -186,10 +204,32 @@ void HashAggStage::prepare(CompileCtx& ctx) { ctx.root = this; ctx.aggExpression = true; ctx.accumulator = _outAggAccessors.back().get(); - _aggCodes.emplace_back(expr->compile(ctx)); ctx.aggExpression = false; + + ++counter; + } + + // If disk use is allowed, then we need to compile the merging expressions as well. + if (_allowDiskUse) { + counter = 0; + for (auto&& [spillSlot, mergingExpr] : _mergingExprs) { + throwIfDupSlot(spillSlot); + + _spilledAggsAccessors.push_back( + std::make_unique<value::MaterializedSingleRowAccessor>(_spilledAggRow, counter)); + _spilledAggsAccessorMap[spillSlot] = _spilledAggsAccessors.back().get(); + + ctx.root = this; + ctx.aggExpression = true; + ctx.accumulator = _outAggAccessors[counter].get(); + _mergingExprCodes.emplace_back(mergingExpr->compile(ctx)); + ctx.aggExpression = false; + + ++counter; + } } + _compiled = true; } @@ -199,6 +239,15 @@ value::SlotAccessor* HashAggStage::getAccessor(CompileCtx& ctx, value::SlotId sl return it->second; } } else { + // The slots into which we read spilled partial aggregates, accessible via + // '_spilledAggsAccessors', should only be visible to this stage. They are used internally + // when merging spilled partial aggregates and should never be read by ancestor stages. + // Therefore, they are only made visible when this stage is in the process of compiling + // itself. + if (auto it = _spilledAggsAccessorMap.find(slot); it != _spilledAggsAccessorMap.end()) { + return it->second; + } + return _children[0]->getAccessor(ctx, slot); } @@ -224,89 +273,82 @@ void HashAggStage::spillRowToDisk(const value::MaterializedRow& key, const value::MaterializedRow& val) { KeyString::Builder kb{KeyString::Version::kLatestVersion}; key.serializeIntoKeyString(kb); + // Add a unique integer to the end of the key, since record ids must be unique. We want equal + // keys to be adjacent in the 'RecordStore' so that we can merge the partial aggregates with a + // single pass. 
+ kb.appendNumberLong(_ridCounter++); auto typeBits = kb.getTypeBits(); - auto rid = RecordId(kb.getBuffer(), kb.getSize()); - boost::optional<value::MaterializedRow> valFromRs = - readFromRecordStore(_opCtx, _recordStore->rs(), rid); - tassert(6031100, "Spilling a row doesn't support updating it in the store.", !valFromRs); - spillValueToDisk(rid, val, typeBits, false /*update*/); + upsertToRecordStore(_opCtx, _recordStore->rs(), rid, val, typeBits, false /*update*/); + _specificStats.spilledRecords++; } -void HashAggStage::spillValueToDisk(const RecordId& key, - const value::MaterializedRow& val, - const KeyString::TypeBits& typeBits, - bool update) { - auto nBytes = upsertToRecordStore(_opCtx, _recordStore->rs(), key, val, typeBits, update); - if (!update) { - _specificStats.spilledRecords++; +void HashAggStage::spill(MemoryCheckData& mcd) { + uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, + "Exceeded memory limit for $group, but didn't allow external spilling;" + " pass allowDiskUse:true to opt in", + _allowDiskUse); + + // Since we flush the entire hash table to disk, we also clear any state related to estimating + // memory consumption. + mcd.reset(); + + if (!_recordStore) { + makeTemporaryRecordStore(); } - _specificStats.lastSpilledRecordSize = nBytes; + + for (auto&& it : *_ht) { + spillRowToDisk(it.first, it.second); + } + _ht->clear(); + + ++_specificStats.numSpills; } // Checks memory usage. Ideally, we'd want to know the exact size of already accumulated data, but // we cannot, so we estimate it based on the last updated/inserted row, if we have one, or the first // row in the '_ht' table. If the estimated memory usage exceeds the allowed, this method initiates -// spilling (if haven't been done yet) and evicts some records from the '_ht' table into the temp -// store to keep the memory usage under the limit. +// spilling. void HashAggStage::checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd) { - // The '_ht' table might become empty in the degenerate case when all rows had to be evicted to - // meet the memory constraint during a previous check -- we don't need to keep checking memory - // usage in this case because the table will never get new rows. - if (_ht->empty()) { - return; - } + invariant(!_ht->empty()); // If the group-by key is empty we will only ever aggregate into a single row so no sense in - // spilling since we will just be moving a single row back and forth from disk to main memory. + // spilling. if (_inKeyAccessors.size() == 0) { return; } mcd.memoryCheckpointCounter++; - if (mcd.memoryCheckpointCounter >= mcd.nextMemoryCheckpoint) { - if (_htIt == _ht->end()) { - _htIt = _ht->begin(); - } - const long estimatedRowSize = - _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter(); - long long estimatedTotalSize = _ht->size() * estimatedRowSize; + if (mcd.memoryCheckpointCounter < mcd.nextMemoryCheckpoint) { + // We haven't reached the next checkpoint at which we estimate memory usage and decide if we + // should spill. + return; + } + + const long estimatedRowSize = + _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter(); + long long estimatedTotalSize = _ht->size() * estimatedRowSize; + + if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) { + spill(mcd); + } else { + // Calculate the next memory checkpoint. We estimate it based on the prior growth of the + // '_ht' and the remaining available memory. 
If 'estimatedGainPerChildAdvance' suggests that + // the hash table is growing, then the checkpoint is estimated as some configurable + // percentage of the number of additional input rows that we would have to process to + // consume the remaining memory. On the other hand, a value of 'estimtedGainPerChildAdvance' + // close to zero indicates a stable hash stable size, in which case we can delay the next + // check progressively. const double estimatedGainPerChildAdvance = (static_cast<double>(estimatedTotalSize - mcd.lastEstimatedMemoryUsage) / mcd.memoryCheckpointCounter); - if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) { - uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, - "Exceeded memory limit for $group, but didn't allow external spilling." - " Pass allowDiskUse:true to opt in.", - _allowDiskUse); - if (!_recordStore) { - makeTemporaryRecordStore(); - } - - // Evict enough rows into the temporary store to drop below the memory constraint. - const long rowsToEvictCount = - 1 + (estimatedTotalSize - _approxMemoryUseInBytesBeforeSpill) / estimatedRowSize; - for (long i = 0; !_ht->empty() && i < rowsToEvictCount; i++) { - spillRowToDisk(_htIt->first, _htIt->second); - _ht->erase(_htIt); - _htIt = _ht->begin(); - } - estimatedTotalSize = _ht->size() * estimatedRowSize; - } - - // Calculate the next memory checkpoint. We estimate it based on the prior growth of the - // '_ht' and the remaining available memory. We have to keep doing this even after starting - // to spill because some accumulators can grow in size inside '_ht' (with no bounds). - // Value of 'estimatedGainPerChildAdvance' can be negative if the previous checkpoint - // evicted any records. And a value close to zero indicates a stable size of '_ht' so can - // delay the next check progressively. const long nextCheckpointCandidate = (estimatedGainPerChildAdvance > 0.1) ? mcd.checkpointMargin * (_approxMemoryUseInBytesBeforeSpill - estimatedTotalSize) / estimatedGainPerChildAdvance - : (estimatedGainPerChildAdvance < -0.1) ? mcd.atMostCheckFrequency - : mcd.nextMemoryCheckpoint * 2; + : mcd.nextMemoryCheckpoint * 2; + mcd.nextMemoryCheckpoint = std::min<long>(mcd.memoryCheckFrequency, std::max<long>(mcd.atMostCheckFrequency, nextCheckpointCandidate)); @@ -332,17 +374,28 @@ void HashAggStage::open(bool reOpen) { 5402503, "collatorSlot must be of collator type", tag == value::TypeTags::collator); auto collatorView = value::getCollatorView(collatorVal); const value::MaterializedRowHasher hasher(collatorView); - const value::MaterializedRowEq equator(collatorView); - _ht.emplace(0, hasher, equator); + _keyEq = value::MaterializedRowEq(collatorView); + _ht.emplace(0, hasher, _keyEq); } else { _ht.emplace(); } _seekKeys.resize(_seekKeysAccessors.size()); - // A default value for spilling a key to the record store. - value::MaterializedRow defaultVal{_outAggAccessors.size()}; - bool updateAggStateHt = false; + // Reset state since this stage may have been previously opened. 
+ for (auto&& accessor : _outKeyAccessors) { + accessor->setIndex(0); + } + for (auto&& accessor : _outAggAccessors) { + accessor->setIndex(0); + } + _rsCursor.reset(); + _recordStore.reset(); + _outKeyRowRecordStore = {0}; + _outAggRowRecordStore = {0}; + _spilledAggRow = {0}; + _stashedNextRow = {0, 0}; + MemoryCheckData memoryCheckData; while (_children[0]->getNext() == PlanState::ADVANCED) { @@ -354,56 +407,34 @@ void HashAggStage::open(bool reOpen) { key.reset(idx++, false, tag, val); } - - if (_htIt = _ht->find(key); !_recordStore && _htIt == _ht->end()) { - // The memory limit hasn't been reached yet, insert a new key in '_ht' by copying - // the key. Note as a future optimization, we should avoid the lookup in the find() - // call and the emplace. + bool newKey = false; + _htIt = _ht->find(key); + if (_htIt == _ht->end()) { + // The key is not present in the hash table yet, so we insert it and initialize the + // corresponding accumulator. Note that as a future optimization, we could avoid + // doing a lookup both in the 'find()' call and in 'emplace()'. + newKey = true; key.makeOwned(); auto [it, _] = _ht->emplace(std::move(key), value::MaterializedRow{0}); - // Initialize accumulators. it->second.resize(_outAggAccessors.size()); _htIt = it; } - updateAggStateHt = _htIt != _ht->end(); - - if (updateAggStateHt) { - // Accumulate state in '_ht' by pointing the '_outAggAccessors' the - // '_outHashAggAccessors'. - for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { - _outAggAccessors[idx]->setIndex(0); - auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get()); - _outHashAggAccessors[idx]->reset(owned, tag, val); - } - } else { - // The memory limit has been reached and the key wasn't in the '_ht' so we need - // to spill it to the '_recordStore'. - KeyString::Builder kb{KeyString::Version::kLatestVersion}; - // 'key' is moved only when 'updateAggStateHt' ends up "true", so it's safe to - // ignore the warning. - key.serializeIntoKeyString(kb); // NOLINT(bugprone-use-after-move) - auto typeBits = kb.getTypeBits(); - - auto rid = RecordId(kb.getBuffer(), kb.getSize()); - - boost::optional<value::MaterializedRow> valFromRs = - readFromRecordStore(_opCtx, _recordStore->rs(), rid); - if (!valFromRs) { - _aggValueRecordStore = defaultVal; - } else { - _aggValueRecordStore = *valFromRs; - } - - for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { - _outAggAccessors[idx]->setIndex(1); - auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get()); - _aggValueRecordStore.reset(idx, owned, tag, val); - } - spillValueToDisk(rid, _aggValueRecordStore, typeBits, valFromRs ? true : false); + + // Accumulate state in '_ht'. + for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { + auto [owned, tag, val] = _bytecode.run(_aggCodes[idx].get()); + _outHashAggAccessors[idx]->reset(owned, tag, val); } - // Estimates how much memory is being used and might start spilling. - checkMemoryUsageAndSpillIfNecessary(memoryCheckData); + if (_forceIncreasedSpilling && !newKey) { + // If configured to spill more than usual, we spill after seeing the same key twice. + spill(memoryCheckData); + } else { + // Estimates how much memory is being used. If we estimate that the hash table + // exceeds the allotted memory budget, its contents are spilled to the + // '_recordStore' and '_ht' is cleared. 
+ checkMemoryUsageAndSpillIfNecessary(memoryCheckData); + } if (_tracker && _tracker->trackProgress<TrialRunTracker::kNumResults>(1)) { // During trial runs, we want to limit the amount of work done by opening a blocking @@ -420,8 +451,33 @@ void HashAggStage::open(bool reOpen) { _children[0]->close(); _childOpened = false; } - } + // If we spilled at any point while consuming the input, then do one final spill to write + // any leftover contents of '_ht' to the record store. That way, when recovering the input + // from the record store and merging partial aggregates we don't have to worry about the + // possibility of some of the data being in the hash table and some being in the record + // store. + if (_recordStore) { + if (!_ht->empty()) { + spill(memoryCheckData); + } + + _specificStats.spilledDataStorageSize = _recordStore->rs()->storageSize(_opCtx); + + // Establish a cursor, positioned at the beginning of the record store. + _rsCursor = _recordStore->rs()->getCursor(_opCtx); + + // Callers will be obtaining the results from the spill table, so set the + // 'SwitchAccessors' so that they refer to the rows recovered from the record store + // under the hood. + for (auto&& accessor : _outKeyAccessors) { + accessor->setIndex(1); + } + for (auto&& accessor : _outAggAccessors) { + accessor->setIndex(1); + } + } + } if (!_seekKeysAccessors.empty()) { // Copy keys in order to do the lookup. @@ -433,20 +489,76 @@ void HashAggStage::open(bool reOpen) { } _htIt = _ht->end(); +} + +HashAggStage::SpilledRow HashAggStage::deserializeSpilledRecord(const Record& record, + BufBuilder& keyBuffer) { + // Read the values and type bits out of the value part of the record. + BufReader valReader(record.data.data(), record.data.size()); + auto val = value::MaterializedRow::deserializeForSorter(valReader, {}); + auto typeBits = KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader); + + keyBuffer.reset(); + auto key = value::MaterializedRow::deserializeFromKeyString( + decodeKeyString(record.id, typeBits), &keyBuffer, _gbs.size() /*numPrefixValuesToRead*/); + return {std::move(key), std::move(val)}; +} + +PlanState HashAggStage::getNextSpilled() { + if (_stashedNextRow.first.isEmpty()) { + auto nextRecord = _rsCursor->next(); + if (!nextRecord) { + return trackPlanState(PlanState::IS_EOF); + } + + // We are just starting the process of merging the spilled file segments. + auto recoveredRow = deserializeSpilledRecord(*nextRecord, _outKeyRowRSBuffer); - // Set the SwitchAccessors to point to the '_ht' so we can drain it first before draining the - // '_recordStore' in getNext(). - for (size_t idx = 0; idx < _outAggAccessors.size(); ++idx) { - _outAggAccessors[idx]->setIndex(0); + _outKeyRowRecordStore = std::move(recoveredRow.first); + _outAggRowRecordStore = std::move(recoveredRow.second); + } else { + // We peeked at the next key last time around. + _outKeyRowRSBuffer = std::move(_stashedKeyBuffer); + _outKeyRowRecordStore = std::move(_stashedNextRow.first); + _outAggRowRecordStore = std::move(_stashedNextRow.second); + // Clear the stashed row. + _stashedNextRow = {0, 0}; + } + + // Find additional partial aggregates for the same key and merge them in order to compute the + // final output. 
+ for (auto nextRecord = _rsCursor->next(); nextRecord; nextRecord = _rsCursor->next()) { + auto recoveredRow = deserializeSpilledRecord(*nextRecord, _stashedKeyBuffer); + if (!_keyEq(recoveredRow.first, _outKeyRowRecordStore)) { + // The newly recovered spilled row belongs to a new key, so we're done merging partial + // aggregates for the old key. Save the new row for later and return advanced. + _stashedNextRow = std::move(recoveredRow); + return trackPlanState(PlanState::ADVANCED); + } + + // Merge in the new partial aggregate values. + _spilledAggRow = std::move(recoveredRow.second); + for (size_t idx = 0; idx < _mergingExprCodes.size(); ++idx) { + auto [owned, tag, val] = _bytecode.run(_mergingExprCodes[idx].get()); + _outRecordStoreAggAccessors[idx]->reset(owned, tag, val); + } } - _drainingRecordStore = false; + + return trackPlanState(PlanState::ADVANCED); } PlanState HashAggStage::getNext() { auto optTimer(getOptTimer(_opCtx)); - if (_htIt == _ht->end() && !_drainingRecordStore) { - // First invocation of getNext() after open() when not draining the '_recordStore'. + // If we've spilled, then we need to produce the output by merging the spilled segments from the + // spill file. + if (_recordStore) { + return getNextSpilled(); + } + + // We didn't spill. Obtain the next output row from the hash table. + if (_htIt == _ht->end()) { + // First invocation of getNext() after open(). if (!_seekKeysAccessors.empty()) { _htIt = _ht->find(_seekKeys); } else { @@ -455,53 +567,13 @@ PlanState HashAggStage::getNext() { } else if (!_seekKeysAccessors.empty()) { // Subsequent invocation with seek keys. Return only 1 single row (if any). _htIt = _ht->end(); - } else if (!_drainingRecordStore) { - // Returning the results of the entire hash table first before draining the '_recordStore'. + } else { ++_htIt; } - if (_htIt == _ht->end() && !_recordStore) { - // The hash table has been drained and nothing was spilled to disk. + if (_htIt == _ht->end()) { + // The hash table has been drained (and we never spilled to disk) so we're done. return trackPlanState(PlanState::IS_EOF); - } else if (_htIt != _ht->end()) { - // Drain the '_ht' on the next 'getNext()' call. - return trackPlanState(PlanState::ADVANCED); - } else if (_seekKeysAccessors.empty()) { - // A record store was created to spill to disk. Drain it then clean it up. - if (!_rsCursor) { - _rsCursor = _recordStore->rs()->getCursor(_opCtx); - } - auto nextRecord = _rsCursor->next(); - if (nextRecord) { - // Point the out accessors to the recordStore accessors to allow parent stages to read - // the agg state from the '_recordStore'. - if (!_drainingRecordStore) { - for (size_t i = 0; i < _outKeyAccessors.size(); ++i) { - _outKeyAccessors[i]->setIndex(1); - } - for (size_t i = 0; i < _outAggAccessors.size(); ++i) { - _outAggAccessors[i]->setIndex(1); - } - } - _drainingRecordStore = true; - - // Read the agg state value from the '_recordStore' and Reconstruct the key from the - // typeBits stored along side of the value. 
- BufReader valReader(nextRecord->data.data(), nextRecord->data.size()); - auto val = value::MaterializedRow::deserializeForSorter(valReader, {}); - auto typeBits = - KeyString::TypeBits::fromBuffer(KeyString::Version::kLatestVersion, &valReader); - _aggValueRecordStore = val; - - _aggKeyRSBuffer.reset(); - _aggKeyRecordStore = value::MaterializedRow::deserializeFromKeyString( - decodeKeyString(nextRecord->id, typeBits), &_aggKeyRSBuffer); - return trackPlanState(PlanState::ADVANCED); - } else { - _rsCursor.reset(); - _recordStore.reset(); - return trackPlanState(PlanState::IS_EOF); - } } else { return trackPlanState(PlanState::ADVANCED); } @@ -521,11 +593,19 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co childrenBob.append(str::stream() << slot, printer.print(expr->debugPrint())); } } + + if (!_mergingExprs.empty()) { + BSONObjBuilder nestedBuilder{bob.subobjStart("mergingExprs")}; + for (auto&& [slot, expr] : _mergingExprs) { + nestedBuilder.append(str::stream() << slot, printer.print(expr->debugPrint())); + } + } + // Spilling stats. bob.appendBool("usedDisk", _specificStats.usedDisk); + bob.appendNumber("numSpills", _specificStats.numSpills); bob.appendNumber("spilledRecords", _specificStats.spilledRecords); - bob.appendNumber("spilledBytesApprox", - _specificStats.lastSpilledRecordSize * _specificStats.spilledRecords); + bob.appendNumber("spilledDataStorageSize", _specificStats.spilledDataStorageSize); ret->debugInfo = bob.obj(); } @@ -543,11 +623,12 @@ void HashAggStage::close() { trackClose(); _ht = boost::none; - if (_recordStore) { - // A record store was created to spill to disk. Clean it up. - _recordStore.reset(); - _drainingRecordStore = false; - } + _rsCursor.reset(); + _recordStore.reset(); + _outKeyRowRecordStore = {0}; + _outAggRowRecordStore = {0}; + _spilledAggRow = {0}; + _stashedNextRow = {0, 0}; if (_childOpened) { _children[0]->close(); @@ -570,7 +651,7 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const { ret.emplace_back(DebugPrinter::Block("[`")); bool first = true; - value::orderedSlotMapTraverse(_aggs, [&](auto slot, auto&& expr) { + for (auto&& [slot, expr] : _aggs) { if (!first) { ret.emplace_back(DebugPrinter::Block("`,")); } @@ -579,7 +660,7 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const { ret.emplace_back("="); DebugPrinter::addBlocks(ret, expr->debugPrint()); first = false; - }); + } ret.emplace_back("`]"); if (!_seekKeysSlots.empty()) { @@ -594,6 +675,28 @@ std::vector<DebugPrinter::Block> HashAggStage::debugPrint() const { ret.emplace_back("`]"); } + if (!_mergingExprs.empty()) { + ret.emplace_back("spillSlots[`"); + for (size_t idx = 0; idx < _mergingExprs.size(); ++idx) { + if (idx) { + ret.emplace_back("`,"); + } + + DebugPrinter::addIdentifier(ret, _mergingExprs[idx].first); + } + ret.emplace_back("`]"); + + ret.emplace_back("mergingExprs[`"); + for (size_t idx = 0; idx < _mergingExprs.size(); ++idx) { + if (idx) { + ret.emplace_back("`,"); + } + + DebugPrinter::addBlocks(ret, _mergingExprs[idx].second->debugPrint()); + } + ret.emplace_back("`]"); + } + if (!_optimizedClose) { ret.emplace_back("reopen"); } @@ -614,6 +717,7 @@ size_t HashAggStage::estimateCompileTimeSize() const { size += size_estimator::estimate(_gbs); size += size_estimator::estimate(_aggs); size += size_estimator::estimate(_seekKeysSlots); + size += size_estimator::estimate(_mergingExprs); return size; } diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h index 
19fbca9d1c7..001b29be887 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.h +++ b/src/mongo/db/exec/sbe/stages/hash_agg.h @@ -29,8 +29,6 @@ #pragma once -#include <unordered_map> - #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/stages/stages.h" #include "mongo/db/exec/sbe/vm/vm.h" @@ -61,21 +59,38 @@ namespace sbe { * determining whether two group-by keys are equal. For instance, the plan may require us to do a * case-insensitive group on a string field. * + * The 'allowDiskUse' flag controls whether this stage can spill. If false and the memory budget is + * exhausted, this stage throws a query-fatal error with code + * 'QueryExceededMemoryLimitNoDiskUseAllowed'. If true, then spilling is possible and the caller + * must provide a vector of 'mergingExprs'. This is a vector of (slot, expression) pairs which is + * symmetrical with 'aggs'. The slots are only visible internally and are used to store partial + * aggregate values that have been recovered from the spill table. Each of the expressions is an agg + * function which merges the partial aggregate value from this slot into the final aggregate value. + * In the debug string output, the internal slots used to house the partial aggregates are printed + * as a list of "spillSlots" and the expressions are printed as a parallel list of "mergingExprs". + * + * If 'forcedIncreasedSpilling' is true, then this stage will spill frequently even if the memory + * limit is not reached. This is intended to be used in test contexts to exercise the otherwise + * infrequently used spilling logic. + * * Debug string representation: * - * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] [<seek slots>]? reopen? - * collatorSlot? childStage + * group [<group by slots>] [slot_1 = expr_1, ..., slot_n = expr_n] [<seek slots>]? + * spillSlots[slot_1, ..., slot_n] mergingExprs[expr_1, ..., expr_n] reopen? collatorSlot? + * childStage */ class HashAggStage final : public PlanStage { public: HashAggStage(std::unique_ptr<PlanStage> input, value::SlotVector gbs, - value::SlotMap<std::unique_ptr<EExpression>> aggs, + SlotExprPairVector aggs, value::SlotVector seekKeysSlots, bool optimizedClose, boost::optional<value::SlotId> collatorSlot, bool allowDiskUse, - PlanNodeId planNodeId); + SlotExprPairVector mergingExprs, + PlanNodeId planNodeId, + bool forceIncreasedSpilling = false); std::unique_ptr<PlanStage> clone() const final; @@ -108,99 +123,138 @@ private: using HashKeyAccessor = value::MaterializedRowKeyAccessor<TableType::iterator>; using HashAggAccessor = value::MaterializedRowValueAccessor<TableType::iterator>; - void makeTemporaryRecordStore(); - - /** - * Spills a key and value pair to the '_recordStore' where the semantics are insert or update - * depending on the 'update' flag. When the 'update' flag is true this method already expects - * the 'key' to be inserted into the '_recordStore', otherwise the 'key' and 'val' pair are - * fresh. - * - * This method expects the key to be seralized into a KeyString::Value so that the key is - * memcmp-able and lookups can be done to update the 'val' in the '_recordStore'. Note that the - * 'typeBits' are needed to reconstruct the spilled 'key' when calling 'getNext' to deserialize - * the 'key' to a MaterializedRow. 
Since the '_recordStore' only stores the memcmp-able part of - the KeyString we need to carry the 'typeBits' separately, and we do this by appending the - 'typeBits' to the end of the serialized 'val' buffer and store them at the leaves of the - backing B-tree of the '_recordStore'. used as the RecordId. - */ - void spillValueToDisk(const RecordId& key, - const value::MaterializedRow& val, - const KeyString::TypeBits& typeBits, - bool update); - void spillRowToDisk(const value::MaterializedRow& key, - const value::MaterializedRow& defaultVal); + using SpilledRow = std::pair<value::MaterializedRow, value::MaterializedRow>; /** * We check amount of used memory every T processed incoming records, where T is calculated * based on the estimated used memory and its recent growth. When the memory limit is exceeded, - * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' and might spill some of - * the already accumulated data into it. + * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' (if it hasn't already been + * created) and spill the contents of the hash table into this record store. */ struct MemoryCheckData { + MemoryCheckData() { + reset(); + } + + void reset() { + memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency); + nextMemoryCheckpoint = 0; + memoryCheckpointCounter = 0; + lastEstimatedMemoryUsage = 0; + } + const double checkpointMargin = internalQuerySBEAggMemoryUseCheckMargin.load(); - const long atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load(); - const long atLeastMemoryCheckFrequency = + const int64_t atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load(); + const int64_t atLeastMemoryCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load(); // The check frequency upper bound, which start at 'atMost' and exponentially backs off // to 'atLeast' as more data is accumulated. If 'atLeast' is less than 'atMost', the memory // checks will be done every 'atLeast' incoming records. - long memoryCheckFrequency = 1; + int64_t memoryCheckFrequency = 1; // The number of incoming records to process before the next memory checkpoint. - long nextMemoryCheckpoint = 0; + int64_t nextMemoryCheckpoint = 0; // The counter of the incoming records between memory checkpoints. - long memoryCheckpointCounter = 0; + int64_t memoryCheckpointCounter = 0; - long long lastEstimatedMemoryUsage = 0; - - MemoryCheckData() { - memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency); - } + int64_t lastEstimatedMemoryUsage = 0; }; + + /** + * Inserts a key and value pair to the '_recordStore'. The key is serialized to a + * 'KeyString::Value' which becomes the 'RecordId'. This makes the keys memcmp-able and ensures + * that the record store ends up sorted by the group-by keys. + * + * Note that the 'typeBits' are needed to reconstruct the spilled 'key' to a 'MaterializedRow', + * but are not necessary for comparison purposes. Therefore, we carry the type bits separately + * from the record id, instead appending them to the end of the serialized 'val' buffer. + */ + void spillRowToDisk(const value::MaterializedRow& key, const value::MaterializedRow& val); + void checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd); + void spill(MemoryCheckData& mcd); + + /** + * Given a 'record' from the record store, decodes it into a pair of materialized rows (one for + * the group-by keys and another for the agg values).
+ * + * The given 'keyBuffer' is cleared, and then used to hold data (e.g. long strings and other + * values that can't be inlined) obtained by decoding the 'RecordId' keystring to a + * 'MaterializedRow'. The values in the resulting 'MaterializedRow' may be pointers into + * 'keyBuffer', so it is important that 'keyBuffer' outlive the row. + */ + SpilledRow deserializeSpilledRecord(const Record& record, BufBuilder& keyBuffer); + + PlanState getNextSpilled(); + + void makeTemporaryRecordStore(); const value::SlotVector _gbs; - const value::SlotMap<std::unique_ptr<EExpression>> _aggs; + const SlotExprPairVector _aggs; const boost::optional<value::SlotId> _collatorSlot; const bool _allowDiskUse; const value::SlotVector _seekKeysSlots; // When this operator does not expect to be reopened (almost always) then it can close the child // early. const bool _optimizedClose{true}; + + // Expressions used to merge partial aggregates that have been spilled to disk and their + // corresponding input slots. For example, imagine that this list contains a pair (s12, + // sum(s12)). This means that the partial aggregate values will be read into slot s12 after + // being recovered from the spill table and can be merged using the 'sum()' agg function. + // + // When disk use is allowed, this vector must have the same length as '_aggs'. + const SlotExprPairVector _mergingExprs; + + // When true, we spill frequently without reaching the memory limit. This allows us to exercise + // the spilling logic more often in test contexts. + const bool _forceIncreasedSpilling; + value::SlotAccessorMap _outAccessors; + + // Accessors used to obtain the values of the group by slots when reading the input from the + // child. std::vector<value::SlotAccessor*> _inKeyAccessors; - // Accessors for the key stored in '_ht', a SwitchAccessor is used so we can produce the key - // from either the '_ht' or the '_recordStore'. + // This buffer stores values for '_outKeyRowRecordStore'; values in the '_outKeyRowRecordStore' + // can be pointers that point to data in this buffer. + BufBuilder _outKeyRowRSBuffer; + // Accessors for the key slots provided as output by this stage. The keys can either come from + // the hash table or be recovered from a temporary record store. We use a 'SwitchAccessor' to + // switch between these two cases. std::vector<std::unique_ptr<HashKeyAccessor>> _outHashKeyAccessors; + // Row of key values to output, used when recovering spilled data from the record store. + value::MaterializedRow _outKeyRowRecordStore{0}; + std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreKeyAccessors; std::vector<std::unique_ptr<value::SwitchAccessor>> _outKeyAccessors; - // Accessor for the agg state value stored in the '_recordStore' when data is spilled to disk. - value::MaterializedRow _aggKeyRecordStore{0}; - value::MaterializedRow _aggValueRecordStore{0}; - std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreKeyAccessors; + // Accessors for the output aggregate results. The aggregates can either come from the hash + // table or can be computed after merging partial aggregates spilled to a record store. We use a + // 'SwitchAccessor' to switch between these two cases. + std::vector<std::unique_ptr<HashAggAccessor>> _outHashAggAccessors; + // Row of agg values to output, used when recovering spilled data from the record store.
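As a rough picture of the spilling scheme documented above, the sketch below shows how one hash table entry might be written to the spill table. It is illustrative only, not the implementation in this patch: the member names ('_recordStore', '_opCtx', '_ridCounter'), the RecordId construction, and the 'upsertToRecordStore' overload are assumptions based on the declarations and comments visible in this diff.

    // Hedged sketch: spill one (key, val) pair of MaterializedRows.
    KeyString::Builder kb{KeyString::Version::kLatestVersion};
    key.serializeIntoKeyString(kb);      // group-by key becomes the memcmp-able record id prefix
    kb.appendNumberLong(_ridCounter++);  // unique suffix so repeated keys still get distinct ids
    RecordId rid(kb.getBuffer(), kb.getSize());

    // The TypeBits needed to rebuild the key travel with the value; the upsert helper
    // (see spilling.h below) appends them to the value buffer before writing the record.
    upsertToRecordStore(_opCtx, _recordStore->rs(), rid, val, kb.getTypeBits(), false /*update*/);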
+ value::MaterializedRow _outAggRowRecordStore{0}; std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _outRecordStoreAggAccessors; - - // This buffer stores values for the spilled '_aggKeyRecordStore' that's loaded into memory from - // the '_recordStore'. Values in the '_aggKeyRecordStore' row are pointers that point to data in - // this buffer. - BufBuilder _aggKeyRSBuffer; + std::vector<std::unique_ptr<value::SwitchAccessor>> _outAggAccessors; std::vector<value::SlotAccessor*> _seekKeysAccessors; value::MaterializedRow _seekKeys; - // Accesors for the agg state in '_ht', a SwitchAccessor is used so we can produce the agg state - // from either the '_ht' or the '_recordStore' when draining the HashAgg stage. - std::vector<std::unique_ptr<value::SwitchAccessor>> _outAggAccessors; - std::vector<std::unique_ptr<HashAggAccessor>> _outHashAggAccessors; + // Bytecode which gets executed to aggregate incoming rows into the hash table. std::vector<std::unique_ptr<vm::CodeFragment>> _aggCodes; + // Bytecode for the merging expressions, executed if partial aggregates are spilled to a record + // store and need to be subsequently combined. + std::vector<std::unique_ptr<vm::CodeFragment>> _mergingExprCodes; // Only set if collator slot provided on construction. value::SlotAccessor* _collatorAccessor = nullptr; + // Function object which can be used to check whether two materialized rows of key values are + // equal. This comparison is collation-aware if the query has a non-simple collation. + value::MaterializedRowEq _keyEq; + boost::optional<TableType> _ht; TableType::iterator _htIt; @@ -212,10 +266,31 @@ private: // Memory tracking and spilling to disk. const long long _approxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); + + // A record store which is instantiated and written to in the case of spilling. std::unique_ptr<TemporaryRecordStore> _recordStore; - bool _drainingRecordStore{false}; std::unique_ptr<SeekableRecordCursor> _rsCursor; + // A monotonically increasing counter used to ensure uniqueness of 'RecordId' values. When + // spilling, the key is encoded into the 'RecordId' of the '_recordStore'. Record ids must be + // unique by definition, but we might end up spilling multiple partial aggregates for the same + // key. We ensure uniqueness by appending a unique integer to the end of this key, which is + // simply ignored during deserialization. + int64_t _ridCounter = 0; + + // Partial aggregates that have been spilled are read into '_spilledAggRow' and accessed using + // '_spilledAggsAccessors' so that they can be merged to compute the final aggregate value. + value::MaterializedRow _spilledAggRow{0}; + std::vector<std::unique_ptr<value::MaterializedSingleRowAccessor>> _spilledAggsAccessors; + value::SlotAccessorMap _spilledAggsAccessorMap; + + // Buffer to hold data for the deserialized key values from '_stashedNextRow'. + BufBuilder _stashedKeyBuffer; + // Place to stash the next keys and values during the streaming phase. The record store cursor + // doesn't offer a "peek" API, so we need to hold onto the next row between getNext() calls when + // the key value advances. + SpilledRow _stashedNextRow; + HashAggStats _specificStats; // If provided, used during a trial run to accumulate certain execution stats.
Once the trial diff --git a/src/mongo/db/exec/sbe/stages/plan_stats.h b/src/mongo/db/exec/sbe/stages/plan_stats.h index 263a4f86660..f487c7a45d2 100644 --- a/src/mongo/db/exec/sbe/stages/plan_stats.h +++ b/src/mongo/db/exec/sbe/stages/plan_stats.h @@ -278,8 +278,13 @@ struct HashAggStats : public SpecificStats { } bool usedDisk{false}; + // The number of times that the entire hash table was spilled. + long long numSpills{0}; + // The number of individual records spilled to disk. long long spilledRecords{0}; - long long lastSpilledRecordSize{0}; + // An estimate, in bytes, of the size of the final spill table after all spill events have taken + // place. + long long spilledDataStorageSize{0}; }; struct HashLookupStats : public SpecificStats { diff --git a/src/mongo/db/exec/sbe/util/spilling.cpp b/src/mongo/db/exec/sbe/util/spilling.cpp index c54f3bfe956..6e8c027d7fe 100644 --- a/src/mongo/db/exec/sbe/util/spilling.cpp +++ b/src/mongo/db/exec/sbe/util/spilling.cpp @@ -85,7 +85,6 @@ int upsertToRecordStore(OperationContext* opCtx, BufBuilder& buf, const KeyString::TypeBits& typeBits, // recover type of value. bool update) { - // Append the 'typeBits' to the end of the val's buffer so the 'key' can be reconstructed when // draining HashAgg. buf.appendBuf(typeBits.getBuffer(), typeBits.getSize()); diff --git a/src/mongo/db/exec/sbe/util/spilling.h b/src/mongo/db/exec/sbe/util/spilling.h index 95f73e2b02e..0a562d7e864 100644 --- a/src/mongo/db/exec/sbe/util/spilling.h +++ b/src/mongo/db/exec/sbe/util/spilling.h @@ -55,9 +55,12 @@ boost::optional<value::MaterializedRow> readFromRecordStore(OperationContext* op RecordStore* rs, const RecordId& rid); -/** Inserts or updates a key/value into 'rs'. The 'update' flag controls whether or not an update +/** + * Inserts or updates a key/value into 'rs'. The 'update' flag controls whether or not an update * will be performed. If a key/value pair is inserted into the 'rs' that already exists and * 'update' is false, this function will tassert. + * + * Returns the size of the new record in bytes, including the record id and value portions. */ int upsertToRecordStore(OperationContext* opCtx, RecordStore* rs, @@ -65,7 +68,6 @@ int upsertToRecordStore(OperationContext* opCtx, const value::MaterializedRow& val, const KeyString::TypeBits& typeBits, bool update); - int upsertToRecordStore(OperationContext* opCtx, RecordStore* rs, const RecordId& key, diff --git a/src/mongo/db/exec/sbe/values/slot.cpp b/src/mongo/db/exec/sbe/values/slot.cpp index 1e7a31b2033..d8b59c6c3db 100644 --- a/src/mongo/db/exec/sbe/values/slot.cpp +++ b/src/mongo/db/exec/sbe/values/slot.cpp @@ -564,8 +564,10 @@ void MaterializedRow::serializeIntoKeyString(KeyString::Builder& buf) const { } } -MaterializedRow MaterializedRow::deserializeFromKeyString(const KeyString::Value& keyString, - BufBuilder* valueBufferBuilder) { +MaterializedRow MaterializedRow::deserializeFromKeyString( + const KeyString::Value& keyString, + BufBuilder* valueBufferBuilder, + boost::optional<size_t> numPrefixValsToRead) { BufReader reader(keyString.getBuffer(), keyString.getSize()); KeyString::TypeBits typeBits(keyString.getTypeBits()); KeyString::TypeBits::Reader typeBitsReader(typeBits); @@ -577,7 +579,8 @@ MaterializedRow MaterializedRow::deserializeFromKeyString(const KeyString::Value &reader, &typeBitsReader, false /* inverted */, typeBits.version, &valBuilder); } while (keepReading); - MaterializedRow result{valBuilder.numValues()}; + size_t sizeOfRow = numPrefixValsToRead ? 
*numPrefixValsToRead : valBuilder.numValues(); + MaterializedRow result{sizeOfRow}; valBuilder.readValues(result); return result; diff --git a/src/mongo/db/exec/sbe/values/slot.h b/src/mongo/db/exec/sbe/values/slot.h index f853f816d4d..2452c8bd094 100644 --- a/src/mongo/db/exec/sbe/values/slot.h +++ b/src/mongo/db/exec/sbe/values/slot.h @@ -483,10 +483,16 @@ public: * intended for spilling key values used in the HashAgg stage. The format is not guaranteed to * be stable between versions, so it should not be used for long-term storage or communication * between instances. + * + * If 'numPrefixValsToRead' is provided, then only the given number of values from 'keyString' + * are decoded into the resulting 'MaterializedRow'. The remaining suffix values in the + * 'keyString' are ignored. */ - static MaterializedRow deserializeFromKeyString(const KeyString::Value& keyString, + static MaterializedRow deserializeFromKeyString( + const KeyString::Value& keyString, + BufBuilder* valueBufferBuilder, + boost::optional<size_t> numPrefixValsToRead = boost::none); - BufBuilder* valueBufferBuilder); void serializeIntoKeyString(KeyString::Builder& builder) const; private: @@ -577,9 +583,9 @@ private: }; struct MaterializedRowEq { - using ComparatorType = StringData::ComparatorInterface*; + using ComparatorType = StringData::ComparatorInterface; - explicit MaterializedRowEq(const ComparatorType comparator = nullptr) + explicit MaterializedRowEq(const ComparatorType* comparator = nullptr) : _comparator(comparator) {} bool operator()(const MaterializedRow& lhs, const MaterializedRow& rhs) const { @@ -597,7 +603,7 @@ struct MaterializedRowEq { } private: - const ComparatorType _comparator = nullptr; + const ComparatorType* _comparator = nullptr; }; struct MaterializedRowLess { diff --git a/src/mongo/db/exec/sbe/values/value_builder.h b/src/mongo/db/exec/sbe/values/value_builder.h index 9ad2b511242..a39432a4429 100644 --- a/src/mongo/db/exec/sbe/values/value_builder.h +++ b/src/mongo/db/exec/sbe/values/value_builder.h @@ -323,7 +323,10 @@ public: auto bufferLen = _valueBufferBuilder->len(); size_t bufIdx = 0; size_t rowIdx = 0; - while (bufIdx < _numValues) { + // The 'row' output parameter might be smaller than the number of values owned by this + // builder. Be careful to only read as many values into 'row' as this output 'row' has space + // for. + while (rowIdx < row.size()) { invariant(rowIdx < row.size()); auto [_, tagNothing, valNothing] = getValue(bufIdx++, bufferLen); tassert(6136200, "sbe tag must be 'Boolean'", tagNothing == TypeTags::Boolean); diff --git a/src/mongo/db/exec/sbe/vm/vm.cpp b/src/mongo/db/exec/sbe/vm/vm.cpp index 0b2d83f9f9f..e5114e64c54 100644 --- a/src/mongo/db/exec/sbe/vm/vm.cpp +++ b/src/mongo/db/exec/sbe/vm/vm.cpp @@ -3589,6 +3589,41 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::aggSetUnionCappedImpl( return {ownAcc, tagAcc, valAcc}; } +std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggSetUnion(ArityType arity) { + auto [ownAcc, tagAcc, valAcc] = getFromStack(0); + + if (tagAcc == value::TypeTags::Nothing) { + // Initialize the accumulator. + ownAcc = true; + std::tie(tagAcc, valAcc) = value::makeNewArraySet(); + } else { + // Take ownership of the accumulator. 
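+ // Overwriting the stack entry with an unowned Nothing hands ownership of the existing set to this function (and to the guard constructed below), so the value is not freed twice.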
+ topStack(false, value::TypeTags::Nothing, 0); + } + + tassert(7039552, "accumulator must be owned", ownAcc); + value::ValueGuard guardAcc{tagAcc, valAcc}; + tassert(7039553, "accumulator must be of type ArraySet", tagAcc == value::TypeTags::ArraySet); + auto acc = value::getArraySetView(valAcc); + + auto [tagNewSet, valNewSet] = moveOwnedFromStack(1); + value::ValueGuard guardNewSet{tagNewSet, valNewSet}; + if (!value::isArray(tagNewSet)) { + return {false, value::TypeTags::Nothing, 0}; + } + + auto i = value::ArrayEnumerator{tagNewSet, valNewSet}; + while (!i.atEnd()) { + auto [elTag, elVal] = i.getViewOfValue(); + auto [copyTag, copyVal] = value::copyValue(elTag, elVal); + acc->push_back(copyTag, copyVal); + i.advance(); + } + + guardAcc.reset(); + return {ownAcc, tagAcc, valAcc}; +} + std::tuple<bool, value::TypeTags, value::Value> ByteCode::builtinAggSetUnionCapped( ArityType arity) { auto [tagNewElem, valNewElem] = moveOwnedFromStack(1); @@ -4678,6 +4713,8 @@ std::tuple<bool, value::TypeTags, value::Value> ByteCode::dispatchBuiltin(Builti return builtinConcat(arity); case Builtin::aggConcatArraysCapped: return builtinAggConcatArraysCapped(arity); + case Builtin::aggSetUnion: + return builtinAggSetUnion(arity); case Builtin::aggSetUnionCapped: return builtinAggSetUnionCapped(arity); case Builtin::aggCollSetUnionCapped: diff --git a/src/mongo/db/exec/sbe/vm/vm.h b/src/mongo/db/exec/sbe/vm/vm.h index 42c1d27015b..e054f8dee73 100644 --- a/src/mongo/db/exec/sbe/vm/vm.h +++ b/src/mongo/db/exec/sbe/vm/vm.h @@ -556,6 +556,8 @@ enum class Builtin : uint8_t { // specified size. aggSetUnionCapped, aggCollSetUnionCapped, + // Agg function for a simple set union (with no size cap or collation). + aggSetUnion, acos, acosh, @@ -1186,6 +1188,7 @@ private: std::tuple<bool, value::TypeTags, value::Value> builtinRound(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinConcat(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinAggConcatArraysCapped(ArityType arity); + std::tuple<bool, value::TypeTags, value::Value> builtinAggSetUnion(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinAggSetUnionCapped(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> builtinAggCollSetUnionCapped(ArityType arity); std::tuple<bool, value::TypeTags, value::Value> aggSetUnionCappedImpl( diff --git a/src/mongo/db/query/sbe_stage_builder.cpp b/src/mongo/db/query/sbe_stage_builder.cpp index fc5ba4a08fb..420accc9e46 100644 --- a/src/mongo/db/query/sbe_stage_builder.cpp +++ b/src/mongo/db/query/sbe_stage_builder.cpp @@ -2384,7 +2384,7 @@ std::tuple<sbe::value::SlotVector, EvalStage> generateAccumulator( const PlanStageSlots& childOutputs, PlanNodeId nodeId, sbe::value::SlotIdGenerator* slotIdGenerator, - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>>& accSlotToExprMap) { + sbe::SlotExprPairVector& accSlotExprPairs) { // Input fields may need field traversal which ends up being a complex tree. auto evalStage = optimizeFieldPaths( state, accStmt.expr.argument, std::move(childEvalStage), childOutputs, nodeId); @@ -2403,12 +2403,48 @@ std::tuple<sbe::value::SlotVector, EvalStage> generateAccumulator( for (auto& accExpr : accExprs) { auto slot = slotIdGenerator->generate(); aggSlots.push_back(slot); - accSlotToExprMap.emplace(slot, std::move(accExpr)); + accSlotExprPairs.push_back({slot, std::move(accExpr)}); } return {std::move(aggSlots), std::move(accArgEvalStage)}; } +/** + * Generate a vector of (inputSlot, mergingExpression) pairs. 
The slot (whose id is allocated by + * this function) will be used to store spilled partial aggregate values that have been recovered + * from disk and deserialized. The merging expression is an agg function which combines these + * partial aggregates. + * + * Usually the returned vector will be of length 1, but in some cases the MQL accumulation statement + * is implemented by calculating multiple separate aggregates in the SBE plan, which are finalized + * by a subsequent project stage to produce the ultimate value. + */ +sbe::SlotExprPairVector generateMergingExpressions(StageBuilderState& state, + const AccumulationStatement& accStmt, + int numInputSlots) { + tassert(7039555, "'numInputSlots' must be positive", numInputSlots > 0); + auto slotIdGenerator = state.slotIdGenerator; + tassert(7039556, "expected non-null 'slotIdGenerator' pointer", slotIdGenerator); + auto frameIdGenerator = state.frameIdGenerator; + tassert(7039557, "expected non-null 'frameIdGenerator' pointer", frameIdGenerator); + + auto spillSlots = slotIdGenerator->generateMultiple(numInputSlots); + auto collatorSlot = state.data->env->getSlotIfExists("collator"_sd); + auto mergingExprs = + buildCombinePartialAggregates(accStmt, spillSlots, collatorSlot, *frameIdGenerator); + + // Zip the slot vector and expression vector into a vector of pairs. + tassert(7039550, + "expected same number of slots and input exprs", + spillSlots.size() == mergingExprs.size()); + sbe::SlotExprPairVector result; + result.reserve(spillSlots.size()); + for (size_t i = 0; i < spillSlots.size(); ++i) { + result.push_back({spillSlots[i], std::move(mergingExprs[i])}); + } + return result; +} + std::tuple<std::vector<std::string>, sbe::value::SlotVector, EvalStage> generateGroupFinalStage( StageBuilderState& state, EvalStage groupEvalStage, @@ -2551,17 +2587,28 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder // Translates accumulators which are executed inside the group stage and gets slots for // accumulators. stage_builder::EvalStage accProjEvalStage = std::move(groupByEvalStage); - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> accSlotToExprMap; + sbe::SlotExprPairVector accSlotExprPairs; std::vector<sbe::value::SlotVector> aggSlotsVec; + // Since partial accumulator state may be spilled to disk and then merged, we must construct not + // only the basic agg expressions for each accumulator, but also agg expressions that are used + // to combine partial aggregates that have been spilled to disk. + sbe::SlotExprPairVector mergingExprs; for (const auto& accStmt : accStmts) { - auto [aggSlots, tempEvalStage] = generateAccumulator(_state, - accStmt, - std::move(accProjEvalStage), - childOutputs, - nodeId, - &_slotIdGenerator, - accSlotToExprMap); - aggSlotsVec.emplace_back(std::move(aggSlots)); + auto [curAggSlots, tempEvalStage] = generateAccumulator(_state, + accStmt, + std::move(accProjEvalStage), + childOutputs, + nodeId, + &_slotIdGenerator, + accSlotExprPairs); + + sbe::SlotExprPairVector curMergingExprs = + generateMergingExpressions(_state, accStmt, curAggSlots.size()); + + aggSlotsVec.emplace_back(std::move(curAggSlots)); + mergingExprs.insert(mergingExprs.end(), + std::make_move_iterator(curMergingExprs.begin()), + std::make_move_iterator(curMergingExprs.end())); accProjEvalStage = std::move(tempEvalStage); } @@ -2572,9 +2619,10 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder // Builds a group stage with accumulator expressions and group-by slot(s). 
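To illustrate what this wiring produces before the stage is built below, consider a hypothetical group with a single $sum accumulator: one agg expression accumulates into the hash table, and one merging expression over a freshly allocated spill slot folds recovered partial sums back together. The slot ids s1, s2, s3 and the use of the 'sum' agg function here are illustrative assumptions; 'makeSlotExprPairVec', 'makeFunction', and 'makeVariable' are the helpers used elsewhere in this patch.

    // Slot ids assumed to have been allocated by the stage builder's slot id generator:
    //   s1 = input value slot, s2 = accumulator slot, s3 = spill slot.
    // Accumulation: for each group, s2 is updated with the running sum of s1.
    sbe::SlotExprPairVector aggs =
        sbe::makeSlotExprPairVec(s2, makeFunction("sum"_sd, makeVariable(s1)));
    // Merging: a spilled partial sum is read back into s3 and combined via sum(s3).
    sbe::SlotExprPairVector mergingExprs =
        sbe::makeSlotExprPairVec(s3, makeFunction("sum"_sd, makeVariable(s3)));

In the resulting group stage's debug string, per the format documented in hash_agg.h, these would show up as spillSlots[s3] mergingExprs[sum(s3)] alongside the usual agg list.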
auto groupEvalStage = makeHashAgg(std::move(accProjEvalStage), dedupedGroupBySlots, - std::move(accSlotToExprMap), + std::move(accSlotExprPairs), _state.data->env->getSlotIfExists("collator"_sd), _cq.getExpCtx()->allowDiskUse, + std::move(mergingExprs), nodeId); tassert( diff --git a/src/mongo/db/query/sbe_stage_builder_expression.cpp b/src/mongo/db/query/sbe_stage_builder_expression.cpp index 820fd90576e..51e22194131 100644 --- a/src/mongo/db/query/sbe_stage_builder_expression.cpp +++ b/src/mongo/db/query/sbe_stage_builder_expression.cpp @@ -1238,9 +1238,13 @@ public: auto finalGroupStage = makeHashAgg(std::move(unwindEvalStage), sbe::makeSV(), - sbe::makeEM(finalGroupSlot, std::move(finalAddToArrayExpr)), + sbe::makeSlotExprPairVec(finalGroupSlot, std::move(finalAddToArrayExpr)), collatorSlot, - _context->state.allowDiskUse, + // Never allow this HashAgg stage to spill. + false, + // Merging exprs are not needed since this stage is prohibited from + // spilling. + {}, _context->planNodeId); // Returns true if any of our input expressions return null. diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.cpp b/src/mongo/db/query/sbe_stage_builder_helpers.cpp index c36947a23ad..f8879e7cdb6 100644 --- a/src/mongo/db/query/sbe_stage_builder_helpers.cpp +++ b/src/mongo/db/query/sbe_stage_builder_helpers.cpp @@ -470,14 +470,20 @@ EvalStage makeUnion(std::vector<EvalStage> inputStages, EvalStage makeHashAgg(EvalStage stage, sbe::value::SlotVector gbs, - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs, + sbe::SlotExprPairVector aggs, boost::optional<sbe::value::SlotId> collatorSlot, bool allowDiskUse, + sbe::SlotExprPairVector mergingExprs, PlanNodeId planNodeId) { stage.outSlots = gbs; for (auto& [slot, _] : aggs) { stage.outSlots.push_back(slot); } + + // In debug builds, we artificially force frequent spilling. This makes sure that our tests + // exercise the spilling algorithm and the associated logic for merging partial aggregates, + // which would otherwise require large data sizes to trigger.
+ const bool forceIncreasedSpilling = kDebugBuild && allowDiskUse; stage.stage = sbe::makeS<sbe::HashAggStage>(std::move(stage.stage), std::move(gbs), std::move(aggs), @@ -485,7 +491,9 @@ EvalStage makeHashAgg(EvalStage stage, true /* optimized close */, collatorSlot, allowDiskUse, - planNodeId); + std::move(mergingExprs), + planNodeId, + forceIncreasedSpilling); return stage; } diff --git a/src/mongo/db/query/sbe_stage_builder_helpers.h b/src/mongo/db/query/sbe_stage_builder_helpers.h index 05cf73896e0..3668d38208a 100644 --- a/src/mongo/db/query/sbe_stage_builder_helpers.h +++ b/src/mongo/db/query/sbe_stage_builder_helpers.h @@ -36,6 +36,7 @@ #include "mongo/db/exec/sbe/expressions/expression.h" #include "mongo/db/exec/sbe/stages/filter.h" +#include "mongo/db/exec/sbe/stages/hash_agg.h" #include "mongo/db/exec/sbe/stages/makeobj.h" #include "mongo/db/exec/sbe/stages/project.h" #include "mongo/db/pipeline/expression.h" @@ -416,9 +417,10 @@ EvalStage makeUnion(std::vector<EvalStage> inputStages, EvalStage makeHashAgg(EvalStage stage, sbe::value::SlotVector gbs, - sbe::value::SlotMap<std::unique_ptr<sbe::EExpression>> aggs, + sbe::SlotExprPairVector aggs, boost::optional<sbe::value::SlotId> collatorSlot, bool allowDiskUse, + sbe::SlotExprPairVector mergingExprs, PlanNodeId planNodeId); EvalStage makeMkBsonObj(EvalStage stage, diff --git a/src/mongo/db/query/sbe_stage_builder_lookup.cpp b/src/mongo/db/query/sbe_stage_builder_lookup.cpp index 4b62228edde..a3e5543c848 100644 --- a/src/mongo/db/query/sbe_stage_builder_lookup.cpp +++ b/src/mongo/db/query/sbe_stage_builder_lookup.cpp @@ -342,12 +342,15 @@ std::pair<SlotId /* keyValuesSetSlot */, std::unique_ptr<sbe::PlanStage>> buildK // Re-pack the individual key values into a set. We don't cap "addToSet" here because its size // is bounded by the size of the record. SlotId keyValuesSetSlot = slotIdGenerator.generate(); + SlotId spillSlot = slotIdGenerator.generate(); EvalStage packedKeyValuesStage = makeHashAgg( EvalStage{std::move(keyValuesStage), SlotVector{}}, makeSV(), /* groupBy slots - "none" means creating a single group */ - makeEM(keyValuesSetSlot, makeFunction("addToSet"_sd, makeVariable(keyValueSlot))), + makeSlotExprPairVec(keyValuesSetSlot, + makeFunction("addToSet"_sd, makeVariable(keyValueSlot))), boost::none /* we group _all_ key values into a single set, so collator is irrelevant */, allowDiskUse, + makeSlotExprPairVec(spillSlot, makeFunction("aggSetUnion"_sd, makeVariable(spillSlot))), nodeId); // The set in 'keyValuesSetSlot' might end up empty if the localField contained only missing and @@ -403,15 +406,20 @@ std::pair<SlotId /* resultSlot */, std::unique_ptr<sbe::PlanStage>> buildForeign // are no matches, return an empty array. 
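The (spillSlot, aggSetUnion(spillSlot)) pairing for the key-set group above means that a spilled partial set is read back into the spill slot and folded into the accumulator with a plain, uncapped set union. In debug-string form, that group stage would come out roughly as follows, following the format documented in hash_agg.h; the slot numbers are illustrative and the rendering is approximate rather than exact:

    group [] [s10 = addToSet(s9)] spillSlots[s11] mergingExprs[aggSetUnion(s11)] <child stage>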
const int sizeCap = internalLookupStageIntermediateDocumentMaxSizeBytes.load(); SlotId accumulatorSlot = slotIdGenerator.generate(); + SlotId spillSlot = slotIdGenerator.generate(); innerBranch = makeHashAgg( std::move(innerBranch), makeSV(), /* groupBy slots */ - makeEM(accumulatorSlot, - makeFunction("addToArrayCapped"_sd, - makeVariable(foreignRecordSlot), - makeConstant(TypeTags::NumberInt32, sizeCap))), + makeSlotExprPairVec(accumulatorSlot, + makeFunction("addToArrayCapped"_sd, + makeVariable(foreignRecordSlot), + makeConstant(TypeTags::NumberInt32, sizeCap))), {} /* collatorSlot, no collation here because we want to return all matches "as is" */, allowDiskUse, + makeSlotExprPairVec(spillSlot, + makeFunction("aggConcatArraysCapped", + makeVariable(spillSlot), + makeConstant(TypeTags::NumberInt32, sizeCap))), nodeId); // 'accumulatorSlot' is either Nothing or contains an array of size two, where the front element |