summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/exec/plan_stats_visitor.h3
-rw-r--r--src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp469
-rw-r--r--src/mongo/db/exec/sbe/stages/hash_agg.cpp152
-rw-r--r--src/mongo/db/exec/sbe/stages/hash_agg.h46
-rw-r--r--src/mongo/db/exec/sbe/stages/plan_stats.h22
-rw-r--r--src/mongo/db/query/query_knobs.idl39
6 files changed, 433 insertions, 298 deletions
diff --git a/src/mongo/db/exec/plan_stats_visitor.h b/src/mongo/db/exec/plan_stats_visitor.h
index eacdf3369d3..ade88fd2a5f 100644
--- a/src/mongo/db/exec/plan_stats_visitor.h
+++ b/src/mongo/db/exec/plan_stats_visitor.h
@@ -43,6 +43,7 @@ struct BranchStats;
struct CheckBoundsStats;
struct LoopJoinStats;
struct TraverseStats;
+struct HashAggStats;
} // namespace sbe
struct AndHashStats;
@@ -102,6 +103,7 @@ public:
virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::CheckBoundsStats> stats) = 0;
virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::LoopJoinStats> stats) = 0;
virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::TraverseStats> stats) = 0;
+ virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::HashAggStats> stats) = 0;
virtual void visit(tree_walker::MaybeConstPtr<IsConst, AndHashStats> stats) = 0;
virtual void visit(tree_walker::MaybeConstPtr<IsConst, AndSortedStats> stats) = 0;
@@ -154,6 +156,7 @@ struct PlanStatsVisitorBase : public PlanStatsVisitor<IsConst> {
void visit(tree_walker::MaybeConstPtr<IsConst, sbe::CheckBoundsStats> stats) override {}
void visit(tree_walker::MaybeConstPtr<IsConst, sbe::LoopJoinStats> stats) override {}
void visit(tree_walker::MaybeConstPtr<IsConst, sbe::TraverseStats> stats) override {}
+ void visit(tree_walker::MaybeConstPtr<IsConst, sbe::HashAggStats> stats) override {}
void visit(tree_walker::MaybeConstPtr<IsConst, AndHashStats> stats) override {}
void visit(tree_walker::MaybeConstPtr<IsConst, AndSortedStats> stats) override {}
diff --git a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
index 659c249b18d..20839e6624f 100644
--- a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
+++ b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
@@ -374,21 +374,134 @@ TEST_F(HashAggStageTest, HashAggSeekKeysTest) {
stage->close();
}
+TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) {
+ // We shouldn't spill to disk if memory is plentiful (which by default it is), even if we are
+ // allowed to.
+
+ auto ctx = makeCompileCtx();
+
+ // Build a scan of the [5,6,7,5,6,7,6,7,7] input array.
+ auto [inputTag, inputVal] =
+ stage_builder::makeValue(BSON_ARRAY(5 << 6 << 7 << 5 << 6 << 7 << 6 << 7 << 7));
+ auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal);
+
+ // Build a HashAggStage, group by the scanSlot and compute a simple count.
+ auto countsSlot = generateSlotId();
+ auto stage = makeS<HashAggStage>(
+ std::move(scanStage),
+ makeSV(scanSlot),
+ makeEM(countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSV(), // Seek slot
+ true,
+ boost::none,
+ true /* allowDiskUse */,
+ kEmptyPlanNodeId);
+
+ // Prepare the tree and get the 'SlotAccessor' for the output slot.
+ auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot);
+
+ // Read in all of the results.
+ std::set<int64_t> results;
+ while (stage->getNext() == PlanState::ADVANCED) {
+ auto [resTag, resVal] = resultAccessor->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
+ ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second);
+ }
+
+ // Check that the results match the expected.
+ ASSERT_EQ(3, results.size());
+ ASSERT_EQ(1, results.count(2)); // 2 of "5"s
+ ASSERT_EQ(1, results.count(3)); // 3 of "6"s
+ ASSERT_EQ(1, results.count(4)); // 4 of "7"s
+
+ // Check that the spilling behavior matches the expected.
+ auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
+ ASSERT_FALSE(stats->usedDisk);
+ ASSERT_EQ(0, stats->spilledRecords);
+
+ stage->close();
+}
+
TEST_F(HashAggStageTest, HashAggBasicCountSpill) {
- // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
- // estimated size is >= 4 * 8.
+    // We estimate the size of a result row like {int64, int64} at 50B. Set the memory threshold to
+ // 64B so that exactly one row fits in memory.
+ const int expectedRowsToFitInMemory = 1;
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8);
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64);
ON_BLOCK_EXIT([&] {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
- auto defaultInternalQuerySBEAggMemoryUseSampleRate =
- internalQuerySBEAggMemoryUseSampleRate.load();
- internalQuerySBEAggMemoryUseSampleRate.store(1.0);
+
+ auto ctx = makeCompileCtx();
+
+ // Build a scan of the [5,6,7,5,6,7,6,7,7] input array.
+ auto [inputTag, inputVal] =
+ stage_builder::makeValue(BSON_ARRAY(5 << 6 << 7 << 5 << 6 << 7 << 6 << 7 << 7));
+ auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal);
+
+ // Build a HashAggStage, group by the scanSlot and compute a simple count.
+ auto countsSlot = generateSlotId();
+ auto stage = makeS<HashAggStage>(
+ std::move(scanStage),
+ makeSV(scanSlot),
+ makeEM(countsSlot,
+ stage_builder::makeFunction(
+ "sum",
+ makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
+ makeSV(), // Seek slot
+ true,
+ boost::none,
+ true /* allowDiskUse */,
+ kEmptyPlanNodeId);
+
+ // Prepare the tree and get the 'SlotAccessor' for the output slot.
+ auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot);
+
+ // Read in all of the results.
+ std::set<int64_t> results;
+ while (stage->getNext() == PlanState::ADVANCED) {
+ auto [resTag, resVal] = resultAccessor->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
+ ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second);
+ }
+
+ // Check that the results match the expected.
+ ASSERT_EQ(3, results.size());
+ ASSERT_EQ(1, results.count(2)); // 2 of "5"s
+ ASSERT_EQ(1, results.count(3)); // 3 of "6"s
+ ASSERT_EQ(1, results.count(4)); // 4 of "7"s
+
+ // Check that the spilling behavior matches the expected.
+ auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
+ ASSERT_TRUE(stats->usedDisk);
+ ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
+
+ stage->close();
+}
+
+TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) {
+    // We estimate the size of a result row like {int64, int64} at 50B. Set the memory threshold to
+ // 64B so that exactly one row fits in memory and spill would be required. At the same time, set
+ // the memory check bounds to exceed the number of processed records so the checks are never run
+ // and the need to spill is never discovered.
+ auto defaultMemoryLimit = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64);
+
+ auto defaultAtMost = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load();
+ internalQuerySBEAggMemoryCheckPerAdvanceAtMost.store(100);
+
+ auto defaultAtLeast = internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load();
+ internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.store(100);
+
ON_BLOCK_EXIT([&] {
- internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(defaultMemoryLimit);
+ internalQuerySBEAggMemoryCheckPerAdvanceAtMost.store(defaultAtMost);
+ internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.store(defaultAtLeast);
});
auto ctx = makeCompileCtx();
@@ -416,41 +529,39 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpill) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot);
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res1Tag, res1Val] = resultAccessor->getViewOfValue();
- // There are '2' occurences of '5' in the input.
- assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
+ // Read in all of the results.
+ std::set<int64_t> results;
+ while (stage->getNext() == PlanState::ADVANCED) {
+ auto [resTag, resVal] = resultAccessor->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
+ ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second);
+ }
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res2Tag, res2Val] = resultAccessor->getViewOfValue();
- // There are '3' occurences of '6' in the input.
- assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
+ // Check that the results match the expected.
+ ASSERT_EQ(3, results.size());
+ ASSERT_EQ(1, results.count(2)); // 2 of "5"s
+ ASSERT_EQ(1, results.count(3)); // 3 of "6"s
+ ASSERT_EQ(1, results.count(4)); // 4 of "7"s
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res3Tag, res3Val] = resultAccessor->getViewOfValue();
- // There are '4' occurences of '7' in the input.
- assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
- ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
+ // Check that it did not spill.
+ auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
+ ASSERT_FALSE(stats->usedDisk);
+ ASSERT_EQ(0, stats->spilledRecords);
stage->close();
}
TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) {
- // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
- // estimated size is >= 4 * 8.
+    // We estimate the size of a result row like {double, int64} at 50B. Set the memory threshold to
+ // 64B so that exactly one row fits in memory.
+ const int expectedRowsToFitInMemory = 1;
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8);
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64);
ON_BLOCK_EXIT([&] {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
- auto defaultInternalQuerySBEAggMemoryUseSampleRate =
- internalQuerySBEAggMemoryUseSampleRate.load();
- internalQuerySBEAggMemoryUseSampleRate.store(1.0);
- ON_BLOCK_EXIT([&] {
- internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
- });
auto ctx = makeCompileCtx();
@@ -477,42 +588,39 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot);
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res1Tag, res1Val] = resultAccessor->getViewOfValue();
- // There are '2' occurences of '5' in the input.
- assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
+ // Read in all of the results.
+ std::set<int64_t> results;
+ while (stage->getNext() == PlanState::ADVANCED) {
+ auto [resTag, resVal] = resultAccessor->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
+ ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second);
+ }
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res2Tag, res2Val] = resultAccessor->getViewOfValue();
- // There are '3' occurences of '6' in the input.
- assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
+ // Check that the results match the expected.
+ ASSERT_EQ(3, results.size());
+ ASSERT_EQ(1, results.count(2)); // 2 of "5.0"s
+ ASSERT_EQ(1, results.count(3)); // 3 of "6.0"s
+ ASSERT_EQ(1, results.count(4)); // 4 of "7.0"s
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res3Tag, res3Val] = resultAccessor->getViewOfValue();
- // There are '4' occurences of '7' in the input.
- assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
- ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
+ // Check that the spilling behavior matches the expected.
+ auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
+ ASSERT_TRUE(stats->usedDisk);
+ ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
stage->close();
}
-
TEST_F(HashAggStageTest, HashAggMultipleAccSpill) {
- // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
- // estimated size is >= 2 * 8.
+    // We estimate the size of a result row like {double, int64} at 59B. Set the memory threshold to
+ // 128B so that two rows fit in memory.
+ const int expectedRowsToFitInMemory = 2;
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(2 * 8);
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128);
ON_BLOCK_EXIT([&] {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
- auto defaultInternalQuerySBEAggMemoryUseSampleRate =
- internalQuerySBEAggMemoryUseSampleRate.load();
- internalQuerySBEAggMemoryUseSampleRate.store(1.0);
- ON_BLOCK_EXIT([&] {
- internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
- });
auto ctx = makeCompileCtx();
@@ -542,38 +650,37 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(countsSlot, sumsSlot));
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res1Tag, res1Val] = resultAccessors[0]->getViewOfValue();
- auto [res1TagSum, res1ValSum] = resultAccessors[1]->getViewOfValue();
+ // Read in all of the results.
+ std::set<std::pair<int64_t /*count*/, int64_t /*sum*/>> results;
+ while (stage->getNext() == PlanState::ADVANCED) {
+ auto [resCountTag, resCountVal] = resultAccessors[0]->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resCountTag);
- // There are '2' occurences of '5' in the input.
- assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
- assertValuesEqual(
- res1TagSum, res1ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(10));
+ auto [resSumTag, resSumVal] = resultAccessors[1]->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resSumTag);
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res2Tag, res2Val] = resultAccessors[0]->getViewOfValue();
- auto [res2TagSum, res2ValSum] = resultAccessors[1]->getViewOfValue();
- // There are '3' occurences of '6' in the input.
- assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
- assertValuesEqual(
- res2TagSum, res2ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(18));
+ ASSERT_TRUE(results
+ .insert(std::make_pair(value::bitcastFrom<int64_t>(resCountVal),
+ value::bitcastFrom<int64_t>(resSumVal)))
+ .second);
+ }
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res3Tag, res3Val] = resultAccessors[0]->getViewOfValue();
- auto [res3TagSum, res3ValSum] = resultAccessors[1]->getViewOfValue();
- // There are '4' occurences of '7' in the input.
- assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
- assertValuesEqual(
- res3TagSum, res3ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(28));
- ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
+ // Check that the results match the expected.
+ ASSERT_EQ(3, results.size());
+ ASSERT_EQ(1, results.count(std::make_pair(2, 2 * 5))); // 2 of "5"s
+ ASSERT_EQ(1, results.count(std::make_pair(3, 3 * 6))); // 3 of "6"s
+ ASSERT_EQ(1, results.count(std::make_pair(4, 4 * 7))); // 4 of "7"s
+
+ // Check that the spilling behavior matches the expected.
+ auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
+ ASSERT_TRUE(stats->usedDisk);
+ ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
stage->close();
}
TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) {
- // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
- // estimated size is >= 0. This sill spill everything to the RecordStore.
+ // Set available memory to zero so all aggregated rows have to be spilled.
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(0);
@@ -581,12 +688,6 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
- auto defaultInternalQuerySBEAggMemoryUseSampleRate =
- internalQuerySBEAggMemoryUseSampleRate.load();
- internalQuerySBEAggMemoryUseSampleRate.store(1.0);
- ON_BLOCK_EXIT([&] {
- internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
- });
auto ctx = makeCompileCtx();
@@ -616,72 +717,33 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(countsSlot, sumsSlot));
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res1Tag, res1Val] = resultAccessors[0]->getViewOfValue();
- auto [res1TagSum, res1ValSum] = resultAccessors[1]->getViewOfValue();
+ // Read in all of the results.
+ std::set<std::pair<int64_t /*count*/, int64_t /*sum*/>> results;
+ while (stage->getNext() == PlanState::ADVANCED) {
+ auto [resCountTag, resCountVal] = resultAccessors[0]->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resCountTag);
- // There are '2' occurences of '5' in the input.
- assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
- assertValuesEqual(
- res1TagSum, res1ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(10));
+ auto [resSumTag, resSumVal] = resultAccessors[1]->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resSumTag);
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res2Tag, res2Val] = resultAccessors[0]->getViewOfValue();
- auto [res2TagSum, res2ValSum] = resultAccessors[1]->getViewOfValue();
- // There are '3' occurences of '6' in the input.
- assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
- assertValuesEqual(
- res2TagSum, res2ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(18));
+ ASSERT_TRUE(results
+ .insert(std::make_pair(value::bitcastFrom<int64_t>(resCountVal),
+ value::bitcastFrom<int64_t>(resSumVal)))
+ .second);
+ }
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res3Tag, res3Val] = resultAccessors[0]->getViewOfValue();
- auto [res3TagSum, res3ValSum] = resultAccessors[1]->getViewOfValue();
- // There are '4' occurences of '7' in the input.
- assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
- assertValuesEqual(
- res3TagSum, res3ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(28));
- ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
+ // Check that the results match the expected.
+ ASSERT_EQ(3, results.size());
+ ASSERT_EQ(1, results.count(std::make_pair(2, 2 * 5))); // 2 of "5"s
+ ASSERT_EQ(1, results.count(std::make_pair(3, 3 * 6))); // 3 of "6"s
+ ASSERT_EQ(1, results.count(std::make_pair(4, 4 * 7))); // 4 of "7"s
- stage->close();
-}
+ // Check that the spilling behavior matches the expected.
+ auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
+ ASSERT_TRUE(stats->usedDisk);
+ ASSERT_EQ(results.size(), stats->spilledRecords);
-TEST_F(HashAggStageTest, HashAggMemUsageTest) {
- // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
- // estimated size is >= 128 * 5.
- auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128 * 5);
- ON_BLOCK_EXIT([&] {
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
- defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
- });
- auto defaultInternalQuerySBEAggMemoryUseSampleRate =
- internalQuerySBEAggMemoryUseSampleRate.load();
- internalQuerySBEAggMemoryUseSampleRate.store(1.0);
- ON_BLOCK_EXIT([&] {
- internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
- });
-
- auto createInputArray = [](int numberOfBytesPerEntry) {
- auto arr = BSON_ARRAY(
- std::string(numberOfBytesPerEntry, 'A')
- << std::string(numberOfBytesPerEntry, 'a') << std::string(numberOfBytesPerEntry, 'b')
- << std::string(numberOfBytesPerEntry, 'c') << std::string(numberOfBytesPerEntry, 'B')
- << std::string(numberOfBytesPerEntry, 'a'));
- return arr;
- };
-
- auto nonSpillInputArr = createInputArray(64);
- auto spillInputArr = createInputArray(256);
-
- // Groups the values as: ["a", "a"], ["A"], ["B"], ["b"], ["c"].
- auto expectedOutputArr = BSON_ARRAY(2 << 1 << 1 << 1 << 1);
- // Should NOT spill to disk because internalQuerySlotBasedExecutionHashAggMemoryUsageThreshold
- // is set to 128 * 5. (64 + padding) * 5 < 128 * 5
- performHashAggWithSpillChecking(nonSpillInputArr, expectedOutputArr);
- // Should spill to disk because internalQuerySlotBasedExecutionHashAggMemoryUsageThreshold is
- // set to 128 * 5. (256 + padding) * 5 > 128 * 5
- performHashAggWithSpillChecking(spillInputArr, expectedOutputArr, true);
+ stage->close();
}
TEST_F(HashAggStageTest, HashAggSum10Groups) {
@@ -696,12 +758,6 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
- auto defaultInternalQuerySBEAggMemoryUseSampleRate =
- internalQuerySBEAggMemoryUseSampleRate.load();
- internalQuerySBEAggMemoryUseSampleRate.store(1.0);
- ON_BLOCK_EXIT([&] {
- internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
- });
auto ctx = makeCompileCtx();
@@ -749,72 +805,16 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) {
}
TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) {
- // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
- // estimated size is >= 4 * 8.
- auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8);
- ON_BLOCK_EXIT([&] {
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
- defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
- });
- auto defaultInternalQuerySBEAggMemoryUseSampleRate =
- internalQuerySBEAggMemoryUseSampleRate.load();
- internalQuerySBEAggMemoryUseSampleRate.store(1.0);
- ON_BLOCK_EXIT([&] {
- internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
- });
-
auto ctx = makeCompileCtx();
- // Build a scan of record ids [1,10,999,10,1,999,8589869056,999,10,8589869056] input array.
+ // Build a scan of a few record ids.
+ std::vector<int64_t> ids{10, 999, 10, 999, 1, 999, 8589869056, 999, 10, 8589869056};
auto [inputTag, inputVal] = sbe::value::makeNewArray();
auto testData = sbe::value::getArrayView(inputVal);
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(1);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999);
+ for (auto id : ids) {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(id);
testData->push_back(ridTag, ridVal);
}
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(1);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(8589869056);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10);
- testData->push_back(ridTag, ridVal);
- }
- {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(8589869056);
- testData->push_back(ridTag, ridVal);
- }
-
auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal);
// Build a HashAggStage, group by the scanSlot and compute a simple count.
@@ -835,42 +835,27 @@ TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(scanSlot, countsSlot));
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res1ScanTag, res1ScanVal] = resultAccessors[0]->getViewOfValue();
- auto [res1Tag, res1Val] = resultAccessors[1]->getViewOfValue();
- // There are '2' occurences of '1' in the input.
- ASSERT_TRUE(res1ScanTag == value::TypeTags::RecordId);
- ASSERT_TRUE(sbe::value::getRecordIdView(res1ScanVal)->getLong() == 1);
- assertValuesEqual(
- res1Tag, res1Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(2));
+ // Read in all of the results.
+ std::map<int64_t /*id*/, int64_t /*count*/> results;
+ while (stage->getNext() == PlanState::ADVANCED) {
+ auto [resScanTag, resScanVal] = resultAccessors[0]->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::RecordId, resScanTag);
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res2ScanTag, res2ScanVal] = resultAccessors[0]->getViewOfValue();
- auto [res2Tag, res2Val] = resultAccessors[1]->getViewOfValue();
- // There are '2' occurences of '8589869056' in the input.
- ASSERT_TRUE(res2ScanTag == value::TypeTags::RecordId);
- ASSERT_TRUE(sbe::value::getRecordIdView(res2ScanVal)->getLong() == 8589869056);
- assertValuesEqual(
- res2Tag, res2Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(2));
+ auto [resTag, resVal] = resultAccessors[1]->getViewOfValue();
+ ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res3ScanTag, res3ScanVal] = resultAccessors[0]->getViewOfValue();
- auto [res3Tag, res3Val] = resultAccessors[1]->getViewOfValue();
- // There are '3' occurences of '10' in the input.
- ASSERT_TRUE(res3ScanTag == value::TypeTags::RecordId);
- ASSERT_TRUE(sbe::value::getRecordIdView(res3ScanVal)->getLong() == 10);
- assertValuesEqual(
- res3Tag, res3Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(3));
+ auto inserted = results.insert(std::make_pair(
+ value::bitcastFrom<int64_t>(sbe::value::getRecordIdView(resScanVal)->getLong()),
+ value::bitcastFrom<int64_t>(resVal)));
+ ASSERT_TRUE(inserted.second);
+ }
- ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
- auto [res4ScanTag, res4ScanVal] = resultAccessors[0]->getViewOfValue();
- auto [res4Tag, res4Val] = resultAccessors[1]->getViewOfValue();
- // There are '4' occurences of '999' in the input.
- ASSERT_TRUE(res4ScanTag == value::TypeTags::RecordId);
- ASSERT_TRUE(sbe::value::getRecordIdView(res4ScanVal)->getLong() == 999);
- assertValuesEqual(
- res4Tag, res4Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(4));
- ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
+ // Assert that the results are as expected.
+ ASSERT_EQ(4, results.size());
+ ASSERT_EQ(1, results[1]);
+ ASSERT_EQ(2, results[8589869056]);
+ ASSERT_EQ(3, results[10]);
+ ASSERT_EQ(4, results[999]);
stage->close();
}
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
index 663918ca888..95568e8edf4 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
@@ -242,6 +242,21 @@ void HashAggStage::makeTemporaryRecordStore() {
assertIgnorePrepareConflictsBehavior(_opCtx);
_recordStore = _opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(
_opCtx, KeyFormat::String);
+
+ _specificStats.usedDisk = true;
+}
+
+void HashAggStage::spillRowToDisk(const value::MaterializedRow& key,
+ const value::MaterializedRow& val) {
+ KeyString::Builder kb{KeyString::Version::kLatestVersion};
+ key.serializeIntoKeyString(kb);
+ auto typeBits = kb.getTypeBits();
+
+ auto rid = RecordId(kb.getBuffer(), kb.getSize());
+ auto valFromRs = getFromRecordStore(rid);
+ tassert(6031100, "Spilling a row doesn't support updating it in the store.", !valFromRs);
+
+ spillValueToDisk(rid, val, typeBits, false /*update*/);
}
void HashAggStage::spillValueToDisk(const RecordId& key,
@@ -249,7 +264,6 @@ void HashAggStage::spillValueToDisk(const RecordId& key,
const KeyString::TypeBits& typeBits,
bool update) {
BufBuilder bufValue;
-
val.serializeForSorter(bufValue);
// Append the 'typeBits' to the end of the val's buffer so the 'key' can be reconstructed when
@@ -263,21 +277,24 @@ void HashAggStage::spillValueToDisk(const RecordId& key,
// touch.
Lock::GlobalLock lk(_opCtx, MODE_IX);
WriteUnitOfWork wuow(_opCtx);
- auto result = [&]() {
- if (update) {
- auto status =
- _recordStore->rs()->updateRecord(_opCtx, key, bufValue.buf(), bufValue.len());
- return status;
- } else {
- auto status = _recordStore->rs()->insertRecord(
- _opCtx, key, bufValue.buf(), bufValue.len(), Timestamp{});
- return status.getStatus();
- }
- }();
+
+ auto result = mongo::Status::OK();
+ if (update) {
+ result = _recordStore->rs()->updateRecord(_opCtx, key, bufValue.buf(), bufValue.len());
+ } else {
+ auto status = _recordStore->rs()->insertRecord(
+ _opCtx, key, bufValue.buf(), bufValue.len(), Timestamp{});
+ result = status.getStatus();
+ }
wuow.commit();
tassert(5843600,
str::stream() << "Failed to write to disk because " << result.reason(),
result.isOK());
+
+ if (!update) {
+ _specificStats.spilledRecords++;
+ }
+ _specificStats.lastSpilledRecordSize = bufValue.len();
}
boost::optional<value::MaterializedRow> HashAggStage::getFromRecordStore(const RecordId& rid) {
@@ -291,6 +308,73 @@ boost::optional<value::MaterializedRow> HashAggStage::getFromRecordStore(const R
}
}
+// Checks memory usage. Ideally, we'd want to know the exact size of already accumulated data, but
+// we cannot, so we estimate it based on the last updated/inserted row, if we have one, or the first
+// row in the '_ht' table. If the estimated memory usage exceeds the allowed, this method initiates
+// spilling (if it hasn't been done yet) and evicts some records from the '_ht' table into the temp
+// store to keep the memory usage under the limit.
+void HashAggStage::checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd) {
+ // The '_ht' table might become empty in the degenerate case when all rows had to be evicted to
+ // meet the memory constraint during a previous check -- we don't need to keep checking memory
+ // usage in this case because the table will never get new rows.
+ if (_ht->empty()) {
+ return;
+ }
+
+ mcd.memoryCheckpointCounter++;
+ if (mcd.memoryCheckpointCounter >= mcd.nextMemoryCheckpoint) {
+ if (_htIt == _ht->end()) {
+ _htIt = _ht->begin();
+ }
+ const long estimatedRowSize =
+ _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter();
+ long long estimatedTotalSize = _ht->size() * estimatedRowSize;
+ const double estimatedGainPerChildAdvance =
+ (static_cast<double>(estimatedTotalSize - mcd.lastEstimatedMemoryUsage) /
+ mcd.memoryCheckpointCounter);
+
+ if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) {
+ uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
+ "Exceeded memory limit for $group, but didn't allow external spilling."
+ " Pass allowDiskUse:true to opt in.",
+ _allowDiskUse);
+ if (!_recordStore) {
+ makeTemporaryRecordStore();
+ }
+
+ // Evict enough rows into the temporary store to drop below the memory constraint.
+ const long rowsToEvictCount =
+ 1 + (estimatedTotalSize - _approxMemoryUseInBytesBeforeSpill) / estimatedRowSize;
+ for (long i = 0; !_ht->empty() && i < rowsToEvictCount; i++) {
+ spillRowToDisk(_htIt->first, _htIt->second);
+ _ht->erase(_htIt);
+ _htIt = _ht->begin();
+ }
+ estimatedTotalSize = _ht->size() * estimatedRowSize;
+ }
+
+ // Calculate the next memory checkpoint. We estimate it based on the prior growth of the
+ // '_ht' and the remaining available memory. We have to keep doing this even after starting
+ // to spill because some accumulators can grow in size inside '_ht' (with no bounds).
+ // Value of 'estimatedGainPerChildAdvance' can be negative if the previous checkpoint
+        // evicted any records. And a value close to zero indicates a stable size of '_ht', so we
+        // can delay the next check progressively.
+ const long nextCheckpointCandidate = (estimatedGainPerChildAdvance > 0.1)
+ ? mcd.checkpointMargin * (_approxMemoryUseInBytesBeforeSpill - estimatedTotalSize) /
+ estimatedGainPerChildAdvance
+ : (estimatedGainPerChildAdvance < -0.1) ? mcd.atMostCheckFrequency
+ : mcd.nextMemoryCheckpoint * 2;
+ mcd.nextMemoryCheckpoint =
+ std::min<long>(mcd.memoryCheckFrequency,
+ std::max<long>(mcd.atMostCheckFrequency, nextCheckpointCandidate));
+
+ mcd.lastEstimatedMemoryUsage = estimatedTotalSize;
+ mcd.memoryCheckpointCounter = 0;
+ mcd.memoryCheckFrequency =
+ std::min<long>(mcd.memoryCheckFrequency * 2, mcd.atLeastMemoryCheckFrequency);
+ }
+}
+
void HashAggStage::open(bool reOpen) {
auto optTimer(getOptTimer(_opCtx));
@@ -313,12 +397,11 @@ void HashAggStage::open(bool reOpen) {
_seekKeys.resize(_seekKeysAccessors.size());
- // A counter to check memory usage periodically.
- auto memoryUseCheckCounter = 0;
-
// A default value for spilling a key to the record store.
value::MaterializedRow defaultVal{_outAggAccessors.size()};
bool updateAggStateHt = false;
+ MemoryCheckData memoryCheckData;
+
while (_children[0]->getNext() == PlanState::ADVANCED) {
value::MaterializedRow key{_inKeyAccessors.size()};
// Copy keys in order to do the lookup.
@@ -344,8 +427,7 @@ void HashAggStage::open(bool reOpen) {
} else {
// The memory limit has been reached, accumulate state in '_ht' only if we
// find the key in '_ht'.
- auto it = _ht->find(key);
- _htIt = it;
+ _htIt = _ht->find(key);
updateAggStateHt = _htIt != _ht->end();
}
@@ -361,9 +443,8 @@ void HashAggStage::open(bool reOpen) {
// The memory limit has been reached and the key wasn't in the '_ht' so we need
// to spill it to the '_recordStore'.
KeyString::Builder kb{KeyString::Version::kLatestVersion};
-
- // It's safe to ignore the use-after-move warning since it's logically impossible to
- // enter this block after the move occurs.
+ // 'key' is moved only when 'updateAggStateHt' ends up "true", so it's safe to
+ // ignore the warning.
key.serializeIntoKeyString(kb); // NOLINT(bugprone-use-after-move)
auto typeBits = kb.getTypeBits();
@@ -384,26 +465,8 @@ void HashAggStage::open(bool reOpen) {
spillValueToDisk(rid, _aggValueRecordStore, typeBits, valFromRs ? true : false);
}
- // Track memory usage only when we haven't started spilling to the '_recordStore'.
- if (!_recordStore) {
- auto shouldCalculateEstimatedSize =
- _pseudoRandom.nextCanonicalDouble() < _memoryUseSampleRate;
- if (shouldCalculateEstimatedSize || ++memoryUseCheckCounter % 100 == 0) {
- memoryUseCheckCounter = 0;
- long estimatedSizeForOneRow =
- _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter();
- long long estimatedTotalSize = _ht->size() * estimatedSizeForOneRow;
-
- if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) {
- uassert(
- 5843601,
- "Exceeded memory limit for $group, but didn't allow external spilling."
- " Pass allowDiskUse:true to opt in.",
- _allowDiskUse);
- makeTemporaryRecordStore();
- }
- }
- }
+ // Estimates how much memory is being used and might start spilling.
+ checkMemoryUsageAndSpillIfNecessary(memoryCheckData);
if (_tracker && _tracker->trackProgress<TrialRunTracker::kNumResults>(1)) {
// During trial runs, we want to limit the amount of work done by opening a blocking
@@ -509,6 +572,7 @@ PlanState HashAggStage::getNext() {
std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) const {
auto ret = std::make_unique<PlanStageStats>(_commonStats);
+ ret->specific = std::make_unique<HashAggStats>(_specificStats);
if (includeDebugInfo) {
DebugPrinter printer;
@@ -520,6 +584,12 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co
childrenBob.append(str::stream() << slot, printer.print(expr->debugPrint()));
}
}
+ // Spilling stats.
+ bob.appendBool("usedDisk", _specificStats.usedDisk);
+ bob.appendNumber("spilledRecords", _specificStats.spilledRecords);
+ bob.appendNumber("spilledBytesApprox",
+ _specificStats.lastSpilledRecordSize * _specificStats.spilledRecords);
+
ret->debugInfo = bob.obj();
}
@@ -528,7 +598,7 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co
}
const SpecificStats* HashAggStage::getSpecificStats() const {
- return nullptr;
+ return &_specificStats;
}
void HashAggStage::close() {
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h
index afb8bea06f3..a50b4d65c81 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.h
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.h
@@ -130,7 +130,39 @@ private:
const value::MaterializedRow& val,
const KeyString::TypeBits& typeBits,
bool update);
+ void spillRowToDisk(const value::MaterializedRow& key,
+ const value::MaterializedRow& defaultVal);
+ /**
+     * We check the amount of used memory every T processed incoming records, where T is calculated
+ * based on the estimated used memory and its recent growth. When the memory limit is exceeded,
+ * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' and might spill some of
+ * the already accumulated data into it.
+ */
+ struct MemoryCheckData {
+ const double checkpointMargin = internalQuerySBEAggMemoryUseCheckMargin.load();
+ const long atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load();
+ const long atLeastMemoryCheckFrequency =
+ internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load();
+
+        // The check frequency upper bound, which starts at 'atMost' and exponentially backs off
+ // to 'atLeast' as more data is accumulated. If 'atLeast' is less than 'atMost', the memory
+ // checks will be done every 'atLeast' incoming records.
+ long memoryCheckFrequency = 1;
+
+ // The number of incoming records to process before the next memory checkpoint.
+ long nextMemoryCheckpoint = 0;
+
+ // The counter of the incoming records between memory checkpoints.
+ long memoryCheckpointCounter = 0;
+
+ long long lastEstimatedMemoryUsage = 0;
+
+ MemoryCheckData() {
+ memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency);
+ }
+ };
+ void checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd);
const value::SlotVector _gbs;
const value::SlotMap<std::unique_ptr<EExpression>> _aggs;
@@ -140,14 +172,6 @@ private:
// When this operator does not expect to be reopened (almost always) then it can close the child
// early.
const bool _optimizedClose{true};
- // Memory tracking variables.
- const long long _approxMemoryUseInBytesBeforeSpill =
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- const int _memoryUseSampleRate = internalQuerySBEAggMemoryUseSampleRate.load();
- // Used in collaboration with memoryUseSampleRatePercentage to determine whether we should
- // re-approximate memory usage.
- PseudoRandom _pseudoRandom = PseudoRandom(Date_t::now().asInt64());
-
value::SlotAccessorMap _outAccessors;
std::vector<value::SlotAccessor*> _inKeyAccessors;
@@ -182,11 +206,15 @@ private:
bool _compiled{false};
bool _childOpened{false};
- // Used when spilling to disk.
+ // Memory tracking and spilling to disk.
+ const long long _approxMemoryUseInBytesBeforeSpill =
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
std::unique_ptr<TemporaryRecordStore> _recordStore;
bool _drainingRecordStore{false};
std::unique_ptr<SeekableRecordCursor> _rsCursor;
+ HashAggStats _specificStats;
+
// If provided, used during a trial run to accumulate certain execution stats. Once the trial
// run is complete, this pointer is reset to nullptr.
TrialRunTracker* _tracker{nullptr};
diff --git a/src/mongo/db/exec/sbe/stages/plan_stats.h b/src/mongo/db/exec/sbe/stages/plan_stats.h
index 9e77c76ad5f..46d52726b6c 100644
--- a/src/mongo/db/exec/sbe/stages/plan_stats.h
+++ b/src/mongo/db/exec/sbe/stages/plan_stats.h
@@ -260,6 +260,28 @@ struct TraverseStats : public SpecificStats {
size_t innerCloses{0};
};
+struct HashAggStats : public SpecificStats {
+ std::unique_ptr<SpecificStats> clone() const final {
+ return std::make_unique<HashAggStats>(*this);
+ }
+
+ uint64_t estimateObjectSizeInBytes() const final {
+ return sizeof(*this);
+ }
+
+ void acceptVisitor(PlanStatsConstVisitor* visitor) const final {
+ visitor->visit(this);
+ }
+
+ void acceptVisitor(PlanStatsMutableVisitor* visitor) final {
+ visitor->visit(this);
+ }
+
+ bool usedDisk{false};
+ long long spilledRecords{0};
+ long long lastSpilledRecordSize{0};
+};
+
/**
* Visitor for calculating the number of storage reads during plan execution.
*/
diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl
index 4a23ff8bd0f..528b6b0e2cf 100644
--- a/src/mongo/db/query/query_knobs.idl
+++ b/src/mongo/db/query/query_knobs.idl
@@ -511,19 +511,46 @@ server_parameters:
gt: 0
lte: { expr: BSONObjMaxInternalSize }
- internalQuerySlotBasedExecutionHashAggMemoryUseSampleRate:
- description: "The percent chance that the size of the hash table in a HashAgg stage is re-estimated
- when an entry is updated. The estimated size of the hash table is used to determine if we should
- spill to disk. [see internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill]"
+ internalQuerySlotBasedExecutionHashAggMemoryUseCheckMargin:
+ description: "The memory check in HashAgg stage is done on every T'th processed record, where T
+ is calculated adaptively based on the estimated memory used and its recent growth. This setting
+ defines the percent of the remaining available memory to be used before the next check, given
+ the estimated growth speed per advance [see internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill]."
set_at: [ startup, runtime ]
- cpp_varname: "internalQuerySBEAggMemoryUseSampleRate"
+ cpp_varname: "internalQuerySBEAggMemoryUseCheckMargin"
cpp_vartype: AtomicDouble
default:
- expr: 0.5
+ expr: 0.7
validator:
gt: 0.0
lte: 1.0
+ internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtMost:
+ description: "The memory check in HashAgg stage is done on every T'th processed record, where T
+ is calculated adaptively based on the estimated memory used and its recent growth. This setting
+ defines the lower bound for T [see internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill]."
+ set_at: [ startup, runtime ]
+ cpp_varname: "internalQuerySBEAggMemoryCheckPerAdvanceAtMost"
+ cpp_vartype: AtomicWord<long long>
+ default:
+ expr: 2
+ validator:
+ gt: 0
+
+ internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtLeast:
+ description: "The memory check in HashAgg stage is done on every T'th processed record, where T
+ is calculated adaptively based on the estimated memory used and its recent growth. This setting
+      defines the upper bound for T. If this setting is less than internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtMost,
+ the check will be done on every internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtLeast'th
+ processed record [see internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill]."
+ set_at: [ startup, runtime ]
+ cpp_varname: "internalQuerySBEAggMemoryCheckPerAdvanceAtLeast"
+ cpp_vartype: AtomicWord<long long>
+ default:
+ expr: 1024
+ validator:
+ gt: 0
+
internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill:
description: "The max size in bytes that the hash table in a HashAgg stage can be
estimated to be before we spill to disk."