diff options
author | David Storch <david.storch@mongodb.com> | 2023-01-30 22:54:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-31 01:27:54 +0000 |
commit | 355aadd8ff9d5599da2983990f87c7ec300a972d (patch) | |
tree | b75d91aea6c48c0e47edcff1c3a9ededde8c3354 /jstests/aggregation | |
parent | 40c93f028e36f78c06756f4bfd358d240bdd9b34 (diff) | |
download | mongo-355aadd8ff9d5599da2983990f87c7ec300a972d.tar.gz |
SERVER-70395 Change spilling for SBE HashAggStage to use a more efficient algorithm
The new algorithm spills the entire hash table to a
RecordStore whenever the memory budget is exceeded. Once all
the input is consumed, it switches to a streaming approach,
merging the partial aggregates recovered from disk.
Diffstat (limited to 'jstests/aggregation')
-rw-r--r-- | jstests/aggregation/spill_to_disk.js | 223 |
1 file changed, 204 insertions, 19 deletions
diff --git a/jstests/aggregation/spill_to_disk.js b/jstests/aggregation/spill_to_disk.js index 290df334e60..568d973e988 100644 --- a/jstests/aggregation/spill_to_disk.js +++ b/jstests/aggregation/spill_to_disk.js @@ -24,11 +24,37 @@ load("jstests/libs/sbe_util.js"); // For checkSBEEnabled. const coll = db.spill_to_disk; coll.drop(); +// Sets the set parameter named 'paramName' to the given 'memoryLimit' on each primary node in the +// cluster, and returns the old value. +function setMemoryParamHelper(paramName, memoryLimit) { + const commandResArr = FixtureHelpers.runCommandOnEachPrimary({ + db: db.getSiblingDB("admin"), + cmdObj: { + setParameter: 1, + [paramName]: memoryLimit, + } + }); + assert.gt(commandResArr.length, 0, "Setting memory limit on primaries failed"); + const oldMemoryLimit = assert.commandWorked(commandResArr[0]).was; + return oldMemoryLimit; +} + +// Verifies that the given 'groupStats' (an extract from SBE "executionStats" explain output) show +// evidence of spilling to disk. +function assertSpillingOccurredInSbeExplain(groupStats) { + assert(groupStats); + assert(groupStats.hasOwnProperty("usedDisk"), groupStats); + assert(groupStats.usedDisk, groupStats); + assert.gt(groupStats.numSpills, 0, groupStats); + assert.gt(groupStats.spilledRecords, 0, groupStats); + assert.gt(groupStats.spilledDataStorageSize, 0, groupStats); +} + const sharded = FixtureHelpers.isSharded(coll); const memoryLimitMB = sharded ? 
200 : 100; -const isSBELookupEnabled = checkSBEEnabled(db); +const isSbeEnabled = checkSBEEnabled(db); const bigStr = Array(1024 * 1024 + 1).toString(); // 1MB of ',' for (let i = 0; i < memoryLimitMB + 1; i++) @@ -51,7 +77,7 @@ function test({pipeline, expectedCodes, canSpillToDisk}) { assert.eq(new DBCommandCursor(coll.getDB(), res).itcount(), coll.count()); // all tests output one doc per input doc - if (isSBELookupEnabled) { + if (isSbeEnabled) { const explain = db.runCommand({ explain: {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: true} @@ -60,12 +86,7 @@ function test({pipeline, expectedCodes, canSpillToDisk}) { if (hashAggGroups.length > 0) { assert.eq(hashAggGroups.length, 1, explain); const hashAggGroup = hashAggGroups[0]; - assert(hashAggGroup, explain); - assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup); - assert(hashAggGroup.usedDisk, hashAggGroup); - assert.gt(hashAggGroup.spilledRecords, 0, hashAggGroup); - assert.gt(hashAggGroup.spilledBytesApprox, 0, hashAggGroup); - assert.gt(hashAggGroup.spilledRecordEstimatedStorageSize, 0, hashAggGroup); + assertSpillingOccurredInSbeExplain(hashAggGroup); } } } else { @@ -211,6 +232,178 @@ for (const op of ['$firstN', '$lastN', '$minN', '$maxN', '$topN', '$bottomN']) { // don't leave large collection laying around assert(coll.drop()); +// Test spilling to disk for various accumulators in a $group stage . The data has 5 groups of 10 +// documents each. We configure a low memory limit for SBE's hash aggregation stage in order to +// encourage spilling. 
+const numGroups = 5; +const docsPerGroup = 10; +let counter = 0; +for (let i = 0; i < numGroups; ++i) { + for (let j = 0; j < docsPerGroup; ++j) { + const doc = { + _id: counter++, + a: i, + b: 100 * i + j, + c: 100 * i + j % 5, + obj: {a: i, b: j}, + random: Math.random() + }; + assert.commandWorked(coll.insert(doc)); + } +} + +function setHashGroupMemoryParameters(memoryLimit) { + return setMemoryParamHelper( + "internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill", memoryLimit); +} + +// Runs a group query containing the given 'accumulator' after sorting the data by the given +// 'sortInputBy' field. Then verifies that the query results are equal to 'expectedOutput'. If SBE +// is enabled, also runs explain and checks that the execution stats show that spilling occurred. +function testAccumulator({accumulator, sortInputBy, expectedOutput, ignoreArrayOrder = false}) { + const pipeline = + [{$sort: {[sortInputBy]: 1}}, {$group: {_id: "$a", acc: accumulator}}, {$sort: {_id: 1}}]; + const results = coll.aggregate(pipeline).toArray(); + + if (ignoreArrayOrder) { + assert(arrayEq(results, expectedOutput)); + } else { + assert.eq(results, expectedOutput); + } + + if (isSbeEnabled) { + const explain = coll.explain("executionStats").aggregate(pipeline); + const groupStages = getSbePlanStages(explain, "group"); + assert.eq(groupStages.length, 1, groupStages); + assertSpillingOccurredInSbeExplain(groupStages[0]); + } +} + +function testSpillingForVariousAccumulators() { + testAccumulator({ + accumulator: {$first: "$b"}, + sortInputBy: "_id", + expectedOutput: [ + {_id: 0, acc: 0}, + {_id: 1, acc: 100}, + {_id: 2, acc: 200}, + {_id: 3, acc: 300}, + {_id: 4, acc: 400} + ] + + }); + + testAccumulator({ + accumulator: {$last: "$b"}, + sortInputBy: "_id", + expectedOutput: [ + {_id: 0, acc: 9}, + {_id: 1, acc: 109}, + {_id: 2, acc: 209}, + {_id: 3, acc: 309}, + {_id: 4, acc: 409} + ] + }); + + testAccumulator({ + accumulator: {$min: "$b"}, + sortInputBy: 
"random", + expectedOutput: [ + {_id: 0, acc: 0}, + {_id: 1, acc: 100}, + {_id: 2, acc: 200}, + {_id: 3, acc: 300}, + {_id: 4, acc: 400} + ] + }); + + testAccumulator({ + accumulator: {$max: "$b"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: 9}, + {_id: 1, acc: 109}, + {_id: 2, acc: 209}, + {_id: 3, acc: 309}, + {_id: 4, acc: 409} + ] + }); + + testAccumulator({ + accumulator: {$sum: "$b"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: 45}, + {_id: 1, acc: 1045}, + {_id: 2, acc: 2045}, + {_id: 3, acc: 3045}, + {_id: 4, acc: 4045} + ] + }); + + testAccumulator({ + accumulator: {$avg: "$b"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: 4.5}, + {_id: 1, acc: 104.5}, + {_id: 2, acc: 204.5}, + {_id: 3, acc: 304.5}, + {_id: 4, acc: 404.5} + ] + }); + + testAccumulator({ + accumulator: {$addToSet: "$c"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: [0, 1, 2, 3, 4]}, + {_id: 1, acc: [100, 101, 102, 103, 104]}, + {_id: 2, acc: [200, 201, 202, 203, 204]}, + {_id: 3, acc: [300, 301, 302, 303, 304]}, + {_id: 4, acc: [400, 401, 402, 403, 404]}, + ], + // Since the accumulator produces sets, the resulting arrays may be in any order. 
+ ignoreArrayOrder: true, + }); + + testAccumulator({ + accumulator: {$push: "$c"}, + sortInputBy: "_id", + expectedOutput: [ + {_id: 0, acc: [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]}, + {_id: 1, acc: [100, 101, 102, 103, 104, 100, 101, 102, 103, 104]}, + {_id: 2, acc: [200, 201, 202, 203, 204, 200, 201, 202, 203, 204]}, + {_id: 3, acc: [300, 301, 302, 303, 304, 300, 301, 302, 303, 304]}, + {_id: 4, acc: [400, 401, 402, 403, 404, 400, 401, 402, 403, 404]}, + ], + }); + + testAccumulator({ + accumulator: {$mergeObjects: "$obj"}, + sortInputBy: "_id", + expectedOutput: [ + {_id: 0, acc: {a: 0, b: 9}}, + {_id: 1, acc: {a: 1, b: 9}}, + {_id: 2, acc: {a: 2, b: 9}}, + {_id: 3, acc: {a: 3, b: 9}}, + {_id: 4, acc: {a: 4, b: 9}} + ], + }); +} + +(function() { +const kMemLimit = 100; +let oldMemSettings = setHashGroupMemoryParameters(kMemLimit); +try { + testSpillingForVariousAccumulators(); +} finally { + setHashGroupMemoryParameters(oldMemSettings); +} +})(); + +assert(coll.drop()); + // Test spill to disk for $lookup const localColl = db.lookup_spill_local_hj; const foreignColl = db.lookup_spill_foreign_hj; @@ -223,16 +416,8 @@ function setupCollections(localRecords, foreignRecords, foreignField) { } function setHashLookupParameters(memoryLimit) { - const commandResArr = FixtureHelpers.runCommandOnEachPrimary({ - db: db.getSiblingDB("admin"), - cmdObj: { - setParameter: 1, - internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: memoryLimit, - } - }); - assert.gt(commandResArr.length, 0, "Setting memory limit on primaries failed."); - const oldMemoryLimit = assert.commandWorked(commandResArr[0]).was; - return oldMemoryLimit; + return setMemoryParamHelper( + "internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill", memoryLimit); } /** @@ -260,7 +445,7 @@ function runTest_MultipleLocalForeignRecords({ const results = localColl.aggregate(pipeline, {allowDiskUse: true}).toArray(); const explain = 
localColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true}); // If sharding is enabled, '$lookup' is not pushed down to SBE. - if (isSBELookupEnabled && !sharded) { + if (isSbeEnabled && !sharded) { const hLookups = getSbePlanStages(explain, 'hash_lookup'); assert.eq(hLookups.length, 1, explain); const hLookup = hLookups[0]; |