author     auto-revert-processor <dev-prod-dag@mongodb.com>   2022-01-20 11:53:48 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-01-20 12:24:28 +0000
commit     464b6b88bd25621a4f61f3fbf4c718c44fe68866 (patch)
tree       a4cc3b66d02594e9dac51e57200f51df028b6f67 /src/mongo/db/exec
parent     971f88711eb859c308be510ddf48d10066c3ea62 (diff)
Revert "SERVER-60311 Make memory usage check adaptive and add exec stats for spilling"
This reverts commit f7ee45c35d778188402f9b2fe972c96706922165.
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--  src/mongo/db/exec/plan_stats_visitor.h         3
-rw-r--r--  src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp  469
-rw-r--r--  src/mongo/db/exec/sbe/stages/hash_agg.cpp    152
-rw-r--r--  src/mongo/db/exec/sbe/stages/hash_agg.h       46
-rw-r--r--  src/mongo/db/exec/sbe/stages/plan_stats.h     22
5 files changed, 292 insertions, 400 deletions
diff --git a/src/mongo/db/exec/plan_stats_visitor.h b/src/mongo/db/exec/plan_stats_visitor.h
index ade88fd2a5f..eacdf3369d3 100644
--- a/src/mongo/db/exec/plan_stats_visitor.h
+++ b/src/mongo/db/exec/plan_stats_visitor.h
@@ -43,7 +43,6 @@ struct BranchStats;
struct CheckBoundsStats;
struct LoopJoinStats;
struct TraverseStats;
-struct HashAggStats;
} // namespace sbe
struct AndHashStats;
@@ -103,7 +102,6 @@ public:
virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::CheckBoundsStats> stats) = 0;
virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::LoopJoinStats> stats) = 0;
virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::TraverseStats> stats) = 0;
- virtual void visit(tree_walker::MaybeConstPtr<IsConst, sbe::HashAggStats> stats) = 0;
virtual void visit(tree_walker::MaybeConstPtr<IsConst, AndHashStats> stats) = 0;
virtual void visit(tree_walker::MaybeConstPtr<IsConst, AndSortedStats> stats) = 0;
@@ -156,7 +154,6 @@ struct PlanStatsVisitorBase : public PlanStatsVisitor<IsConst> {
void visit(tree_walker::MaybeConstPtr<IsConst, sbe::CheckBoundsStats> stats) override {}
void visit(tree_walker::MaybeConstPtr<IsConst, sbe::LoopJoinStats> stats) override {}
void visit(tree_walker::MaybeConstPtr<IsConst, sbe::TraverseStats> stats) override {}
- void visit(tree_walker::MaybeConstPtr<IsConst, sbe::HashAggStats> stats) override {}
void visit(tree_walker::MaybeConstPtr<IsConst, AndHashStats> stats) override {}
void visit(tree_walker::MaybeConstPtr<IsConst, AndSortedStats> stats) override {}
diff --git a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
index 0fc907ce245..659c249b18d 100644
--- a/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
+++ b/src/mongo/db/exec/sbe/sbe_hash_agg_test.cpp
@@ -374,134 +374,21 @@ TEST_F(HashAggStageTest, HashAggSeekKeysTest) {
stage->close();
}
-TEST_F(HashAggStageTest, HashAggBasicCountNoSpill) {
- // We shouldn't spill to disk if memory is plentiful (which by default it is), even if we are
- // allowed to.
-
- auto ctx = makeCompileCtx();
-
- // Build a scan of the [5,6,7,5,6,7,6,7,7] input array.
- auto [inputTag, inputVal] =
- stage_builder::makeValue(BSON_ARRAY(5 << 6 << 7 << 5 << 6 << 7 << 6 << 7 << 7));
- auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal);
-
- // Build a HashAggStage, group by the scanSlot and compute a simple count.
- auto countsSlot = generateSlotId();
- auto stage = makeS<HashAggStage>(
- std::move(scanStage),
- makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
- makeSV(), // Seek slot
- true,
- boost::none,
- true /* allowDiskUse */,
- kEmptyPlanNodeId);
-
- // Prepare the tree and get the 'SlotAccessor' for the output slot.
- auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot);
-
- // Read in all of the results.
- std::set<int64_t> results;
- while (stage->getNext() == PlanState::ADVANCED) {
- auto [resTag, resVal] = resultAccessor->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
- ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second);
- }
-
- // Check that the results match the expected.
- ASSERT_EQ(3, results.size());
- ASSERT_EQ(1, results.count(2)); // 2 of "5"s
- ASSERT_EQ(1, results.count(3)); // 3 of "6"s
- ASSERT_EQ(1, results.count(4)); // 4 of "7"s
-
- // Check that the spilling behavior matches the expected.
- auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
- ASSERT_FALSE(stats->usedDisk);
- ASSERT_EQ(0, stats->spilledRecords);
-
- stage->close();
-}
-
TEST_F(HashAggStageTest, HashAggBasicCountSpill) {
- // We estimate the size of result row like {int64, int64} at 50B. Set the memory threshold to
- // 64B so that exactly one row fits in memory.
- const int expectedRowsToFitInMemory = 1;
+ // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
+ // estimated size is >= 4 * 8.
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64);
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8);
ON_BLOCK_EXIT([&] {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
-
- auto ctx = makeCompileCtx();
-
- // Build a scan of the [5,6,7,5,6,7,6,7,7] input array.
- auto [inputTag, inputVal] =
- stage_builder::makeValue(BSON_ARRAY(5 << 6 << 7 << 5 << 6 << 7 << 6 << 7 << 7));
- auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal);
-
- // Build a HashAggStage, group by the scanSlot and compute a simple count.
- auto countsSlot = generateSlotId();
- auto stage = makeS<HashAggStage>(
- std::move(scanStage),
- makeSV(scanSlot),
- makeEM(countsSlot,
- stage_builder::makeFunction(
- "sum",
- makeE<EConstant>(value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(1)))),
- makeSV(), // Seek slot
- true,
- boost::none,
- true /* allowDiskUse */,
- kEmptyPlanNodeId);
-
- // Prepare the tree and get the 'SlotAccessor' for the output slot.
- auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot);
-
- // Read in all of the results.
- std::set<int64_t> results;
- while (stage->getNext() == PlanState::ADVANCED) {
- auto [resTag, resVal] = resultAccessor->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
- ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second);
- }
-
- // Check that the results match the expected.
- ASSERT_EQ(3, results.size());
- ASSERT_EQ(1, results.count(2)); // 2 of "5"s
- ASSERT_EQ(1, results.count(3)); // 3 of "6"s
- ASSERT_EQ(1, results.count(4)); // 4 of "7"s
-
- // Check that the spilling behavior matches the expected.
- auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
- ASSERT_TRUE(stats->usedDisk);
- ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
-
- stage->close();
-}
-
-TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) {
- // We estimate the size of result row like {int64, int64} at 50B. Set the memory threshold to
- // 64B so that exactly one row fits in memory and spill would be required. At the same time, set
- // the memory check bounds to exceed the number of processed records so the checks are never run
- // and the need to spill is never discovered.
- auto defaultMemoryLimit = internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64);
-
- auto defaultAtMost = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load();
- internalQuerySBEAggMemoryCheckPerAdvanceAtMost.store(100);
-
- auto defaultAtLeast = internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load();
- internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.store(100);
-
+ auto defaultInternalQuerySBEAggMemoryUseSampleRate =
+ internalQuerySBEAggMemoryUseSampleRate.load();
+ internalQuerySBEAggMemoryUseSampleRate.store(1.0);
ON_BLOCK_EXIT([&] {
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(defaultMemoryLimit);
- internalQuerySBEAggMemoryCheckPerAdvanceAtMost.store(defaultAtMost);
- internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.store(defaultAtLeast);
+ internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
});
auto ctx = makeCompileCtx();
@@ -529,39 +416,41 @@ TEST_F(HashAggStageTest, HashAggBasicCountNoSpillIfNoMemCheck) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot);
- // Read in all of the results.
- std::set<int64_t> results;
- while (stage->getNext() == PlanState::ADVANCED) {
- auto [resTag, resVal] = resultAccessor->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
- ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second);
- }
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res1Tag, res1Val] = resultAccessor->getViewOfValue();
+ // There are '2' occurrences of '5' in the input.
+ assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
- // Check that the results match the expected.
- ASSERT_EQ(3, results.size());
- ASSERT_EQ(1, results.count(2)); // 2 of "5"s
- ASSERT_EQ(1, results.count(3)); // 3 of "6"s
- ASSERT_EQ(1, results.count(4)); // 4 of "7"s
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res2Tag, res2Val] = resultAccessor->getViewOfValue();
+ // There are '3' occurrences of '6' in the input.
+ assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
- // Check that it did not spill.
- auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
- ASSERT_FALSE(stats->usedDisk);
- ASSERT_EQ(0, stats->spilledRecords);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res3Tag, res3Val] = resultAccessor->getViewOfValue();
+ // There are '4' occurrences of '7' in the input.
+ assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
+ ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
stage->close();
}
TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) {
- // We estimate the size of result row like {double, int64} at 50B. Set the memory threshold to
- // 64B so that exactly one row fits in memory.
- const int expectedRowsToFitInMemory = 1;
+ // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
+ // estimated size is >= 4 * 8.
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(64);
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8);
ON_BLOCK_EXIT([&] {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
+ auto defaultInternalQuerySBEAggMemoryUseSampleRate =
+ internalQuerySBEAggMemoryUseSampleRate.load();
+ internalQuerySBEAggMemoryUseSampleRate.store(1.0);
+ ON_BLOCK_EXIT([&] {
+ internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
+ });
auto ctx = makeCompileCtx();
@@ -588,39 +477,42 @@ TEST_F(HashAggStageTest, HashAggBasicCountSpillDouble) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessor = prepareTree(ctx.get(), stage.get(), countsSlot);
- // Read in all of the results.
- std::set<int64_t> results;
- while (stage->getNext() == PlanState::ADVANCED) {
- auto [resTag, resVal] = resultAccessor->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
- ASSERT_TRUE(results.insert(value::bitcastFrom<int64_t>(resVal)).second);
- }
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res1Tag, res1Val] = resultAccessor->getViewOfValue();
+ // There are '2' occurrences of '5' in the input.
+ assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
- // Check that the results match the expected.
- ASSERT_EQ(3, results.size());
- ASSERT_EQ(1, results.count(2)); // 2 of "5.0"s
- ASSERT_EQ(1, results.count(3)); // 3 of "6.0"s
- ASSERT_EQ(1, results.count(4)); // 4 of "7.0"s
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res2Tag, res2Val] = resultAccessor->getViewOfValue();
+ // There are '3' occurrences of '6' in the input.
+ assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
- // Check that the spilling behavior matches the expected.
- auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
- ASSERT_TRUE(stats->usedDisk);
- ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res3Tag, res3Val] = resultAccessor->getViewOfValue();
+ // There are '4' occurrences of '7' in the input.
+ assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
+ ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
stage->close();
}
+
TEST_F(HashAggStageTest, HashAggMultipleAccSpill) {
- // We estimate the size of result row like {double, int64} at 59B. Set the memory threshold to
- // 128B so that two rows fit in memory.
- const int expectedRowsToFitInMemory = 2;
+ // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
+ // estimated size is >= 2 * 8.
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128);
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(2 * 8);
ON_BLOCK_EXIT([&] {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
+ auto defaultInternalQuerySBEAggMemoryUseSampleRate =
+ internalQuerySBEAggMemoryUseSampleRate.load();
+ internalQuerySBEAggMemoryUseSampleRate.store(1.0);
+ ON_BLOCK_EXIT([&] {
+ internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
+ });
auto ctx = makeCompileCtx();
@@ -650,37 +542,38 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpill) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(countsSlot, sumsSlot));
- // Read in all of the results.
- std::set<std::pair<int64_t /*count*/, int64_t /*sum*/>> results;
- while (stage->getNext() == PlanState::ADVANCED) {
- auto [resCountTag, resCountVal] = resultAccessors[0]->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resCountTag);
-
- auto [resSumTag, resSumVal] = resultAccessors[1]->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resSumTag);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res1Tag, res1Val] = resultAccessors[0]->getViewOfValue();
+ auto [res1TagSum, res1ValSum] = resultAccessors[1]->getViewOfValue();
- ASSERT_TRUE(results
- .insert(std::make_pair(value::bitcastFrom<int64_t>(resCountVal),
- value::bitcastFrom<int64_t>(resSumVal)))
- .second);
- }
+ // There are '2' occurrences of '5' in the input.
+ assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
+ assertValuesEqual(
+ res1TagSum, res1ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(10));
- // Check that the results match the expected.
- ASSERT_EQ(3, results.size());
- ASSERT_EQ(1, results.count(std::make_pair(2, 2 * 5))); // 2 of "5"s
- ASSERT_EQ(1, results.count(std::make_pair(3, 3 * 6))); // 3 of "6"s
- ASSERT_EQ(1, results.count(std::make_pair(4, 4 * 7))); // 4 of "7"s
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res2Tag, res2Val] = resultAccessors[0]->getViewOfValue();
+ auto [res2TagSum, res2ValSum] = resultAccessors[1]->getViewOfValue();
+ // There are '3' occurrences of '6' in the input.
+ assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
+ assertValuesEqual(
+ res2TagSum, res2ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(18));
- // Check that the spilling behavior matches the expected.
- auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
- ASSERT_TRUE(stats->usedDisk);
- ASSERT_EQ(results.size() - expectedRowsToFitInMemory, stats->spilledRecords);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res3Tag, res3Val] = resultAccessors[0]->getViewOfValue();
+ auto [res3TagSum, res3ValSum] = resultAccessors[1]->getViewOfValue();
+ // There are '4' occurrences of '7' in the input.
+ assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
+ assertValuesEqual(
+ res3TagSum, res3ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(28));
+ ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
stage->close();
}
TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) {
- // Set available memory to zero so all aggregated rows have to be spilled.
+ // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
+ // estimated size is >= 0. This will spill everything to the RecordStore.
auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(0);
@@ -688,6 +581,12 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
+ auto defaultInternalQuerySBEAggMemoryUseSampleRate =
+ internalQuerySBEAggMemoryUseSampleRate.load();
+ internalQuerySBEAggMemoryUseSampleRate.store(1.0);
+ ON_BLOCK_EXIT([&] {
+ internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
+ });
auto ctx = makeCompileCtx();
@@ -717,35 +616,74 @@ TEST_F(HashAggStageTest, HashAggMultipleAccSpillAllToDisk) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(countsSlot, sumsSlot));
- // Read in all of the results.
- std::set<std::pair<int64_t /*count*/, int64_t /*sum*/>> results;
- while (stage->getNext() == PlanState::ADVANCED) {
- auto [resCountTag, resCountVal] = resultAccessors[0]->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resCountTag);
-
- auto [resSumTag, resSumVal] = resultAccessors[1]->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resSumTag);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res1Tag, res1Val] = resultAccessors[0]->getViewOfValue();
+ auto [res1TagSum, res1ValSum] = resultAccessors[1]->getViewOfValue();
- ASSERT_TRUE(results
- .insert(std::make_pair(value::bitcastFrom<int64_t>(resCountVal),
- value::bitcastFrom<int64_t>(resSumVal)))
- .second);
- }
+ // There are '2' occurrences of '5' in the input.
+ assertValuesEqual(res1Tag, res1Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(2));
+ assertValuesEqual(
+ res1TagSum, res1ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(10));
- // Check that the results match the expected.
- ASSERT_EQ(3, results.size());
- ASSERT_EQ(1, results.count(std::make_pair(2, 2 * 5))); // 2 of "5"s
- ASSERT_EQ(1, results.count(std::make_pair(3, 3 * 6))); // 3 of "6"s
- ASSERT_EQ(1, results.count(std::make_pair(4, 4 * 7))); // 4 of "7"s
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res2Tag, res2Val] = resultAccessors[0]->getViewOfValue();
+ auto [res2TagSum, res2ValSum] = resultAccessors[1]->getViewOfValue();
+ // There are '3' occurrences of '6' in the input.
+ assertValuesEqual(res2Tag, res2Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(3));
+ assertValuesEqual(
+ res2TagSum, res2ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(18));
- // Check that the spilling behavior matches the expected.
- auto stats = static_cast<const HashAggStats*>(stage->getSpecificStats());
- ASSERT_TRUE(stats->usedDisk);
- ASSERT_EQ(results.size(), stats->spilledRecords);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res3Tag, res3Val] = resultAccessors[0]->getViewOfValue();
+ auto [res3TagSum, res3ValSum] = resultAccessors[1]->getViewOfValue();
+ // There are '4' occurrences of '7' in the input.
+ assertValuesEqual(res3Tag, res3Val, value::TypeTags::NumberInt32, value::bitcastFrom<int>(4));
+ assertValuesEqual(
+ res3TagSum, res3ValSum, value::TypeTags::NumberInt32, value::bitcastFrom<int>(28));
+ ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
stage->close();
}
+TEST_F(HashAggStageTest, HashAggMemUsageTest) {
+ // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
+ // estimated size is >= 128 * 5.
+ auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(128 * 5);
+ ON_BLOCK_EXIT([&] {
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
+ defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
+ });
+ auto defaultInternalQuerySBEAggMemoryUseSampleRate =
+ internalQuerySBEAggMemoryUseSampleRate.load();
+ internalQuerySBEAggMemoryUseSampleRate.store(1.0);
+ ON_BLOCK_EXIT([&] {
+ internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
+ });
+
+ auto createInputArray = [](int numberOfBytesPerEntry) {
+ auto arr = BSON_ARRAY(
+ std::string(numberOfBytesPerEntry, 'A')
+ << std::string(numberOfBytesPerEntry, 'a') << std::string(numberOfBytesPerEntry, 'b')
+ << std::string(numberOfBytesPerEntry, 'c') << std::string(numberOfBytesPerEntry, 'B')
+ << std::string(numberOfBytesPerEntry, 'a'));
+ return arr;
+ };
+
+ auto nonSpillInputArr = createInputArray(64);
+ auto spillInputArr = createInputArray(256);
+
+ // Groups the values as: ["a", "a"], ["A"], ["B"], ["b"], ["c"].
+ auto expectedOutputArr = BSON_ARRAY(2 << 1 << 1 << 1 << 1);
+ // Should NOT spill to disk because internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill
+ // is set to 128 * 5, and (64 + padding) * 5 < 128 * 5.
+ performHashAggWithSpillChecking(nonSpillInputArr, expectedOutputArr);
+ // Should spill to disk because internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill is
+ // set to 128 * 5, and (256 + padding) * 5 > 128 * 5.
+ performHashAggWithSpillChecking(spillInputArr, expectedOutputArr, true);
+}
+
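The two comparisons in HashAggMemUsageTest's comments are just arithmetic over five groups; a minimal sketch of the same predicate is below. The 32-byte per-row overhead is a hypothetical stand-in for the stage's real padding, chosen only so the two cases land on opposite sides of the threshold.

#include <iostream>

// Back-of-the-envelope version of HashAggMemUsageTest's spill predicate: five
// groups, each estimated as payload bytes plus per-row overhead. The 32-byte
// overhead is an assumed placeholder, not the stage's actual estimate.
int main() {
    const long long threshold = 128 * 5;  // knob value installed by the test
    const long long overhead = 32;        // hypothetical per-row padding

    for (long long payload : {64LL, 256LL}) {
        const long long estimate = (payload + overhead) * 5;
        std::cout << payload << "B entries -> ~" << estimate << "B estimated, "
                  << (estimate >= threshold ? "spills" : "fits in memory") << "\n";
    }
}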
TEST_F(HashAggStageTest, HashAggSum10Groups) {
// Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
// estimated size is >= 128. This should split the number of ints between the hash table and
@@ -758,6 +696,12 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) {
internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
});
+ auto defaultInternalQuerySBEAggMemoryUseSampleRate =
+ internalQuerySBEAggMemoryUseSampleRate.load();
+ internalQuerySBEAggMemoryUseSampleRate.store(1.0);
+ ON_BLOCK_EXIT([&] {
+ internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
+ });
auto ctx = makeCompileCtx();
@@ -805,16 +749,72 @@ TEST_F(HashAggStageTest, HashAggSum10Groups) {
}
TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) {
+ // Changing the query knobs to always re-estimate the hash table size in HashAgg and spill when
+ // estimated size is >= 4 * 8.
+ auto defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill =
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(4 * 8);
+ ON_BLOCK_EXIT([&] {
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.store(
+ defaultInternalQuerySBEAggApproxMemoryUseInBytesBeforeSpill);
+ });
+ auto defaultInternalQuerySBEAggMemoryUseSampleRate =
+ internalQuerySBEAggMemoryUseSampleRate.load();
+ internalQuerySBEAggMemoryUseSampleRate.store(1.0);
+ ON_BLOCK_EXIT([&] {
+ internalQuerySBEAggMemoryUseSampleRate.store(defaultInternalQuerySBEAggMemoryUseSampleRate);
+ });
+
auto ctx = makeCompileCtx();
- // Build a scan of a few record ids.
- std::vector<long> ids{10, 999, 10, 999, 1, 999, 8589869056, 999, 10, 8589869056};
+ // Build a scan over the record id array [1,10,999,10,999,1,999,8589869056,999,10,8589869056].
auto [inputTag, inputVal] = sbe::value::makeNewArray();
auto testData = sbe::value::getArrayView(inputVal);
- for (auto id : ids) {
- auto [ridTag, ridVal] = sbe::value::makeNewRecordId(id);
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(1);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999);
testData->push_back(ridTag, ridVal);
}
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(1);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(8589869056);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(999);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(10);
+ testData->push_back(ridTag, ridVal);
+ }
+ {
+ auto [ridTag, ridVal] = sbe::value::makeNewRecordId(8589869056);
+ testData->push_back(ridTag, ridVal);
+ }
+
auto [scanSlot, scanStage] = generateVirtualScan(inputTag, inputVal);
// Build a HashAggStage, group by the scanSlot and compute a simple count.
@@ -835,27 +835,42 @@ TEST_F(HashAggStageTest, HashAggBasicCountWithRecordIds) {
// Prepare the tree and get the 'SlotAccessor' for the output slot.
auto resultAccessors = prepareTree(ctx.get(), stage.get(), makeSV(scanSlot, countsSlot));
- // Read in all of the results.
- std::map<int64_t /*id*/, int64_t /*count*/> results;
- while (stage->getNext() == PlanState::ADVANCED) {
- auto [resScanTag, resScanVal] = resultAccessors[0]->getViewOfValue();
- ASSERT_EQ(value::TypeTags::RecordId, resScanTag);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res1ScanTag, res1ScanVal] = resultAccessors[0]->getViewOfValue();
+ auto [res1Tag, res1Val] = resultAccessors[1]->getViewOfValue();
+ // There are '2' occurrences of '1' in the input.
+ ASSERT_TRUE(res1ScanTag == value::TypeTags::RecordId);
+ ASSERT_TRUE(sbe::value::getRecordIdView(res1ScanVal)->getLong() == 1);
+ assertValuesEqual(
+ res1Tag, res1Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(2));
- auto [resTag, resVal] = resultAccessors[1]->getViewOfValue();
- ASSERT_EQ(value::TypeTags::NumberInt64, resTag);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res2ScanTag, res2ScanVal] = resultAccessors[0]->getViewOfValue();
+ auto [res2Tag, res2Val] = resultAccessors[1]->getViewOfValue();
+ // There are '2' occurrences of '8589869056' in the input.
+ ASSERT_TRUE(res2ScanTag == value::TypeTags::RecordId);
+ ASSERT_TRUE(sbe::value::getRecordIdView(res2ScanVal)->getLong() == 8589869056);
+ assertValuesEqual(
+ res2Tag, res2Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(2));
- auto inserted = results.insert(std::make_pair(
- value::bitcastFrom<int64_t>(sbe::value::getRecordIdView(resScanVal)->getLong()),
- value::bitcastFrom<int64_t>(resVal)));
- ASSERT_TRUE(inserted.second);
- }
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res3ScanTag, res3ScanVal] = resultAccessors[0]->getViewOfValue();
+ auto [res3Tag, res3Val] = resultAccessors[1]->getViewOfValue();
+ // There are '3' occurrences of '10' in the input.
+ ASSERT_TRUE(res3ScanTag == value::TypeTags::RecordId);
+ ASSERT_TRUE(sbe::value::getRecordIdView(res3ScanVal)->getLong() == 10);
+ assertValuesEqual(
+ res3Tag, res3Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(3));
- // Assert that the results are as expected.
- ASSERT_EQ(4, results.size());
- ASSERT_EQ(1, results[1]);
- ASSERT_EQ(2, results[8589869056]);
- ASSERT_EQ(3, results[10]);
- ASSERT_EQ(4, results[999]);
+ ASSERT_TRUE(stage->getNext() == PlanState::ADVANCED);
+ auto [res4ScanTag, res4ScanVal] = resultAccessors[0]->getViewOfValue();
+ auto [res4Tag, res4Val] = resultAccessors[1]->getViewOfValue();
+ // There are '4' occurrences of '999' in the input.
+ ASSERT_TRUE(res4ScanTag == value::TypeTags::RecordId);
+ ASSERT_TRUE(sbe::value::getRecordIdView(res4ScanVal)->getLong() == 999);
+ assertValuesEqual(
+ res4Tag, res4Val, value::TypeTags::NumberInt64, value::bitcastFrom<int64_t>(4));
+ ASSERT_TRUE(stage->getNext() == PlanState::IS_EOF);
stage->close();
}
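Every test in this file repeats the same save/override/restore pattern around the query knobs. A self-contained sketch of that idiom follows; ScopeGuard and Knob are stand-ins for mongo's ON_BLOCK_EXIT and the atomic server parameter, not the real types.

#include <iostream>
#include <utility>

// Runs a callback at scope exit -- a stand-in for mongo's ON_BLOCK_EXIT.
template <typename F>
class ScopeGuard {
public:
    explicit ScopeGuard(F f) : _f(std::move(f)) {}
    ~ScopeGuard() { _f(); }

private:
    F _f;
};

// Stand-in for an atomic server knob such as
// internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.
struct Knob {
    long long value;
    long long load() const { return value; }
    void store(long long v) { value = v; }
};

Knob approxMemoryUseBeforeSpill{100 * 1024 * 1024};

int main() {
    // Save the default, override it for the test body, restore on scope exit.
    auto saved = approxMemoryUseBeforeSpill.load();
    approxMemoryUseBeforeSpill.store(4 * 8);
    ScopeGuard restore{[&] { approxMemoryUseBeforeSpill.store(saved); }};

    std::cout << "limit during test: " << approxMemoryUseBeforeSpill.load() << "\n";
    return 0;  // guard restores the default here
}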
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.cpp b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
index 95568e8edf4..663918ca888 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.cpp
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.cpp
@@ -242,21 +242,6 @@ void HashAggStage::makeTemporaryRecordStore() {
assertIgnorePrepareConflictsBehavior(_opCtx);
_recordStore = _opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(
_opCtx, KeyFormat::String);
-
- _specificStats.usedDisk = true;
-}
-
-void HashAggStage::spillRowToDisk(const value::MaterializedRow& key,
- const value::MaterializedRow& val) {
- KeyString::Builder kb{KeyString::Version::kLatestVersion};
- key.serializeIntoKeyString(kb);
- auto typeBits = kb.getTypeBits();
-
- auto rid = RecordId(kb.getBuffer(), kb.getSize());
- auto valFromRs = getFromRecordStore(rid);
- tassert(6031100, "Spilling a row doesn't support updating it in the store.", !valFromRs);
-
- spillValueToDisk(rid, val, typeBits, false /*update*/);
}
void HashAggStage::spillValueToDisk(const RecordId& key,
@@ -264,6 +249,7 @@ void HashAggStage::spillValueToDisk(const RecordId& key,
const KeyString::TypeBits& typeBits,
bool update) {
BufBuilder bufValue;
+
val.serializeForSorter(bufValue);
// Append the 'typeBits' to the end of the val's buffer so the 'key' can be reconstructed when
@@ -277,24 +263,21 @@ void HashAggStage::spillValueToDisk(const RecordId& key,
// touch.
Lock::GlobalLock lk(_opCtx, MODE_IX);
WriteUnitOfWork wuow(_opCtx);
-
- auto result = mongo::Status::OK();
- if (update) {
- result = _recordStore->rs()->updateRecord(_opCtx, key, bufValue.buf(), bufValue.len());
- } else {
- auto status = _recordStore->rs()->insertRecord(
- _opCtx, key, bufValue.buf(), bufValue.len(), Timestamp{});
- result = status.getStatus();
- }
+ auto result = [&]() {
+ if (update) {
+ auto status =
+ _recordStore->rs()->updateRecord(_opCtx, key, bufValue.buf(), bufValue.len());
+ return status;
+ } else {
+ auto status = _recordStore->rs()->insertRecord(
+ _opCtx, key, bufValue.buf(), bufValue.len(), Timestamp{});
+ return status.getStatus();
+ }
+ }();
wuow.commit();
tassert(5843600,
str::stream() << "Failed to write to disk because " << result.reason(),
result.isOK());
-
- if (!update) {
- _specificStats.spilledRecords++;
- }
- _specificStats.lastSpilledRecordSize = bufValue.len();
}
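The record layout spillValueToDisk relies on is worth spelling out: the serialized value is written first and the key's type bits are appended at the tail, so the key can be rebuilt when the record is read back. A simplified sketch of that layout is below; std::vector stands in for BufBuilder, and the reader is assumed to know the type-bits length, which the real TypeBits format encodes itself.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main() {
    const std::string value = "serialized-row";  // pretend MaterializedRow bytes
    const std::string typeBits = "TB";           // pretend KeyString type bits

    // Write side: value first, type bits appended at the end.
    std::vector<char> record(value.begin(), value.end());
    record.insert(record.end(), typeBits.begin(), typeBits.end());

    // Read side: peel the type bits off the tail; the rest is the value.
    const std::size_t tbLen = typeBits.size();
    const std::string readTypeBits(record.end() - tbLen, record.end());
    const std::string readValue(record.begin(), record.end() - tbLen);
    std::cout << "value='" << readValue << "' typeBits='" << readTypeBits << "'\n";
}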
boost::optional<value::MaterializedRow> HashAggStage::getFromRecordStore(const RecordId& rid) {
@@ -308,73 +291,6 @@ boost::optional<value::MaterializedRow> HashAggStage::getFromRecordStore(const R
}
}
-// Checks memory usage. Ideally, we'd want to know the exact size of already accumulated data, but
-// we cannot, so we estimate it based on the last updated/inserted row, if we have one, or the first
-// row in the '_ht' table. If the estimated memory usage exceeds the allowed, this method initiates
- // spilling (if it hasn't been done yet) and evicts some records from the '_ht' table into the temp
-// store to keep the memory usage under the limit.
-void HashAggStage::checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd) {
- // The '_ht' table might become empty in the degenerate case when all rows had to be evicted to
- // meet the memory constraint during a previous check -- we don't need to keep checking memory
- // usage in this case because the table will never get new rows.
- if (_ht->empty()) {
- return;
- }
-
- mcd.memoryCheckpointCounter++;
- if (mcd.memoryCheckpointCounter >= mcd.nextMemoryCheckpoint) {
- if (_htIt == _ht->end()) {
- _htIt = _ht->begin();
- }
- const long estimatedRowSize =
- _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter();
- long long estimatedTotalSize = _ht->size() * estimatedRowSize;
- const double estimatedGainPerChildAdvance =
- (static_cast<double>(estimatedTotalSize - mcd.lastEstimatedMemoryUsage) /
- mcd.memoryCheckpointCounter);
-
- if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) {
- uassert(ErrorCodes::QueryExceededMemoryLimitNoDiskUseAllowed,
- "Exceeded memory limit for $group, but didn't allow external spilling."
- " Pass allowDiskUse:true to opt in.",
- _allowDiskUse);
- if (!_recordStore) {
- makeTemporaryRecordStore();
- }
-
- // Evict enough rows into the temporary store to drop below the memory constraint.
- const long rowsToEvictCount =
- 1 + (estimatedTotalSize - _approxMemoryUseInBytesBeforeSpill) / estimatedRowSize;
- for (long i = 0; !_ht->empty() && i < rowsToEvictCount; i++) {
- spillRowToDisk(_htIt->first, _htIt->second);
- _ht->erase(_htIt);
- _htIt = _ht->begin();
- }
- estimatedTotalSize = _ht->size() * estimatedRowSize;
- }
-
- // Calculate the next memory checkpoint. We estimate it based on the prior growth of the
- // '_ht' and the remaining available memory. We have to keep doing this even after starting
- // to spill because some accumulators can grow in size inside '_ht' (with no bounds).
- // Value of 'estimatedGainPerChildAdvance' can be negative if the previous checkpoint
- // evicted any records. And a value close to zero indicates a stable size of '_ht' so can
- // delay the next check progressively.
- const long nextCheckpointCandidate = (estimatedGainPerChildAdvance > 0.1)
- ? mcd.checkpointMargin * (_approxMemoryUseInBytesBeforeSpill - estimatedTotalSize) /
- estimatedGainPerChildAdvance
- : (estimatedGainPerChildAdvance < -0.1) ? mcd.atMostCheckFrequency
- : mcd.nextMemoryCheckpoint * 2;
- mcd.nextMemoryCheckpoint =
- std::min<long>(mcd.memoryCheckFrequency,
- std::max<long>(mcd.atMostCheckFrequency, nextCheckpointCandidate));
-
- mcd.lastEstimatedMemoryUsage = estimatedTotalSize;
- mcd.memoryCheckpointCounter = 0;
- mcd.memoryCheckFrequency =
- std::min<long>(mcd.memoryCheckFrequency * 2, mcd.atLeastMemoryCheckFrequency);
- }
-}
-
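The scheduling removed above adapts the check interval to the table's recent growth. A toy model of the same formula follows, with illustrative constants and fabricated size observations; it is not the stage's code.

#include <algorithm>
#include <iostream>

int main() {
    const double checkpointMargin = 0.7;       // stand-in for the margin knob
    const long atMost = 2;                     // check at most every 2 advances
    const long atLeast = 1024;                 // check at least every 1024 advances
    const long long memoryLimit = 100 * 1024;  // assumed spill threshold

    long long lastEstimate = 0;
    long nextCheckpoint = atMost;
    long frequencyCap = atMost;  // starts at 'atMost', backs off toward 'atLeast'

    // Pretend each checkpoint observed these estimated table sizes (bytes).
    for (long long estimate : {4096LL, 16384LL, 40960LL, 81920LL}) {
        const long counter = nextCheckpoint;  // advances since the last check
        const double gainPerAdvance =
            static_cast<double>(estimate - lastEstimate) / counter;

        const long candidate = (gainPerAdvance > 0.1)
            ? static_cast<long>(checkpointMargin * (memoryLimit - estimate) /
                                gainPerAdvance)
            : (gainPerAdvance < -0.1) ? atMost
                                      : nextCheckpoint * 2;

        nextCheckpoint = std::min<long>(frequencyCap, std::max<long>(atMost, candidate));
        frequencyCap = std::min<long>(frequencyCap * 2, atLeast);
        lastEstimate = estimate;
        std::cout << "size " << estimate << "B -> next check in " << nextCheckpoint
                  << " advances\n";
    }
}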
void HashAggStage::open(bool reOpen) {
auto optTimer(getOptTimer(_opCtx));
@@ -397,11 +313,12 @@ void HashAggStage::open(bool reOpen) {
_seekKeys.resize(_seekKeysAccessors.size());
+ // A counter to check memory usage periodically.
+ auto memoryUseCheckCounter = 0;
+
// A default value for spilling a key to the record store.
value::MaterializedRow defaultVal{_outAggAccessors.size()};
bool updateAggStateHt = false;
- MemoryCheckData memoryCheckData;
-
while (_children[0]->getNext() == PlanState::ADVANCED) {
value::MaterializedRow key{_inKeyAccessors.size()};
// Copy keys in order to do the lookup.
@@ -427,7 +344,8 @@ void HashAggStage::open(bool reOpen) {
} else {
// The memory limit has been reached, accumulate state in '_ht' only if we
// find the key in '_ht'.
- _htIt = _ht->find(key);
+ auto it = _ht->find(key);
+ _htIt = it;
updateAggStateHt = _htIt != _ht->end();
}
@@ -443,8 +361,9 @@ void HashAggStage::open(bool reOpen) {
// The memory limit has been reached and the key wasn't in the '_ht' so we need
// to spill it to the '_recordStore'.
KeyString::Builder kb{KeyString::Version::kLatestVersion};
- // 'key' is moved only when 'updateAggStateHt' ends up "true", so it's safe to
- // ignore the warning.
+
+ // It's safe to ignore the use-after-move warning since it's logically impossible to
+ // enter this block after the move occurs.
key.serializeIntoKeyString(kb); // NOLINT(bugprone-use-after-move)
auto typeBits = kb.getTypeBits();
@@ -465,8 +384,26 @@ void HashAggStage::open(bool reOpen) {
spillValueToDisk(rid, _aggValueRecordStore, typeBits, valFromRs ? true : false);
}
- // Estimates how much memory is being used and might start spilling.
- checkMemoryUsageAndSpillIfNecessary(memoryCheckData);
+ // Track memory usage only when we haven't started spilling to the '_recordStore'.
+ if (!_recordStore) {
+ auto shouldCalculateEstimatedSize =
+ _pseudoRandom.nextCanonicalDouble() < _memoryUseSampleRate;
+ if (shouldCalculateEstimatedSize || ++memoryUseCheckCounter % 100 == 0) {
+ memoryUseCheckCounter = 0;
+ long estimatedSizeForOneRow =
+ _htIt->first.memUsageForSorter() + _htIt->second.memUsageForSorter();
+ long long estimatedTotalSize = _ht->size() * estimatedSizeForOneRow;
+
+ if (estimatedTotalSize >= _approxMemoryUseInBytesBeforeSpill) {
+ uassert(
+ 5843601,
+ "Exceeded memory limit for $group, but didn't allow external spilling."
+ " Pass allowDiskUse:true to opt in.",
+ _allowDiskUse);
+ makeTemporaryRecordStore();
+ }
+ }
+ }
if (_tracker && _tracker->trackProgress<TrialRunTracker::kNumResults>(1)) {
// During trial runs, we want to limit the amount of work done by opening a blocking
@@ -572,7 +509,6 @@ PlanState HashAggStage::getNext() {
std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) const {
auto ret = std::make_unique<PlanStageStats>(_commonStats);
- ret->specific = std::make_unique<HashAggStats>(_specificStats);
if (includeDebugInfo) {
DebugPrinter printer;
@@ -584,12 +520,6 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co
childrenBob.append(str::stream() << slot, printer.print(expr->debugPrint()));
}
}
- // Spilling stats.
- bob.appendBool("usedDisk", _specificStats.usedDisk);
- bob.appendNumber("spilledRecords", _specificStats.spilledRecords);
- bob.appendNumber("spilledBytesApprox",
- _specificStats.lastSpilledRecordSize * _specificStats.spilledRecords);
-
ret->debugInfo = bob.obj();
}
@@ -598,7 +528,7 @@ std::unique_ptr<PlanStageStats> HashAggStage::getStats(bool includeDebugInfo) co
}
const SpecificStats* HashAggStage::getSpecificStats() const {
- return &_specificStats;
+ return nullptr;
}
void HashAggStage::close() {
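The code this commit restores samples the memory check instead: with probability '_memoryUseSampleRate', or at least every 100th advance, it estimates the table's size from one row and switches to the record store once the limit is hit. A simplified, self-contained model of that check is below; all names and types are illustrative, not the stage's.

#include <cstdint>
#include <iostream>
#include <random>
#include <unordered_map>

int main() {
    std::unordered_map<std::int64_t, std::int64_t> ht;  // stand-in for '_ht'
    std::mt19937_64 rng{42};
    std::uniform_real_distribution<double> canonical{0.0, 1.0};

    const double sampleRate = 1.0;             // knob: check on every advance
    const long long memoryLimitBytes = 4 * 8;  // knob: spill threshold
    int checkCounter = 0;
    bool spilling = false;

    for (std::int64_t key : {5, 6, 7, 5, 6, 7, 6, 7, 7}) {
        ++ht[key];
        if (spilling) {
            continue;  // once spilling starts, the check is skipped
        }
        if (canonical(rng) < sampleRate || ++checkCounter % 100 == 0) {
            checkCounter = 0;
            // Estimate one row's footprint; here simply key + value widths.
            const long long rowBytes = sizeof(std::int64_t) * 2;
            const long long estimated =
                static_cast<long long>(ht.size()) * rowBytes;
            if (estimated >= memoryLimitBytes) {
                spilling = true;  // the real stage makes a temp record store
                std::cout << "spill triggered at ~" << estimated << " bytes\n";
            }
        }
    }
}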
diff --git a/src/mongo/db/exec/sbe/stages/hash_agg.h b/src/mongo/db/exec/sbe/stages/hash_agg.h
index a50b4d65c81..afb8bea06f3 100644
--- a/src/mongo/db/exec/sbe/stages/hash_agg.h
+++ b/src/mongo/db/exec/sbe/stages/hash_agg.h
@@ -130,39 +130,7 @@ private:
const value::MaterializedRow& val,
const KeyString::TypeBits& typeBits,
bool update);
- void spillRowToDisk(const value::MaterializedRow& key,
- const value::MaterializedRow& defaultVal);
- /**
- * We check amount of used memory every T processed incoming records, where T is calculated
- * based on the estimated used memory and its recent growth. When the memory limit is exceeded,
- * 'checkMemoryUsageAndSpillIfNecessary()' will create '_recordStore' and might spill some of
- * the already accumulated data into it.
- */
- struct MemoryCheckData {
- const double checkpointMargin = internalQuerySBEAggMemoryUseCheckMargin.load();
- const long atMostCheckFrequency = internalQuerySBEAggMemoryCheckPerAdvanceAtMost.load();
- const long atLeastMemoryCheckFrequency =
- internalQuerySBEAggMemoryCheckPerAdvanceAtLeast.load();
-
- // The check frequency upper bound, which starts at 'atMost' and exponentially backs off
- // to 'atLeast' as more data is accumulated. If 'atLeast' is less than 'atMost', the memory
- // checks will be done every 'atLeast' incoming records.
- long memoryCheckFrequency = 1;
-
- // The number of incoming records to process before the next memory checkpoint.
- long nextMemoryCheckpoint = 0;
-
- // The counter of the incoming records between memory checkpoints.
- long memoryCheckpointCounter = 0;
-
- long long lastEstimatedMemoryUsage = 0;
-
- MemoryCheckData() {
- memoryCheckFrequency = std::min(atMostCheckFrequency, atLeastMemoryCheckFrequency);
- }
- };
- void checkMemoryUsageAndSpillIfNecessary(MemoryCheckData& mcd);
const value::SlotVector _gbs;
const value::SlotMap<std::unique_ptr<EExpression>> _aggs;
@@ -172,6 +140,14 @@ private:
// When this operator does not expect to be reopened (almost always) then it can close the child
// early.
const bool _optimizedClose{true};
+ // Memory tracking variables.
+ const long long _approxMemoryUseInBytesBeforeSpill =
+ internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
+ const double _memoryUseSampleRate = internalQuerySBEAggMemoryUseSampleRate.load();
+ // Used in collaboration with '_memoryUseSampleRate' to determine whether we should
+ // re-approximate memory usage.
+ PseudoRandom _pseudoRandom = PseudoRandom(Date_t::now().asInt64());
+
value::SlotAccessorMap _outAccessors;
std::vector<value::SlotAccessor*> _inKeyAccessors;
@@ -206,15 +182,11 @@ private:
bool _compiled{false};
bool _childOpened{false};
- // Memory tracking and spilling to disk.
- const long long _approxMemoryUseInBytesBeforeSpill =
- internalQuerySBEAggApproxMemoryUseInBytesBeforeSpill.load();
+ // Used when spilling to disk.
std::unique_ptr<TemporaryRecordStore> _recordStore;
bool _drainingRecordStore{false};
std::unique_ptr<SeekableRecordCursor> _rsCursor;
- HashAggStats _specificStats;
-
// If provided, used during a trial run to accumulate certain execution stats. Once the trial
// run is complete, this pointer is reset to nullptr.
TrialRunTracker* _tracker{nullptr};
diff --git a/src/mongo/db/exec/sbe/stages/plan_stats.h b/src/mongo/db/exec/sbe/stages/plan_stats.h
index 46d52726b6c..9e77c76ad5f 100644
--- a/src/mongo/db/exec/sbe/stages/plan_stats.h
+++ b/src/mongo/db/exec/sbe/stages/plan_stats.h
@@ -260,28 +260,6 @@ struct TraverseStats : public SpecificStats {
size_t innerCloses{0};
};
-struct HashAggStats : public SpecificStats {
- std::unique_ptr<SpecificStats> clone() const final {
- return std::make_unique<HashAggStats>(*this);
- }
-
- uint64_t estimateObjectSizeInBytes() const final {
- return sizeof(*this);
- }
-
- void acceptVisitor(PlanStatsConstVisitor* visitor) const final {
- visitor->visit(this);
- }
-
- void acceptVisitor(PlanStatsMutableVisitor* visitor) final {
- visitor->visit(this);
- }
-
- bool usedDisk{false};
- long long spilledRecords{0};
- long long lastSpilledRecordSize{0};
-};
-
/**
* Visitor for calculating the number of storage reads during plan execution.
*/
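The acceptVisitor overrides deleted above are one half of a classic double dispatch between stats objects and stats visitors. A minimal sketch of the pattern follows; the class names are illustrative stand-ins, not mongo's.

#include <iostream>
#include <memory>

struct HashAggLikeStats;

struct StatsVisitor {
    virtual ~StatsVisitor() = default;
    virtual void visit(const HashAggLikeStats* stats) = 0;
};

struct Stats {
    virtual ~Stats() = default;
    // Each concrete stats type dispatches to its matching visit() overload.
    virtual void acceptVisitor(StatsVisitor* visitor) const = 0;
};

struct HashAggLikeStats : Stats {
    bool usedDisk{false};
    long long spilledRecords{0};
    void acceptVisitor(StatsVisitor* visitor) const override {
        visitor->visit(this);
    }
};

struct Printer : StatsVisitor {
    void visit(const HashAggLikeStats* stats) override {
        std::cout << "usedDisk=" << stats->usedDisk
                  << " spilledRecords=" << stats->spilledRecords << "\n";
    }
};

int main() {
    std::unique_ptr<Stats> stats = std::make_unique<HashAggLikeStats>();
    Printer printer;
    stats->acceptVisitor(&printer);  // resolves to visit(const HashAggLikeStats*)
}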