diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/exec/plan_stats_visitor.h | 3 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp | 469 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/hash_agg.cpp | 152 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/hash_agg.h | 46 | ||||
-rw-r--r-- | src/mongo/db/exec/sbe/stages/plan_stats.h | 22 | ||||
-rw-r--r-- | src/mongo/db/query/query_knobs.idl | 39 |
6 files changed, 433 insertions, 298 deletions
diff --git a/src/mongo/db/exec/plan_stats_visitor.h b/src/mongo/db/exec/plan_stats_visitor.h index eacdf3369d3..ade88fd2a5f 100644 --- a/src/mongo/db/exec/plan_stats_visitor.h +++ b/src/mongo/db/exec/plan_stats_visitor.h @@ -43,6 +43,7 @@ struct BranchStats; struct CheckBoundsStats; struct LoopJoinStats; struct TraverseStats; +struct HashAggStats; } // namespace sbe struct AndHashStats; @@ -102,6 +103,7 @@ public: virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::CheckBoundsStats> stats) = 0; virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::LoopJoinStats> stats) = 0; virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::TraverseStats> stats) = 0; + virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::HashAggStats> stats) = 0; virtual void visit(tree_walker::MaybeConstPtr<IsConst, AndHashStats> stats) = 0; virtual void visit(tree_walker::MaybeConstPtr<IsConst, AndSortedStats> stats) = 0; @@ -154,6 +156,7 @@ struct PlanStatsVisitorBase : public PlanStatsVisitor<IsConst> { void visit(tree_walker::MaybeConstPtr<IsConst, sbe::CheckBoundsStats> stats) override {} void visit(tree_walker::MaybeConstPtr<IsConst, sbe::LoopJoinStats> stats) override {} void visit(tree_walker::MaybeConstPtr<IsConst, sbe::TraverseStats> stats) override {} + void visit(tree_walker::MaybeConstPtr<IsConst, sbe::HashAggStats> stats) override {} void visit(tree_walker::MaybeConstPtr<IsConst, AndHashStats> stats) override {} void visit(tree_walker::MaybeConstPtr<IsConst, AndSortedStats> stats) override {} diff --git a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp index 659c249b18d..20839e6624f 100644 --- a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp +++ b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp @@ -374,21 +374,134 @@ TEST_F(HashAggStageTest, HashAggSeekKeysTest) { stage->close(); } +TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) { + // We shouldn't spill to disk if memory is plentiful (which by default it is), even 
if we are + // allowed to. + + auto ctx = makeCompileCtx(); + + // Build a scan of the [5,6,7,5,6,7,6,7,7] input array. + auto [inputTag, inputVal] = + stage_builder::makeValue(BSON_ARRAY(5 << 6 << 7 << 5 << 6 << 7 << 6 << 7 << 7)); + auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal); + + // Build a HashAggStage, group by the scanSlot and compute a simple count. + auto countsSlot = generateSlotId(); + auto stage = makeS<HashAggStage>( + std::move(scanStage), + makeSV(scanSlot), + makeEM(countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSV(), // Seek slot + true, + boost::none, + true /* allowDiskUse */, + kEmptyPlanNodeId); + + // Prepare the tree and get the 'SlotAccessor' for the output slot. + auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot); + + // Read in all of the results. + std::set<int64_t> results; + while (stage->getNext() == PlanState::ADVANCED) { + auto [resTag, resVal] = resultAccessor->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resTag); + ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second); + } + + // Check that the results match the expected. + ASSERT_EQ(3, results.size()); + ASSERT_EQ(1, results.count(2)); // 2 of "5"s + ASSERT_EQ(1, results.count(3)); // 3 of "6"s + ASSERT_EQ(1, results.count(4)); // 4 of "7"s + + // Check that the spilling behavior matches the expected. + auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); + ASSERT_FALSE(stats->usedDisk); + ASSERT_EQ(0, stats->spilledRecords); + + stage->close(); +} + TEST_F(HashAggStageTest, HashAggBasicCountSpill) { - // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when - // estimated size is >= 4 * 8. + // We estimate the size of result row like {int64, int64} at 50B. Set the memory threshold to + // 64B so that exactly one row fits in memory. 
+ const int expectedRowsToFitInMemory = 1; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8); + internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64); ON_BLOCK_EXIT([&] { internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store( defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill); }); - auto defaultInternalQuerySBEAggMemoryUseSampleRate = - internalQuerySBEAggMemoryUseSampleRate.load(); - internalQuerySBEAggMemoryUseSampleRate.store(1.0); + + auto ctx = makeCompileCtx(); + + // Build a scan of the [5,6,7,5,6,7,6,7,7] input array. + auto [inputTag, inputVal] = + stage_builder::makeValue(BSON_ARRAY(5 << 6 << 7 << 5 << 6 << 7 << 6 << 7 << 7)); + auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal); + + // Build a HashAggStage, group by the scanSlot and compute a simple count. + auto countsSlot = generateSlotId(); + auto stage = makeS<HashAggStage>( + std::move(scanStage), + makeSV(scanSlot), + makeEM(countsSlot, + stage_builder::makeFunction( + "sum", + makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))), + makeSV(), // Seek slot + true, + boost::none, + true /* allowDiskUse */, + kEmptyPlanNodeId); + + // Prepare the tree and get the 'SlotAccessor' for the output slot. + auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot); + + // Read in all of the results. + std::set<int64_t> results; + while (stage->getNext() == PlanState::ADVANCED) { + auto [resTag, resVal] = resultAccessor->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resTag); + ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second); + } + + // Check that the results match the expected. 
+ ASSERT_EQ(3, results.size()); + ASSERT_EQ(1, results.count(2)); // 2 of "5"s + ASSERT_EQ(1, results.count(3)); // 3 of "6"s + ASSERT_EQ(1, results.count(4)); // 4 of "7"s + + // Check that the spilling behavior matches the expected. + auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); + ASSERT_TRUE(stats->usedDisk); + ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); + + stage->close(); +} + +TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) { + // We estimate the size of result row like {int64, int64} at 50B. Set the memory threshold to + // 64B so that exactly one row fits in memory and spill would be required. At the same time, set + // the memory check bounds to exceed the number of processed records so the checks are never run + // and the need to spill is never discovered. + auto defaultMemoryLimit = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); + internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64); + + auto defaultAtMost = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load(); + internalQuerySBEAggMemoryCheckPerAdvanceAtMost.store(100); + + auto defaultAtLeast = internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load(); + internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.store(100); + ON_BLOCK_EXIT([&] { - internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate); + internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(defaultMemoryLimit); + internalQuerySBEAggMemoryCheckPerAdvanceAtMost.store(defaultAtMost); + internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.store(defaultAtLeast); }); auto ctx = makeCompileCtx(); @@ -416,41 +529,39 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpill) { // Prepare the tree and get the 'SlotAccessor' for the output slot. 
auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res1Tag, res1Val] = resultAccessor->getViewOfValue(); - // There are '2' occurences of '5' in the input. - assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2)); + // Read in all of the results. + std::set<int64_t> results; + while (stage->getNext() == PlanState::ADVANCED) { + auto [resTag, resVal] = resultAccessor->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resTag); + ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second); + } - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res2Tag, res2Val] = resultAccessor->getViewOfValue(); - // There are '3' occurences of '6' in the input. - assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3)); + // Check that the results match the expected. + ASSERT_EQ(3, results.size()); + ASSERT_EQ(1, results.count(2)); // 2 of "5"s + ASSERT_EQ(1, results.count(3)); // 3 of "6"s + ASSERT_EQ(1, results.count(4)); // 4 of "7"s - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res3Tag, res3Val] = resultAccessor->getViewOfValue(); - // There are '4' occurences of '7' in the input. - assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4)); - ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF); + // Check that it did not spill. + auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); + ASSERT_FALSE(stats->usedDisk); + ASSERT_EQ(0, stats->spilledRecords); stage->close(); } TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) { - // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when - // estimated size is >= 4 * 8. + // We estimate the size of result row like {double, int64} at 50B. Set the memory threshold to + // 64B so that exactly one row fits in memory. 
+ const int expectedRowsToFitInMemory = 1; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8); + internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64); ON_BLOCK_EXIT([&] { internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store( defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill); }); - auto defaultInternalQuerySBEAggMemoryUseSampleRate = - internalQuerySBEAggMemoryUseSampleRate.load(); - internalQuerySBEAggMemoryUseSampleRate.store(1.0); - ON_BLOCK_EXIT([&] { - internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate); - }); auto ctx = makeCompileCtx(); @@ -477,42 +588,39 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) { // Prepare the tree and get the 'SlotAccessor' for the output slot. auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res1Tag, res1Val] = resultAccessor->getViewOfValue(); - // There are '2' occurences of '5' in the input. - assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2)); + // Read in all of the results. + std::set<int64_t> results; + while (stage->getNext() == PlanState::ADVANCED) { + auto [resTag, resVal] = resultAccessor->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resTag); + ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second); + } - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res2Tag, res2Val] = resultAccessor->getViewOfValue(); - // There are '3' occurences of '6' in the input. - assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3)); + // Check that the results match the expected. 
+ ASSERT_EQ(3, results.size()); + ASSERT_EQ(1, results.count(2)); // 2 of "5.0"s + ASSERT_EQ(1, results.count(3)); // 3 of "6.0"s + ASSERT_EQ(1, results.count(4)); // 4 of "7.0"s - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res3Tag, res3Val] = resultAccessor->getViewOfValue(); - // There are '4' occurences of '7' in the input. - assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4)); - ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF); + // Check that the spilling behavior matches the expected. + auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); + ASSERT_TRUE(stats->usedDisk); + ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); stage->close(); } - TEST_F(HashAggStageTest, HashAggMultipleAccSpill) { - // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when - // estimated size is >= 2 * 8. + // We estimate the size of result row like {double, int64} at 59B. Set the memory threshold to + // 128B so that two rows fit in memory. + const int expectedRowsToFitInMemory = 2; auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(2 * 8); + internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128); ON_BLOCK_EXIT([&] { internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store( defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill); }); - auto defaultInternalQuerySBEAggMemoryUseSampleRate = - internalQuerySBEAggMemoryUseSampleRate.load(); - internalQuerySBEAggMemoryUseSampleRate.store(1.0); - ON_BLOCK_EXIT([&] { - internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate); - }); auto ctx = makeCompileCtx(); @@ -542,38 +650,37 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) { // Prepare the tree and get the 'SlotAccessor' for the output slot. 
auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(countsSlot, sumsSlot)); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res1Tag, res1Val] = resultAccessors[0]->getViewOfValue(); - auto [res1TagSum, res1ValSum] = resultAccessors[1]->getViewOfValue(); + // Read in all of the results. + std::set<std::pair<int64_t /*count*/, int64_t /*sum*/>> results; + while (stage->getNext() == PlanState::ADVANCED) { + auto [resCountTag, resCountVal] = resultAccessors[0]->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resCountTag); - // There are '2' occurences of '5' in the input. - assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2)); - assertValuesEqual( - res1TagSum, res1ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(10)); + auto [resSumTag, resSumVal] = resultAccessors[1]->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resSumTag); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res2Tag, res2Val] = resultAccessors[0]->getViewOfValue(); - auto [res2TagSum, res2ValSum] = resultAccessors[1]->getViewOfValue(); - // There are '3' occurences of '6' in the input. - assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3)); - assertValuesEqual( - res2TagSum, res2ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(18)); + ASSERT_TRUE(results + .insert(std::make_pair(value::bitcastFrom<int64_t>(resCountVal), + value::bitcastFrom<int64_t>(resSumVal))) + .second); + } - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res3Tag, res3Val] = resultAccessors[0]->getViewOfValue(); - auto [res3TagSum, res3ValSum] = resultAccessors[1]->getViewOfValue(); - // There are '4' occurences of '7' in the input. 
- assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4)); - assertValuesEqual( - res3TagSum, res3ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(28)); - ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF); + // Check that the results match the expected. + ASSERT_EQ(3, results.size()); + ASSERT_EQ(1, results.count(std::make_pair(2, 2 * 5))); // 2 of "5"s + ASSERT_EQ(1, results.count(std::make_pair(3, 3 * 6))); // 3 of "6"s + ASSERT_EQ(1, results.count(std::make_pair(4, 4 * 7))); // 4 of "7"s + + // Check that the spilling behavior matches the expected. + auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); + ASSERT_TRUE(stats->usedDisk); + ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords); stage->close(); } TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) { - // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when - // estimated size is >= 0. This sill spill everything to the RecordStore. + // Set available memory to zero so all aggregated rows have to be spilled. auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(0); @@ -581,12 +688,6 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) { internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store( defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill); }); - auto defaultInternalQuerySBEAggMemoryUseSampleRate = - internalQuerySBEAggMemoryUseSampleRate.load(); - internalQuerySBEAggMemoryUseSampleRate.store(1.0); - ON_BLOCK_EXIT([&] { - internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate); - }); auto ctx = makeCompileCtx(); @@ -616,72 +717,33 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) { // Prepare the tree and get the 'SlotAccessor' for the output slot. 
auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(countsSlot, sumsSlot)); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res1Tag, res1Val] = resultAccessors[0]->getViewOfValue(); - auto [res1TagSum, res1ValSum] = resultAccessors[1]->getViewOfValue(); + // Read in all of the results. + std::set<std::pair<int64_t /*count*/, int64_t /*sum*/>> results; + while (stage->getNext() == PlanState::ADVANCED) { + auto [resCountTag, resCountVal] = resultAccessors[0]->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resCountTag); - // There are '2' occurences of '5' in the input. - assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2)); - assertValuesEqual( - res1TagSum, res1ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(10)); + auto [resSumTag, resSumVal] = resultAccessors[1]->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resSumTag); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res2Tag, res2Val] = resultAccessors[0]->getViewOfValue(); - auto [res2TagSum, res2ValSum] = resultAccessors[1]->getViewOfValue(); - // There are '3' occurences of '6' in the input. - assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3)); - assertValuesEqual( - res2TagSum, res2ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(18)); + ASSERT_TRUE(results + .insert(std::make_pair(value::bitcastFrom<int64_t>(resCountVal), + value::bitcastFrom<int64_t>(resSumVal))) + .second); + } - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res3Tag, res3Val] = resultAccessors[0]->getViewOfValue(); - auto [res3TagSum, res3ValSum] = resultAccessors[1]->getViewOfValue(); - // There are '4' occurences of '7' in the input. 
- assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4)); - assertValuesEqual( - res3TagSum, res3ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(28)); - ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF); + // Check that the results match the expected. + ASSERT_EQ(3, results.size()); + ASSERT_EQ(1, results.count(std::make_pair(2, 2 * 5))); // 2 of "5"s + ASSERT_EQ(1, results.count(std::make_pair(3, 3 * 6))); // 3 of "6"s + ASSERT_EQ(1, results.count(std::make_pair(4, 4 * 7))); // 4 of "7"s - stage->close(); -} + // Check that the spilling behavior matches the expected. + auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats()); + ASSERT_TRUE(stats->usedDisk); + ASSERT_EQ(results.size(), stats->spilledRecords); -TEST_F(HashAggStageTest, HashAggMemUsageTest) { - // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when - // estimated size is >= 128 * 5. - auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128 * 5); - ON_BLOCK_EXIT([&] { - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store( - defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill); - }); - auto defaultInternalQuerySBEAggMemoryUseSampleRate = - internalQuerySBEAggMemoryUseSampleRate.load(); - internalQuerySBEAggMemoryUseSampleRate.store(1.0); - ON_BLOCK_EXIT([&] { - internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate); - }); - - auto createInputArray = [](int numberOfBytesPerEntry) { - auto arr = BSON_ARRAY( - std::string(numberOfBytesPerEntry, 'A') - << std::string(numberOfBytesPerEntry, 'a') << std::string(numberOfBytesPerEntry, 'b') - << std::string(numberOfBytesPerEntry, 'c') << std::string(numberOfBytesPerEntry, 'B') - << std::string(numberOfBytesPerEntry, 'a')); - return arr; - }; - - auto 
nonSpillInputArr = createInputArray(64); - auto spillInputArr = createInputArray(256); - - // Groups the values as: ["a", "a"], ["A"], ["B"], ["b"], ["c"]. - auto expectedOutputArr = BSON_ARRAY(2 << 1 << 1 << 1 << 1); - // Should NOT spill to disk because internalQuerySlotBasedExecutionHashAggMemoryUsageThreshold - // is set to 128 * 5. (64 + padding) * 5 < 128 * 5 - performHashAggWithSpillChecking(nonSpillInputArr, expectedOutputArr); - // Should spill to disk because internalQuerySlotBasedExecutionHashAggMemoryUsageThreshold is - // set to 128 * 5. (256 + padding) * 5 > 128 * 5 - performHashAggWithSpillChecking(spillInputArr, expectedOutputArr, true); + stage->close(); } TEST_F(HashAggStageTest, HashAggSum10Groups) { @@ -696,12 +758,6 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) { internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store( defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill); }); - auto defaultInternalQuerySBEAggMemoryUseSampleRate = - internalQuerySBEAggMemoryUseSampleRate.load(); - internalQuerySBEAggMemoryUseSampleRate.store(1.0); - ON_BLOCK_EXIT([&] { - internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate); - }); auto ctx = makeCompileCtx(); @@ -749,72 +805,16 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) { } TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) { - // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when - // estimated size is >= 4 * 8. 
- auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill = - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8); - ON_BLOCK_EXIT([&] { - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store( - defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill); - }); - auto defaultInternalQuerySBEAggMemoryUseSampleRate = - internalQuerySBEAggMemoryUseSampleRate.load(); - internalQuerySBEAggMemoryUseSampleRate.store(1.0); - ON_BLOCK_EXIT([&] { - internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate); - }); - auto ctx = makeCompileCtx(); - // Build a scan of record ids [1,10,999,10,1,999,8589869056,999,10,8589869056] input array. + // Build a scan of a few record ids. + std::vector<int64_t> ids{10, 999, 10, 999, 1, 999, 8589869056, 999, 10, 8589869056}; auto [inputTag, inputVal] = sbe::value::makeNewArray(); auto testData = sbe::value::getArrayView(inputVal); - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(1); - testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10); - testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999); + for (auto id : ids) { + auto [ridTag, ridVal] = sbe::value::makeNewRecordId(id); testData->push_back(ridTag, ridVal); } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10); - testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999); - testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(1); - testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999); - testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(8589869056); - testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999); - 
testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10); - testData->push_back(ridTag, ridVal); - } - { - auto [ridTag, ridVal] = sbe::value::makeNewRecordId(8589869056); - testData->push_back(ridTag, ridVal); - } - auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal); // Build a HashAggStage, group by the scanSlot and compute a simple count. @@ -835,42 +835,27 @@ TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) { // Prepare the tree and get the 'SlotAccessor' for the output slot. auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(scanSlot, countsSlot)); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res1ScanTag, res1ScanVal] = resultAccessors[0]->getViewOfValue(); - auto [res1Tag, res1Val] = resultAccessors[1]->getViewOfValue(); - // There are '2' occurences of '1' in the input. - ASSERT_TRUE(res1ScanTag == value::TypeTags::RecordId); - ASSERT_TRUE(sbe::value::getRecordIdView(res1ScanVal)->getLong() == 1); - assertValuesEqual( - res1Tag, res1Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(2)); + // Read in all of the results. + std::map<int64_t /*id*/, int64_t /*count*/> results; + while (stage->getNext() == PlanState::ADVANCED) { + auto [resScanTag, resScanVal] = resultAccessors[0]->getViewOfValue(); + ASSERT_EQ(value::TypeTags::RecordId, resScanTag); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res2ScanTag, res2ScanVal] = resultAccessors[0]->getViewOfValue(); - auto [res2Tag, res2Val] = resultAccessors[1]->getViewOfValue(); - // There are '2' occurences of '8589869056' in the input. 
- ASSERT_TRUE(res2ScanTag == value::TypeTags::RecordId); - ASSERT_TRUE(sbe::value::getRecordIdView(res2ScanVal)->getLong() == 8589869056); - assertValuesEqual( - res2Tag, res2Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(2)); + auto [resTag, resVal] = resultAccessors[1]->getViewOfValue(); + ASSERT_EQ(value::TypeTags::NumberInt64, resTag); - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res3ScanTag, res3ScanVal] = resultAccessors[0]->getViewOfValue(); - auto [res3Tag, res3Val] = resultAccessors[1]->getViewOfValue(); - // There are '3' occurences of '10' in the input. - ASSERT_TRUE(res3ScanTag == value::TypeTags::RecordId); - ASSERT_TRUE(sbe::value::getRecordIdView(res3ScanVal)->getLong() == 10); - assertValuesEqual( - res3Tag, res3Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(3)); + auto inserted = results.insert(std::make_pair( + value::bitcastFrom<int64_t>(sbe::value::getRecordIdView(resScanVal)->getLong()), + value::bitcastFrom<int64_t>(resVal))); + ASSERT_TRUE(inserted.second); + } - ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED); - auto [res4ScanTag, res4ScanVal] = resultAccessors[0]->getViewOfValue(); - auto [res4Tag, res4Val] = resultAccessors[1]->getViewOfValue(); - // There are '4' occurences of '999' in the input. - ASSERT_TRUE(res4ScanTag == value::TypeTags::RecordId); - ASSERT_TRUE(sbe::value::getRecordIdView(res4ScanVal)->getLong() == 999); - assertValuesEqual( - res4Tag, res4Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(4)); - ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF); + // Assert that the results are as expected. 
+ ASSERT_EQ(4, results.size()); + ASSERT_EQ(1, results[1]); + ASSERT_EQ(2, results[8589869056]); + ASSERT_EQ(3, results[10]); + ASSERT_EQ(4, results[999]); stage->close(); } diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp index 663918ca888..95568e8edf4 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp +++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp @@ -242,6 +242,21 @@ void HashAggStage::makeTemporaryRecordStore() { assertIgnorePrepareConflictsBehavior(_opCtx); _recordStore = _opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore( _opCtx, KeyFormat::String); + + _specificStats.usedDisk = true; +} + +void HashAggStage::spillRowToDisk(const value::MaterializedRow& key, + const value::MaterializedRow& val) { + KeyString::Builder kb{KeyString::Version::kLatestVersion}; + key.serializeIntoKeyString(kb); + auto typeBits = kb.getTypeBits(); + + auto rid = RecordId(kb.getBuffer(), kb.getSize()); + auto valFromRs = getFromRecordStore(rid); + tassert(6031100, "Spilling a row doesn't support updating it in the store.", !valFromRs); + + spillValueToDisk(rid, val, typeBits, false /*update*/); } void HashAggStage::spillValueToDisk(const RecordId& key, @@ -249,7 +264,6 @@ void HashAggStage::spillValueToDisk(const RecordId& key, const KeyString::TypeBits& typeBits, bool update) { BufBuilder bufValue; - val.serializeForSorter(bufValue); // Append the 'typeBits' to the end of the val's buffer so the 'key' can be reconstructed when @@ -263,21 +277,24 @@ void HashAggStage::spillValueToDisk(const RecordId& key, // touch. 
Lock::GlobalLock lk(_opCtx, MODE_IX); WriteUnitOfWork wuow(_opCtx); - auto result = [&]() { - if (update) { - auto status = - _recordStore->rs()->updateRecord(_opCtx, key, bufValue.buf(), bufValue.len()); - return status; - } else { - auto status = _recordStore->rs()->insertRecord( - _opCtx, key, bufValue.buf(), bufValue.len(), Timestamp{}); - return status.getStatus(); - } - }(); + + auto result = mongo::Status::OK(); + if (update) { + result = _recordStore->rs()->updateRecord(_opCtx, key, bufValue.buf(), bufValue.len()); + } else { + auto status = _recordStore->rs()->insertRecord( + _opCtx, key, bufValue.buf(), bufValue.len(), Timestamp{}); + result = status.getStatus(); + } wuow.commit(); tassert(5843600, str::stream() << "Failed to write to disk because " << result.reason(), result.isOK()); + + if (!update) { + _specificStats.spilledRecords++; + } + _specificStats.lastSpilledRecordSize = bufValue.len(); } boost::optional<value::MaterializedRow> HashAggStage::getFromRecordStore(const RecordId& rid) { @@ -291,6 +308,73 @@ boost::optional<value::MaterializedRow> HashAggStage::getFromRecordStore(const R } } +// Checks memory usage. Ideally, we'd want to know the exact size of already accumulated data, but +// we cannot, so we estimate it based on the last updated/inserted row, if we have one, or the first +// row in the '_ht' table. If the estimated memory usage exceeds the allowed, this method initiates +// spilling (if haven't been done yet) and evicts some records from the '_ht' table into the temp +// store to keep the memory usage under the limit. +void HashAggStage::checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd) { + // The '_ht' table might become empty in the degenerate case when all rows had to be evicted to + // meet the memory constraint during a previous check -- we don't need to keep checking memory + // usage in this case because the table will never get new rows. 
+ if (_ht->empty()) { + return; + } + + mcd.memoryCheckpointCounter++; + if (mcd.memoryCheckpointCounter >= mcd.nextMemoryCheckpoint) { + if (_htIt == _ht->end()) { + _htIt = _ht->begin(); + } + const long estimatedRowSize = + _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter(); + long long estimatedTotalSize = _ht->size() * estimatedRowSize; + const double estimatedGainPerChildAdvance = + (static_cast<double>(estimatedTotalSize - mcd.lastEstimatedMemoryUsage) / + mcd.memoryCheckpointCounter); + + if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) { + uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed, + "Exceeded memory limit for $group, but didn't allow external spilling." + " Pass allowDiskUse:true to opt in.", + _allowDiskUse); + if (!_recordStore) { + makeTemporaryRecordStore(); + } + + // Evict enough rows into the temporary store to drop below the memory constraint. + const long rowsToEvictCount = + 1 + (estimatedTotalSize - _approxMemoryUseInBytesBeforeSpill) / estimatedRowSize; + for (long i = 0; !_ht->empty() && i < rowsToEvictCount; i++) { + spillRowToDisk(_htIt->first, _htIt->second); + _ht->erase(_htIt); + _htIt = _ht->begin(); + } + estimatedTotalSize = _ht->size() * estimatedRowSize; + } + + // Calculate the next memory checkpoint. We estimate it based on the prior growth of the + // '_ht' and the remaining available memory. We have to keep doing this even after starting + // to spill because some accumulators can grow in size inside '_ht' (with no bounds). + // Value of 'estimatedGainPerChildAdvance' can be negative if the previous checkpoint + // evicted any records. And a value close to zero indicates a stable size of '_ht' so can + // delay the next check progressively. + const long nextCheckpointCandidate = (estimatedGainPerChildAdvance > 0.1) + ? mcd.checkpointMargin * (_approxMemoryUseInBytesBeforeSpill - estimatedTotalSize) / + estimatedGainPerChildAdvance + : (estimatedGainPerChildAdvance < -0.1) ? 
mcd.atMostCheckFrequency + : mcd.nextMemoryCheckpoint * 2; + mcd.nextMemoryCheckpoint = + std::min<long>(mcd.memoryCheckFrequency, + std::max<long>(mcd.atMostCheckFrequency, nextCheckpointCandidate)); + + mcd.lastEstimatedMemoryUsage = estimatedTotalSize; + mcd.memoryCheckpointCounter = 0; + mcd.memoryCheckFrequency = + std::min<long>(mcd.memoryCheckFrequency * 2, mcd.atLeastMemoryCheckFrequency); + } +} + void HashAggStage::open(bool reOpen) { auto optTimer(getOptTimer(_opCtx)); @@ -313,12 +397,11 @@ void HashAggStage::open(bool reOpen) { _seekKeys.resize(_seekKeysAccessors.size()); - // A counter to check memory usage periodically. - auto memoryUseCheckCounter = 0; - // A default value for spilling a key to the record store. value::MaterializedRow defaultVal{_outAggAccessors.size()}; bool updateAggStateHt = false; + MemoryCheckData memoryCheckData; + while (_children[0]->getNext() == PlanState::ADVANCED) { value::MaterializedRow key{_inKeyAccessors.size()}; // Copy keys in order to do the lookup. @@ -344,8 +427,7 @@ void HashAggStage::open(bool reOpen) { } else { // The memory limit has been reached, accumulate state in '_ht' only if we // find the key in '_ht'. - auto it = _ht->find(key); - _htIt = it; + _htIt = _ht->find(key); updateAggStateHt = _htIt != _ht->end(); } @@ -361,9 +443,8 @@ void HashAggStage::open(bool reOpen) { // The memory limit has been reached and the key wasn't in the '_ht' so we need // to spill it to the '_recordStore'. KeyString::Builder kb{KeyString::Version::kLatestVersion}; - - // It's safe to ignore the use-after-move warning since it's logically impossible to - // enter this block after the move occurs. + // 'key' is moved only when 'updateAggStateHt' ends up "true", so it's safe to + // ignore the warning. key.serializeIntoKeyString(kb); // NOLINT(bugprone-use-after-move) auto typeBits = kb.getTypeBits(); @@ -384,26 +465,8 @@ void HashAggStage::open(bool reOpen) { spillValueToDisk(rid, _aggValueRecordStore, typeBits, valFromRs ? 
true : false); } - // Track memory usage only when we haven't started spilling to the '_recordStore'. - if (!_recordStore) { - auto shouldCalculateEstimatedSize = - _pseudoRandom.nextCanonicalDouble() < _memoryUseSampleRate; - if (shouldCalculateEstimatedSize || ++memoryUseCheckCounter % 100 == 0) { - memoryUseCheckCounter = 0; - long estimatedSizeForOneRow = - _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter(); - long long estimatedTotalSize = _ht->size() * estimatedSizeForOneRow; - - if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) { - uassert( - 5843601, - "Exceeded memory limit for $group, but didn't allow external spilling." - " Pass allowDiskUse:true to opt in.", - _allowDiskUse); - makeTemporaryRecordStore(); - } - } - } + // Estimates how much memory is being used and might start spilling. + checkMemoryUsageAndSpillIfNecessary(memoryCheckData); if (_tracker && _tracker->trackProgress<TrialRunTracker::kNumResults>(1)) { // During trial runs, we want to limit the amount of work done by opening a blocking @@ -509,6 +572,7 @@ PlanState HashAggStage::getNext() { std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) const { auto ret = std::make_unique<PlanStageStats>(_commonStats); + ret->specific = std::make_unique<HashAggStats>(_specificStats); if (includeDebugInfo) { DebugPrinter printer; @@ -520,6 +584,12 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co childrenBob.append(str::stream() << slot, printer.print(expr->debugPrint())); } } + // Spilling stats. 
+ bob.appendBool("usedDisk", _specificStats.usedDisk); + bob.appendNumber("spilledRecords", _specificStats.spilledRecords); + bob.appendNumber("spilledBytesApprox", + _specificStats.lastSpilledRecordSize * _specificStats.spilledRecords); + ret->debugInfo = bob.obj(); } @@ -528,7 +598,7 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co } const SpecificStats* HashAggStage::getSpecificStats() const { - return nullptr; + return &_specificStats; } void HashAggStage::close() { diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h index afb8bea06f3..a50b4d65c81 100644 --- a/src/mongo/db/exec/sbe/stages/hash_agg.h +++ b/src/mongo/db/exec/sbe/stages/hash_agg.h @@ -130,7 +130,39 @@ private: const value::MaterializedRow& val, const KeyString::TypeBits& typeBits, bool update); + void spillRowToDisk(const value::MaterializedRow& key, + const value::MaterializedRow& defaultVal); + /** + * We check the amount of used memory every T processed incoming records, where T is calculated + * based on the estimated used memory and its recent growth. When the memory limit is exceeded, + * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' and might spill some of + * the already accumulated data into it. + */ + struct MemoryCheckData { + const double checkpointMargin = internalQuerySBEAggMemoryUseCheckMargin.load(); + const long atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load(); + const long atLeastMemoryCheckFrequency = + internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load(); + + // The check frequency upper bound, which starts at 'atMost' and exponentially backs off + // to 'atLeast' as more data is accumulated. If 'atLeast' is less than 'atMost', the memory + // checks will be done every 'atLeast' incoming records. + long memoryCheckFrequency = 1; + + // The number of incoming records to process before the next memory checkpoint. 
+ long nextMemoryCheckpoint = 0; + + // The counter of the incoming records between memory checkpoints. + long memoryCheckpointCounter = 0; + + long long lastEstimatedMemoryUsage = 0; + + MemoryCheckData() { + memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency); + } + }; + void checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd); const value::SlotVector _gbs; const value::SlotMap<std::unique_ptr<EExpression>> _aggs; @@ -140,14 +172,6 @@ private: // When this operator does not expect to be reopened (almost always) then it can close the child // early. const bool _optimizedClose{true}; - // Memory tracking variables. - const long long _approxMemoryUseInBytesBeforeSpill = - internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); - const int _memoryUseSampleRate = internalQuerySBEAggMemoryUseSampleRate.load(); - // Used in collaboration with memoryUseSampleRatePercentage to determine whether we should - // re-approximate memory usage. - PseudoRandom _pseudoRandom = PseudoRandom(Date_t::now().asInt64()); - value::SlotAccessorMap _outAccessors; std::vector<value::SlotAccessor*> _inKeyAccessors; @@ -182,11 +206,15 @@ private: bool _compiled{false}; bool _childOpened{false}; - // Used when spilling to disk. + // Memory tracking and spilling to disk. + const long long _approxMemoryUseInBytesBeforeSpill = + internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load(); std::unique_ptr<TemporaryRecordStore> _recordStore; bool _drainingRecordStore{false}; std::unique_ptr<SeekableRecordCursor> _rsCursor; + HashAggStats _specificStats; + // If provided, used during a trial run to accumulate certain execution stats. Once the trial // run is complete, this pointer is reset to nullptr. 
TrialRunTracker* _tracker{nullptr}; diff --git a/src/mongo/db/exec/sbe/stages/plan_stats.h b/src/mongo/db/exec/sbe/stages/plan_stats.h index 9e77c76ad5f..46d52726b6c 100644 --- a/src/mongo/db/exec/sbe/stages/plan_stats.h +++ b/src/mongo/db/exec/sbe/stages/plan_stats.h @@ -260,6 +260,28 @@ struct TraverseStats : public SpecificStats { size_t innerCloses{0}; }; +struct HashAggStats : public SpecificStats { + std::unique_ptr<SpecificStats> clone() const final { + return std::make_unique<HashAggStats>(*this); + } + + uint64_t estimateObjectSizeInBytes() const final { + return sizeof(*this); + } + + void acceptVisitor(PlanStatsConstVisitor* visitor) const final { + visitor->visit(this); + } + + void acceptVisitor(PlanStatsMutableVisitor* visitor) final { + visitor->visit(this); + } + + bool usedDisk{false}; + long long spilledRecords{0}; + long long lastSpilledRecordSize{0}; +}; + /** * Visitor for calculating the number of storage reads during plan execution. */ diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl index 4a23ff8bd0f..528b6b0e2cf 100644 --- a/src/mongo/db/query/query_knobs.idl +++ b/src/mongo/db/query/query_knobs.idl @@ -511,19 +511,46 @@ server_parameters: gt: 0 lte: { expr: BSONObjMaxInternalSize } - internalQuerySlotBasedExecutionHashAggMemoryUseSampleRate: - description: "The percent chance that the size of the hash table in a HashAgg stage is re-estimated - when an entry is updated. The estimated size of the hash table is used to determine if we should - spill to disk. [see internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill]" + internalQuerySlotBasedExecutionHashAggMemoryUseCheckMargin: + description: "The memory check in HashAgg stage is done on every T'th processed record, where T + is calculated adaptively based on the estimated memory used and its recent growth. 
This setting + defines the percent of the remaining available memory to be used before the next check, given + the estimated growth speed per advance [see internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill]." set_at: [ startup, runtime ] - cpp_varname: "internalQuerySBEAggMemoryUseSampleRate" + cpp_varname: "internalQuerySBEAggMemoryUseCheckMargin" cpp_vartype: AtomicDouble default: - expr: 0.5 + expr: 0.7 validator: gt: 0.0 lte: 1.0 + internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtMost: + description: "The memory check in HashAgg stage is done on every T'th processed record, where T + is calculated adaptively based on the estimated memory used and its recent growth. This setting + defines the lower bound for T [see internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill]." + set_at: [ startup, runtime ] + cpp_varname: "internalQuerySBEAggMemoryCheckPerAdvanceAtMost" + cpp_vartype: AtomicWord<long long> + default: + expr: 2 + validator: + gt: 0 + + internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtLeast: + description: "The memory check in HashAgg stage is done on every T'th processed record, where T + is calculated adaptively based on the estimated memory used and its recent growth. This setting + defines the upper bound for T. If this setting is less than [see internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtMost], + the check will be done on every internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtLeast'th + processed record [see internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill]." + set_at: [ startup, runtime ] + cpp_varname: "internalQuerySBEAggMemoryCheckPerAdvanceAtLeast" + cpp_vartype: AtomicWord<long long> + default: + expr: 1024 + validator: + gt: 0 + internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: description: "The max size in bytes that the hash table in a HashAgg stage can be estimated to be before we spill to disk." |