/**
 * Tests that $group stage reports memory footprint per accumulator when explain is run with
 * verbosities "executionStats" and "allPlansExecution".
 */
(function() {
"use strict";
load("jstests/libs/analyze_plan.js"); // For getAggPlanStage().
load("jstests/libs/sbe_util.js"); // For checkSBEEnabled.
// Spin up a dedicated mongod so the memory-limit server parameter set below cannot leak
// into other tests; the process is shut down at the end instead of restoring the parameter.
const conn = MongoRunner.runMongod();
const testDB = conn.getDB('test');
const coll = testDB.explain_group_stage_exec_stats;
coll.drop();
if (checkSBEEnabled(testDB, ["featureFlagSBEGroupPushdown"])) {
// When the SBE $group pushdown feature is enabled, a $group alone is pushed down and the
// memory usage tracking isn't on a per-accumulator basis so this test is exercising
// spilling behavior of the classic DocumentSourceGroup stage.
jsTest.log("Skipping test since SBE $group pushdown has different memory tracking behavior");
MongoRunner.stopMongod(conn);
return;
}
// Array(1025).toString() produces 1024 commas, i.e. a ~1KB string stored in every document.
const bigStr = Array(1025).toString(); // 1KB of ','
// Deliberately low limit (300KB) applied later via setParameter to force $group to spill.
const maxMemoryLimitForGroupStage = 1024 * 300;
// Debug builds spill on every duplicate group key, which breaks the exact memory/spill
// expectations below, so several checks are relaxed when this is true.
const debugBuild = testDB.adminCommand('buildInfo').debug;
const nDocs = 1000;
const nGroups = 50;
const bulk = coll.initializeUnorderedBulkOp();
for (let i = 1; i <= nDocs; i++) {
bulk.insert({_id: i, a: i, b: i % nGroups, bigStr: bigStr});
}
assert.commandWorked(bulk.execute());
const pipeline = [
{$match: {a: {$gt: 0}}},
{$sort: {b: 1}},
{$group: {_id: "$b", count: {$sum: 1}, push: {$push: "$bigStr"}, set: {$addToSet: "$bigStr"}}},
];
// Rough lower bounds on the peak memory each accumulator should report:
// - count: small fixed-size counter per group (~60 bytes per group is an estimate).
// - push: $push retains every document's ~1KB bigStr across all groups.
// - set: bigStr is identical within a group, so $addToSet keeps one ~1KB copy per group.
const expectedAccumMemUsages = {
count: nGroups * 60,
push: nDocs * 1024,
set: nGroups * 1024,
};
const expectedTotalMemoryUsage =
Object.values(expectedAccumMemUsages).reduce((acc, val) => acc + val, 0);
// A spill is expected each time accumulated memory reaches the limit, so the expected spill
// count is roughly total accumulated memory divided by the configured limit.
const expectedSpillCount = Math.ceil(expectedTotalMemoryUsage / maxMemoryLimitForGroupStage);
/**
 * Checks that the execution stats in the explain output for a $group stage are as expected.
 * - 'stage' is an explain output of $group stage.
 * - 'expectedAccumMemUsages' is used to check the memory footprint stats for each accumulator.
 * - 'isExecExplain' indicates that the explain output is run with verbosity "executionStats" or
 * "allPlansExecution".
 * - 'expectedSpills' indicates how many times the data was spilled to disk when executing $group
 * stage.
 */
function checkGroupStages(stage, expectedAccumMemUsages, isExecExplain, expectedSpills) {
// Tracks the memory usage per accumulator in total as 'stages' passed in could be the explain
// output across a cluster.
let totalAccumMemoryUsageBytes = 0;
assert(stage.hasOwnProperty("$group"), stage);
if (isExecExplain) {
// Execution-level verbosities must include the per-accumulator memory stats.
assert(stage.hasOwnProperty("maxAccumulatorMemoryUsageBytes"), stage);
const maxAccmMemUsages = stage["maxAccumulatorMemoryUsageBytes"];
for (const field of Object.keys(maxAccmMemUsages)) {
totalAccumMemoryUsageBytes += maxAccmMemUsages[field];
// Ensures that the expected accumulators are all included and the corresponding
// memory usage is in a reasonable range. Note that in debug mode, data will be
// spilled to disk every time we add a new value to a pre-existing group.
if (!debugBuild && expectedAccumMemUsages.hasOwnProperty(field)) {
assert.gt(maxAccmMemUsages[field], expectedAccumMemUsages[field]);
assert.lt(maxAccmMemUsages[field], 5 * expectedAccumMemUsages[field]);
}
}
// Don't verify spill count for debug builds, since for debug builds a spill occurs on every
// duplicate id in a group.
if (!debugBuild) {
// 'usedDisk' must be set iff at least one spill is expected. The actual spill count is
// allowed to land in [expectedSpills, 2 * expectedSpills] since the exact count depends
// on when the memory check fires relative to document processing.
assert.eq(stage.usedDisk, expectedSpills > 0, stage);
assert.gte(stage.spills, expectedSpills, stage);
assert.lte(stage.spills, 2 * expectedSpills, stage);
}
} else {
// "queryPlanner" verbosity does not execute the query, so no runtime stats should be
// present in the explain output.
assert(!stage.hasOwnProperty("usedDisk"), stage);
assert(!stage.hasOwnProperty("spills"), stage);
assert(!stage.hasOwnProperty("maxAccumulatorMemoryUsageBytes"), stage);
}
// Add some wiggle room to the total memory used compared to the limit parameter since the check
// for spilling to disk happens after each document is processed.
if (expectedSpills > 0)
assert.gt(maxMemoryLimitForGroupStage + 4 * 1024, totalAccumMemoryUsageBytes, stage);
}
// With the default (high) memory limit nothing should spill; both execution-level verbosities
// must report per-accumulator memory usage matching the expectations computed above.
let groupStages = getAggPlanStage(coll.explain("executionStats").aggregate(pipeline), "$group");
checkGroupStages(groupStages, expectedAccumMemUsages, true, 0);
groupStages = getAggPlanStage(coll.explain("allPlansExecution").aggregate(pipeline), "$group");
checkGroupStages(groupStages, expectedAccumMemUsages, true, 0);
// "queryPlanner" does not execute the pipeline, so no memory stats are expected at all.
groupStages = getAggPlanStage(coll.explain("queryPlanner").aggregate(pipeline), "$group");
checkGroupStages(groupStages, {}, false, 0);
// Set MaxMemory low to force spill to disk.
assert.commandWorked(testDB.adminCommand(
{setParameter: 1, ["internalDocumentSourceGroupMaxMemoryBytes"]: maxMemoryLimitForGroupStage}));
// An empty expected-usage map is passed for the spilling runs, which skips the per-accumulator
// range checks — presumably because spilling keeps the in-memory footprint below the limit,
// so the no-spill expectations above no longer apply.
groupStages = getAggPlanStage(
coll.explain("executionStats").aggregate(pipeline, {"allowDiskUse": true}), "$group");
checkGroupStages(groupStages, {}, true, expectedSpillCount);
groupStages = getAggPlanStage(
coll.explain("allPlansExecution").aggregate(pipeline, {"allowDiskUse": true}), "$group");
checkGroupStages(groupStages, {}, true, expectedSpillCount);
groupStages = getAggPlanStage(
coll.explain("queryPlanner").aggregate(pipeline, {"allowDiskUse": true}), "$group");
checkGroupStages(groupStages, {}, false, 0);
// Shutting down the dedicated mongod also discards the modified server parameter.
MongoRunner.stopMongod(conn);
}());