author     David Storch <david.storch@mongodb.com>  2023-01-30 22:54:10 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-01-31 01:27:54 +0000
commit     355aadd8ff9d5599da2983990f87c7ec300a972d (patch)
tree       b75d91aea6c48c0e47edcff1c3a9ededde8c3354 /jstests/aggregation
parent     40c93f028e36f78c06756f4bfd358d240bdd9b34 (diff)
SERVER-70395 Change spilling for SBE HashAggStage to use a more efficient algorithm
The new algorithm spills the entire hash table to a RecordStore whenever the memory budget is exceeded. Once all the input is consumed, it switches to a streaming approach, merging the partial aggregates recovered from disk.
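Sketched below is a minimal JavaScript model of that strategy, for illustration only: the function and callback names are invented (they are not SBE or server APIs), and an in-memory array of key-sorted runs stands in for the temporary RecordStore.

function hashAggWithSpilling({input, keyOf, addToPartial, mergePartials, maxGroupsInMemory}) {
    const runs = [];  // each element models one spilled, key-sorted run
    let table = new Map();

    // Spill the *entire* hash table as one run, sorted by key, so the merge
    // phase can consume each group's partial aggregates contiguously.
    const spillRun = () => {
        runs.push([...table.entries()].sort((x, y) => x[0] - y[0]));
        table = new Map();
    };

    // Phase 1: consume all input, spilling whenever the budget is exceeded.
    for (const doc of input) {
        const key = keyOf(doc);
        table.set(key, addToPartial(table.get(key), doc));
        if (table.size > maxGroupsInMemory) {
            spillRun();
        }
    }
    if (table.size > 0) {
        spillRun();
    }

    // Phase 2: streaming k-way merge of the sorted runs. All partials for the
    // smallest outstanding key are merged and emitted before moving on, so only
    // one group's state is held in memory at a time.
    const pos = runs.map(() => 0);
    const results = [];
    while (true) {
        let minKey;
        runs.forEach((run, i) => {
            if (pos[i] < run.length && (minKey === undefined || run[pos[i]][0] < minKey)) {
                minKey = run[pos[i]][0];
            }
        });
        if (minKey === undefined) {
            break;  // every run is exhausted
        }
        let partial;
        runs.forEach((run, i) => {
            if (pos[i] < run.length && run[pos[i]][0] === minKey) {
                const p = run[pos[i]][1];
                partial = (partial === undefined) ? p : mergePartials(partial, p);
                pos[i]++;
            }
        });
        results.push({_id: minKey, value: partial});
    }
    return results;
}

For example, a $sum-like aggregation over documents shaped like the ones this test inserts, with an artificially tiny budget to force several spills:

const docs = [];
for (let i = 0; i < 5; i++) {
    for (let j = 0; j < 10; j++) {
        docs.push({a: i, b: 100 * i + j});
    }
}
const out = hashAggWithSpilling({
    input: docs,
    keyOf: (doc) => doc.a,
    addToPartial: (partial, doc) => (partial || 0) + doc.b,
    mergePartials: (x, y) => x + y,
    maxGroupsInMemory: 2,
});
// out: [{_id: 0, value: 45}, {_id: 1, value: 1045}, ..., {_id: 4, value: 4045}]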
Diffstat (limited to 'jstests/aggregation')
-rw-r--r--  jstests/aggregation/spill_to_disk.js  223
1 file changed, 204 insertions, 19 deletions
diff --git a/jstests/aggregation/spill_to_disk.js b/jstests/aggregation/spill_to_disk.js
index 290df334e60..568d973e988 100644
--- a/jstests/aggregation/spill_to_disk.js
+++ b/jstests/aggregation/spill_to_disk.js
@@ -24,11 +24,37 @@ load("jstests/libs/sbe_util.js"); // For checkSBEEnabled.
const coll = db.spill_to_disk;
coll.drop();
+// Sets the set parameter named 'paramName' to the given 'memoryLimit' on each primary node in the
+// cluster, and returns the old value.
+function setMemoryParamHelper(paramName, memoryLimit) {
+ const commandResArr = FixtureHelpers.runCommandOnEachPrimary({
+ db: db.getSiblingDB("admin"),
+ cmdObj: {
+ setParameter: 1,
+ [paramName]: memoryLimit,
+ }
+ });
+ assert.gt(commandResArr.length, 0, "Setting memory limit on primaries failed");
+ const oldMemoryLimit = assert.commandWorked(commandResArr[0]).was;
+ return oldMemoryLimit;
+}
+
+// Verifies that the given 'groupStats' (an extract from SBE "executionStats" explain output) show
+// evidence of spilling to disk.
+function assertSpillingOccurredInSbeExplain(groupStats) {
+ assert(groupStats);
+ assert(groupStats.hasOwnProperty("usedDisk"), groupStats);
+ assert(groupStats.usedDisk, groupStats);
+ assert.gt(groupStats.numSpills, 0, groupStats);
+ assert.gt(groupStats.spilledRecords, 0, groupStats);
+ assert.gt(groupStats.spilledDataStorageSize, 0, groupStats);
+}
+
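For reference, a stats object that satisfies the helper above might look like the following; the field names are exactly those asserted on, while the values are invented for illustration:

assertSpillingOccurredInSbeExplain({
    usedDisk: true,                // must be present and true
    numSpills: 3,                  // asserted to be > 0
    spilledRecords: 15,            // asserted to be > 0
    spilledDataStorageSize: 4096,  // asserted to be > 0
});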
const sharded = FixtureHelpers.isSharded(coll);
const memoryLimitMB = sharded ? 200 : 100;
-const isSBELookupEnabled = checkSBEEnabled(db);
+const isSbeEnabled = checkSBEEnabled(db);
const bigStr = Array(1024 * 1024 + 1).toString(); // 1MB of ','
for (let i = 0; i < memoryLimitMB + 1; i++)
@@ -51,7 +77,7 @@ function test({pipeline, expectedCodes, canSpillToDisk}) {
assert.eq(new DBCommandCursor(coll.getDB(), res).itcount(),
coll.count()); // all tests output one doc per input doc
- if (isSBELookupEnabled) {
+ if (isSbeEnabled) {
const explain = db.runCommand({
explain:
{aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: true}
@@ -60,12 +86,7 @@ function test({pipeline, expectedCodes, canSpillToDisk}) {
if (hashAggGroups.length > 0) {
assert.eq(hashAggGroups.length, 1, explain);
const hashAggGroup = hashAggGroups[0];
- assert(hashAggGroup, explain);
- assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup);
- assert(hashAggGroup.usedDisk, hashAggGroup);
- assert.gt(hashAggGroup.spilledRecords, 0, hashAggGroup);
- assert.gt(hashAggGroup.spilledBytesApprox, 0, hashAggGroup);
- assert.gt(hashAggGroup.spilledRecordEstimatedStorageSize, 0, hashAggGroup);
+ assertSpillingOccurredInSbeExplain(hashAggGroup);
}
}
} else {
@@ -211,6 +232,178 @@ for (const op of ['$firstN', '$lastN', '$minN', '$maxN', '$topN', '$bottomN']) {
// don't leave large collection laying around
assert(coll.drop());
+// Test spilling to disk for various accumulators in a $group stage. The data has 5 groups of 10
+// documents each. We configure a low memory limit for SBE's hash aggregation stage in order to
+// encourage spilling.
+const numGroups = 5;
+const docsPerGroup = 10;
+let counter = 0;
+for (let i = 0; i < numGroups; ++i) {
+ for (let j = 0; j < docsPerGroup; ++j) {
+ const doc = {
+ _id: counter++,
+ a: i,
+ b: 100 * i + j,
+        c: 100 * i + (j % 5),
+ obj: {a: i, b: j},
+ random: Math.random()
+ };
+ assert.commandWorked(coll.insert(doc));
+ }
+}
+
+function setHashGroupMemoryParameters(memoryLimit) {
+ return setMemoryParamHelper(
+ "internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill", memoryLimit);
+}
+
+// Runs a group query containing the given 'accumulator' after sorting the data by the given
+// 'sortInputBy' field. Then verifies that the query results are equal to 'expectedOutput'. If SBE
+// is enabled, also runs explain and checks that the execution stats show that spilling occurred.
+function testAccumulator({accumulator, sortInputBy, expectedOutput, ignoreArrayOrder = false}) {
+ const pipeline =
+ [{$sort: {[sortInputBy]: 1}}, {$group: {_id: "$a", acc: accumulator}}, {$sort: {_id: 1}}];
+ const results = coll.aggregate(pipeline).toArray();
+
+ if (ignoreArrayOrder) {
+ assert(arrayEq(results, expectedOutput));
+ } else {
+ assert.eq(results, expectedOutput);
+ }
+
+ if (isSbeEnabled) {
+ const explain = coll.explain("executionStats").aggregate(pipeline);
+ const groupStages = getSbePlanStages(explain, "group");
+ assert.eq(groupStages.length, 1, groupStages);
+ assertSpillingOccurredInSbeExplain(groupStages[0]);
+ }
+}
+
+function testSpillingForVariousAccumulators() {
+ testAccumulator({
+ accumulator: {$first: "$b"},
+ sortInputBy: "_id",
+ expectedOutput: [
+ {_id: 0, acc: 0},
+ {_id: 1, acc: 100},
+ {_id: 2, acc: 200},
+ {_id: 3, acc: 300},
+ {_id: 4, acc: 400}
+ ]
+ });
+
+ testAccumulator({
+ accumulator: {$last: "$b"},
+ sortInputBy: "_id",
+ expectedOutput: [
+ {_id: 0, acc: 9},
+ {_id: 1, acc: 109},
+ {_id: 2, acc: 209},
+ {_id: 3, acc: 309},
+ {_id: 4, acc: 409}
+ ]
+ });
+
+ testAccumulator({
+ accumulator: {$min: "$b"},
+ sortInputBy: "random",
+ expectedOutput: [
+ {_id: 0, acc: 0},
+ {_id: 1, acc: 100},
+ {_id: 2, acc: 200},
+ {_id: 3, acc: 300},
+ {_id: 4, acc: 400}
+ ]
+ });
+
+ testAccumulator({
+ accumulator: {$max: "$b"},
+ sortInputBy: "random",
+ expectedOutput: [
+ {_id: 0, acc: 9},
+ {_id: 1, acc: 109},
+ {_id: 2, acc: 209},
+ {_id: 3, acc: 309},
+ {_id: 4, acc: 409}
+ ]
+ });
+
+ testAccumulator({
+ accumulator: {$sum: "$b"},
+ sortInputBy: "random",
+ expectedOutput: [
+ {_id: 0, acc: 45},
+ {_id: 1, acc: 1045},
+ {_id: 2, acc: 2045},
+ {_id: 3, acc: 3045},
+ {_id: 4, acc: 4045}
+ ]
+ });
+
+ testAccumulator({
+ accumulator: {$avg: "$b"},
+ sortInputBy: "random",
+ expectedOutput: [
+ {_id: 0, acc: 4.5},
+ {_id: 1, acc: 104.5},
+ {_id: 2, acc: 204.5},
+ {_id: 3, acc: 304.5},
+ {_id: 4, acc: 404.5}
+ ]
+ });
+
+ testAccumulator({
+ accumulator: {$addToSet: "$c"},
+ sortInputBy: "random",
+ expectedOutput: [
+ {_id: 0, acc: [0, 1, 2, 3, 4]},
+ {_id: 1, acc: [100, 101, 102, 103, 104]},
+ {_id: 2, acc: [200, 201, 202, 203, 204]},
+ {_id: 3, acc: [300, 301, 302, 303, 304]},
+ {_id: 4, acc: [400, 401, 402, 403, 404]},
+ ],
+ // Since the accumulator produces sets, the resulting arrays may be in any order.
+ ignoreArrayOrder: true,
+ });
+
+ testAccumulator({
+ accumulator: {$push: "$c"},
+ sortInputBy: "_id",
+ expectedOutput: [
+ {_id: 0, acc: [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]},
+ {_id: 1, acc: [100, 101, 102, 103, 104, 100, 101, 102, 103, 104]},
+ {_id: 2, acc: [200, 201, 202, 203, 204, 200, 201, 202, 203, 204]},
+ {_id: 3, acc: [300, 301, 302, 303, 304, 300, 301, 302, 303, 304]},
+ {_id: 4, acc: [400, 401, 402, 403, 404, 400, 401, 402, 403, 404]},
+ ],
+ });
+
+ testAccumulator({
+ accumulator: {$mergeObjects: "$obj"},
+ sortInputBy: "_id",
+ expectedOutput: [
+ {_id: 0, acc: {a: 0, b: 9}},
+ {_id: 1, acc: {a: 1, b: 9}},
+ {_id: 2, acc: {a: 2, b: 9}},
+ {_id: 3, acc: {a: 3, b: 9}},
+ {_id: 4, acc: {a: 4, b: 9}}
+ ],
+ });
+}
+
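An aside on the $avg case above: once partial aggregates must be merged (as the spill path requires), a running average is not enough state, but a {sum, count} partial merges correctly. A small illustration with invented helper names, not SBE internals:

const mergeAvgPartials = (x, y) => ({sum: x.sum + y.sum, count: x.count + y.count});
const finalizeAvg = (p) => p.sum / p.count;

// Two partials for the same group, e.g. recovered from two separate spills:
const run1 = {sum: 10, count: 5};  // partial average 2
const run2 = {sum: 7, count: 1};   // partial average 7

// The correct merged average is 17 / 6. Naively averaging the two partial
// averages would give (2 + 7) / 2 = 4.5, which is wrong.
assert.eq(finalizeAvg(mergeAvgPartials(run1, run2)), 17 / 6);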
+(function() {
+const kMemLimit = 100;
+let oldMemSettings = setHashGroupMemoryParameters(kMemLimit);
+try {
+ testSpillingForVariousAccumulators();
+} finally {
+ setHashGroupMemoryParameters(oldMemSettings);
+}
+})();
+
+assert(coll.drop());
+
// Test spill to disk for $lookup
const localColl = db.lookup_spill_local_hj;
const foreignColl = db.lookup_spill_foreign_hj;
@@ -223,16 +416,8 @@ function setupCollections(localRecords, foreignRecords, foreignField) {
}
function setHashLookupParameters(memoryLimit) {
- const commandResArr = FixtureHelpers.runCommandOnEachPrimary({
- db: db.getSiblingDB("admin"),
- cmdObj: {
- setParameter: 1,
- internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: memoryLimit,
- }
- });
- assert.gt(commandResArr.length, 0, "Setting memory limit on primaries failed.");
- const oldMemoryLimit = assert.commandWorked(commandResArr[0]).was;
- return oldMemoryLimit;
+ return setMemoryParamHelper(
+ "internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill", memoryLimit);
}
/**
@@ -260,7 +445,7 @@ function runTest_MultipleLocalForeignRecords({
const results = localColl.aggregate(pipeline, {allowDiskUse: true}).toArray();
const explain = localColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true});
// If sharding is enabled, '$lookup' is not pushed down to SBE.
- if (isSBELookupEnabled && !sharded) {
+ if (isSbeEnabled && !sharded) {
const hLookups = getSbePlanStages(explain, 'hash_lookup');
assert.eq(hLookups.length, 1, explain);
const hLookup = hLookups[0];