diff options
Diffstat (limited to 'jstests')
-rw-r--r-- | jstests/aggregation/spill_to_disk.js | 223 | ||||
-rw-r--r-- | jstests/noPassthrough/spill_to_disk_secondary_read.js | 6 |
2 files changed, 208 insertions, 21 deletions
diff --git a/jstests/aggregation/spill_to_disk.js b/jstests/aggregation/spill_to_disk.js index 290df334e60..568d973e988 100644 --- a/jstests/aggregation/spill_to_disk.js +++ b/jstests/aggregation/spill_to_disk.js @@ -24,11 +24,37 @@ load("jstests/libs/sbe_util.js"); // For checkSBEEnabled. const coll = db.spill_to_disk; coll.drop(); +// Sets the set parameter named 'paramName' to the given 'memoryLimit' on each primary node in the +// cluster, and returns the old value. +function setMemoryParamHelper(paramName, memoryLimit) { + const commandResArr = FixtureHelpers.runCommandOnEachPrimary({ + db: db.getSiblingDB("admin"), + cmdObj: { + setParameter: 1, + [paramName]: memoryLimit, + } + }); + assert.gt(commandResArr.length, 0, "Setting memory limit on primaries failed"); + const oldMemoryLimit = assert.commandWorked(commandResArr[0]).was; + return oldMemoryLimit; +} + +// Verifies that the given 'groupStats' (an extract from SBE "executionStats" explain output) show +// evidence of spilling to disk. +function assertSpillingOccurredInSbeExplain(groupStats) { + assert(groupStats); + assert(groupStats.hasOwnProperty("usedDisk"), groupStats); + assert(groupStats.usedDisk, groupStats); + assert.gt(groupStats.numSpills, 0, groupStats); + assert.gt(groupStats.spilledRecords, 0, groupStats); + assert.gt(groupStats.spilledDataStorageSize, 0, groupStats); +} + const sharded = FixtureHelpers.isSharded(coll); const memoryLimitMB = sharded ? 200 : 100; -const isSBELookupEnabled = checkSBEEnabled(db); +const isSbeEnabled = checkSBEEnabled(db); const bigStr = Array(1024 * 1024 + 1).toString(); // 1MB of ',' for (let i = 0; i < memoryLimitMB + 1; i++) @@ -51,7 +77,7 @@ function test({pipeline, expectedCodes, canSpillToDisk}) { assert.eq(new DBCommandCursor(coll.getDB(), res).itcount(), coll.count()); // all tests output one doc per input doc - if (isSBELookupEnabled) { + if (isSbeEnabled) { const explain = db.runCommand({ explain: {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: true} @@ -60,12 +86,7 @@ function test({pipeline, expectedCodes, canSpillToDisk}) { if (hashAggGroups.length > 0) { assert.eq(hashAggGroups.length, 1, explain); const hashAggGroup = hashAggGroups[0]; - assert(hashAggGroup, explain); - assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup); - assert(hashAggGroup.usedDisk, hashAggGroup); - assert.gt(hashAggGroup.spilledRecords, 0, hashAggGroup); - assert.gt(hashAggGroup.spilledBytesApprox, 0, hashAggGroup); - assert.gt(hashAggGroup.spilledRecordEstimatedStorageSize, 0, hashAggGroup); + assertSpillingOccurredInSbeExplain(hashAggGroup); } } } else { @@ -211,6 +232,178 @@ for (const op of ['$firstN', '$lastN', '$minN', '$maxN', '$topN', '$bottomN']) { // don't leave large collection laying around assert(coll.drop()); +// Test spilling to disk for various accumulators in a $group stage . The data has 5 groups of 10 +// documents each. We configure a low memory limit for SBE's hash aggregation stage in order to +// encourage spilling. +const numGroups = 5; +const docsPerGroup = 10; +let counter = 0; +for (let i = 0; i < numGroups; ++i) { + for (let j = 0; j < docsPerGroup; ++j) { + const doc = { + _id: counter++, + a: i, + b: 100 * i + j, + c: 100 * i + j % 5, + obj: {a: i, b: j}, + random: Math.random() + }; + assert.commandWorked(coll.insert(doc)); + } +} + +function setHashGroupMemoryParameters(memoryLimit) { + return setMemoryParamHelper( + "internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill", memoryLimit); +} + +// Runs a group query containing the given 'accumulator' after sorting the data by the given +// 'sortInputBy' field. Then verifies that the query results are equal to 'expectedOutput'. If SBE +// is enabled, also runs explain and checks that the execution stats show that spilling occurred. +function testAccumulator({accumulator, sortInputBy, expectedOutput, ignoreArrayOrder = false}) { + const pipeline = + [{$sort: {[sortInputBy]: 1}}, {$group: {_id: "$a", acc: accumulator}}, {$sort: {_id: 1}}]; + const results = coll.aggregate(pipeline).toArray(); + + if (ignoreArrayOrder) { + assert(arrayEq(results, expectedOutput)); + } else { + assert.eq(results, expectedOutput); + } + + if (isSbeEnabled) { + const explain = coll.explain("executionStats").aggregate(pipeline); + const groupStages = getSbePlanStages(explain, "group"); + assert.eq(groupStages.length, 1, groupStages); + assertSpillingOccurredInSbeExplain(groupStages[0]); + } +} + +function testSpillingForVariousAccumulators() { + testAccumulator({ + accumulator: {$first: "$b"}, + sortInputBy: "_id", + expectedOutput: [ + {_id: 0, acc: 0}, + {_id: 1, acc: 100}, + {_id: 2, acc: 200}, + {_id: 3, acc: 300}, + {_id: 4, acc: 400} + ] + + }); + + testAccumulator({ + accumulator: {$last: "$b"}, + sortInputBy: "_id", + expectedOutput: [ + {_id: 0, acc: 9}, + {_id: 1, acc: 109}, + {_id: 2, acc: 209}, + {_id: 3, acc: 309}, + {_id: 4, acc: 409} + ] + }); + + testAccumulator({ + accumulator: {$min: "$b"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: 0}, + {_id: 1, acc: 100}, + {_id: 2, acc: 200}, + {_id: 3, acc: 300}, + {_id: 4, acc: 400} + ] + }); + + testAccumulator({ + accumulator: {$max: "$b"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: 9}, + {_id: 1, acc: 109}, + {_id: 2, acc: 209}, + {_id: 3, acc: 309}, + {_id: 4, acc: 409} + ] + }); + + testAccumulator({ + accumulator: {$sum: "$b"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: 45}, + {_id: 1, acc: 1045}, + {_id: 2, acc: 2045}, + {_id: 3, acc: 3045}, + {_id: 4, acc: 4045} + ] + }); + + testAccumulator({ + accumulator: {$avg: "$b"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: 4.5}, + {_id: 1, acc: 104.5}, + {_id: 2, acc: 204.5}, + {_id: 3, acc: 304.5}, + {_id: 4, acc: 404.5} + ] + }); + + testAccumulator({ + accumulator: {$addToSet: "$c"}, + sortInputBy: "random", + expectedOutput: [ + {_id: 0, acc: [0, 1, 2, 3, 4]}, + {_id: 1, acc: [100, 101, 102, 103, 104]}, + {_id: 2, acc: [200, 201, 202, 203, 204]}, + {_id: 3, acc: [300, 301, 302, 303, 304]}, + {_id: 4, acc: [400, 401, 402, 403, 404]}, + ], + // Since the accumulator produces sets, the resulting arrays may be in any order. + ignoreArrayOrder: true, + }); + + testAccumulator({ + accumulator: {$push: "$c"}, + sortInputBy: "_id", + expectedOutput: [ + {_id: 0, acc: [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]}, + {_id: 1, acc: [100, 101, 102, 103, 104, 100, 101, 102, 103, 104]}, + {_id: 2, acc: [200, 201, 202, 203, 204, 200, 201, 202, 203, 204]}, + {_id: 3, acc: [300, 301, 302, 303, 304, 300, 301, 302, 303, 304]}, + {_id: 4, acc: [400, 401, 402, 403, 404, 400, 401, 402, 403, 404]}, + ], + }); + + testAccumulator({ + accumulator: {$mergeObjects: "$obj"}, + sortInputBy: "_id", + expectedOutput: [ + {_id: 0, acc: {a: 0, b: 9}}, + {_id: 1, acc: {a: 1, b: 9}}, + {_id: 2, acc: {a: 2, b: 9}}, + {_id: 3, acc: {a: 3, b: 9}}, + {_id: 4, acc: {a: 4, b: 9}} + ], + }); +} + +(function() { +const kMemLimit = 100; +let oldMemSettings = setHashGroupMemoryParameters(kMemLimit); +try { + testSpillingForVariousAccumulators(); +} finally { + setHashGroupMemoryParameters(oldMemSettings); +} +})(); + +assert(coll.drop()); + // Test spill to disk for $lookup const localColl = db.lookup_spill_local_hj; const foreignColl = db.lookup_spill_foreign_hj; @@ -223,16 +416,8 @@ function setupCollections(localRecords, foreignRecords, foreignField) { } function setHashLookupParameters(memoryLimit) { - const commandResArr = FixtureHelpers.runCommandOnEachPrimary({ - db: db.getSiblingDB("admin"), - cmdObj: { - setParameter: 1, - internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: memoryLimit, - } - }); - assert.gt(commandResArr.length, 0, "Setting memory limit on primaries failed."); - const oldMemoryLimit = assert.commandWorked(commandResArr[0]).was; - return oldMemoryLimit; + return setMemoryParamHelper( + "internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill", memoryLimit); } /** @@ -260,7 +445,7 @@ function runTest_MultipleLocalForeignRecords({ const results = localColl.aggregate(pipeline, {allowDiskUse: true}).toArray(); const explain = localColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true}); // If sharding is enabled, '$lookup' is not pushed down to SBE. - if (isSBELookupEnabled && !sharded) { + if (isSbeEnabled && !sharded) { const hLookups = getSbePlanStages(explain, 'hash_lookup'); assert.eq(hLookups.length, 1, explain); const hLookup = hLookups[0]; diff --git a/jstests/noPassthrough/spill_to_disk_secondary_read.js b/jstests/noPassthrough/spill_to_disk_secondary_read.js index 217ce510eef..8ab9178b4c5 100644 --- a/jstests/noPassthrough/spill_to_disk_secondary_read.js +++ b/jstests/noPassthrough/spill_to_disk_secondary_read.js @@ -91,8 +91,10 @@ const readColl = secondary.getDB("test").foo; assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup); assert(hashAggGroup.usedDisk, hashAggGroup); assert.eq(hashAggGroup.spilledRecords, expectedSpilledRecords, hashAggGroup); - assert.gte(hashAggGroup.spilledBytesApprox, expectedSpilledBytesAtLeast, hashAggGroup); - assert.gt(hashAggGroup.spilledRecordEstimatedStorageSize, 0, hashAggGroup); + // We expect each record to be individually spilled, so the number of spill events and the + // number of spilled records should be equal. + assert.eq(hashAggGroup.numSpills, hashAggGroup.spilledRecords, hashAggGroup); + assert.gt(hashAggGroup.spilledDataStorageSize, expectedSpilledBytesAtLeast, hashAggGroup); } finally { assert.commandWorked(secondary.adminCommand({ setParameter: 1, |