diff options
author | Irina Yatsenko <irina.yatsenko@mongodb.com> | 2022-04-14 20:07:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-14 21:26:28 +0000 |
commit | 668b505cbdeced9e6de7ff1ed3dcf76068ae375d (patch) | |
tree | 6ff1e15c3763ec6b6f0a53e09e27d2276f3b6c21 /jstests/noPassthrough | |
parent | 37deca965953bf21b38d5e3572e46e102b3badc6 (diff) | |
download | mongo-668b505cbdeced9e6de7ff1ed3dcf76068ae375d.tar.gz |
SERVER-65265 Track memory for unbounded accumulators in SBE
Diffstat (limited to 'jstests/noPassthrough')
3 files changed, 257 insertions, 153 deletions
diff --git a/jstests/noPassthrough/agg_configurable_memory_limits.js b/jstests/noPassthrough/agg_configurable_memory_limits.js index 5a66fb0f018..6ccb3e1434e 100644 --- a/jstests/noPassthrough/agg_configurable_memory_limits.js +++ b/jstests/noPassthrough/agg_configurable_memory_limits.js @@ -7,60 +7,80 @@ assert.neq(null, conn, "mongod was unable to start up"); const db = conn.getDB("test"); const coll = db.agg_configurable_memory_limit; +// The approximate size of the strings below is 22-25 bytes, so configure one memory limit such that +// 100 of these strings will surely exceed it but 24 of them won't, and another such that 24 will +// exceed as well. +const stringSize = 22; +const nSetBaseline = 20; +const memLimitArray = nSetBaseline * 5 * stringSize / 2; +const memLimitSet = (nSetBaseline + 4) * stringSize / 2; + const bulk = coll.initializeUnorderedBulkOp(); -for (let i = 0; i < 100; i++) { - bulk.insert({_id: i, x: i, y: ["string 1", "string 2", "string 3", "string 4", "string " + i]}); +for (let i = 0; i < nSetBaseline; i++) { + bulk.insert({_id: 5 * i + 0, y: "string 0"}); + bulk.insert({_id: 5 * i + 1, y: "string 1"}); + bulk.insert({_id: 5 * i + 2, y: "string 2"}); + bulk.insert({_id: 5 * i + 3, y: "string 3"}); + bulk.insert({_id: 5 * i + 4, y: "string " + 5 * i + 4}); } assert.commandWorked(bulk.execute()); -// Test that pushing a bunch of strings to an array does not exceed the default 100MB memory limit. -assert.doesNotThrow( - () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {$push: "$y"}}}])); +(function testInternalQueryMaxPushBytesSetting() { + // Test that the default 100MB memory limit isn't reached with our data. + assert.doesNotThrow(() => coll.aggregate([{$group: {_id: null, strings: {$push: "$y"}}}])); -// Now lower the limit to test that it's configuration is obeyed. -assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryMaxPushBytes: 100})); -assert.throwsWithCode( - () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {$push: "$y"}}}]), - ErrorCodes.ExceededMemoryLimit); + // Now lower the limit to test that it's configuration is obeyed. + assert.commandWorked( + db.adminCommand({setParameter: 1, internalQueryMaxPushBytes: memLimitArray})); + assert.throwsWithCode(() => coll.aggregate([{$group: {_id: null, strings: {$push: "$y"}}}]), + ErrorCodes.ExceededMemoryLimit); +}()); -// Test that using $addToSet behaves similarly. -assert.doesNotThrow( - () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {$addToSet: "$y"}}}])); +(function testInternalQueryMaxAddToSetBytesSetting() { + // Test that the default 100MB memory limit isn't reached with our data. + assert.doesNotThrow(() => coll.aggregate([{$group: {_id: null, strings: {$addToSet: "$y"}}}])); -assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryMaxAddToSetBytes: 100})); -assert.throwsWithCode( - () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {$addToSet: "$y"}}}]), - ErrorCodes.ExceededMemoryLimit); + // Test that $addToSet needs a tighter limit than $push (because some of the strings are the + // same). + assert.commandWorked( + db.adminCommand({setParameter: 1, internalQueryMaxAddToSetBytes: memLimitArray})); + assert.doesNotThrow(() => coll.aggregate([{$group: {_id: null, strings: {$addToSet: "$y"}}}])); -// Capture the default value of 'internalQueryTopNAccumulatorBytes' to reset in between runs. -const res = - assert.commandWorked(db.adminCommand({getParameter: 1, internalQueryTopNAccumulatorBytes: 1})); -const topNDefault = res["internalQueryTopNAccumulatorBytes"]; + assert.commandWorked( + db.adminCommand({setParameter: 1, internalQueryMaxAddToSetBytes: memLimitSet})); + assert.throwsWithCode(() => coll.aggregate([{$group: {_id: null, strings: {$addToSet: "$y"}}}]), + ErrorCodes.ExceededMemoryLimit); +}()); -// Test that the 'n' family of accumulators behaves similarly. -for (const op of ["$firstN", "$lastN", "$minN", "$maxN", "$topN", "$bottomN"]) { - let spec = {n: 200}; +(function testInternalQueryTopNAccumulatorBytesSetting() { + // Capture the default value of 'internalQueryTopNAccumulatorBytes' to reset in between runs. + const res = assert.commandWorked( + db.adminCommand({getParameter: 1, internalQueryTopNAccumulatorBytes: 1})); + const topNDefault = res["internalQueryTopNAccumulatorBytes"]; - // $topN/$bottomN both require a sort specification. - if (op === "$topN" || op === "$bottomN") { - spec["sortBy"] = {y: 1}; - spec["output"] = "$y"; - } else { - // $firstN/$lastN/$minN/$maxN accept 'input'. - spec["input"] = "$y"; - } + // Test that the 'n' family of accumulators behaves similarly. + for (const op of ["$firstN", "$lastN", "$minN", "$maxN", "$topN", "$bottomN"]) { + let spec = {n: 200}; - // First, verify that the accumulator doesn't throw. - db.adminCommand({setParameter: 1, internalQueryTopNAccumulatorBytes: topNDefault}); - assert.doesNotThrow( - () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {[op]: spec}}}])); + // $topN/$bottomN both require a sort specification. + if (op === "$topN" || op === "$bottomN") { + spec["sortBy"] = {y: 1}; + spec["output"] = "$y"; + } else { + // $firstN/$lastN/$minN/$maxN accept 'input'. + spec["input"] = "$y"; + } - // Then, verify that the memory limit throws when lowered. - db.adminCommand({setParameter: 1, internalQueryTopNAccumulatorBytes: 100}); - assert.throwsWithCode( - () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {[op]: spec}}}]), - ErrorCodes.ExceededMemoryLimit); -} + // First, verify that the accumulator doesn't throw. + db.adminCommand({setParameter: 1, internalQueryTopNAccumulatorBytes: topNDefault}); + assert.doesNotThrow(() => coll.aggregate([{$group: {_id: null, strings: {[op]: spec}}}])); + + // Then, verify that the memory limit throws when lowered. + db.adminCommand({setParameter: 1, internalQueryTopNAccumulatorBytes: 100}); + assert.throwsWithCode(() => coll.aggregate([{$group: {_id: null, strings: {[op]: spec}}}]), + ErrorCodes.ExceededMemoryLimit); + } +}()); MongoRunner.stopMongod(conn); }()); diff --git a/jstests/noPassthrough/lookup_max_intermediate_size.js b/jstests/noPassthrough/lookup_max_intermediate_size.js index d3a53039fff..b9534b29d38 100644 --- a/jstests/noPassthrough/lookup_max_intermediate_size.js +++ b/jstests/noPassthrough/lookup_max_intermediate_size.js @@ -27,7 +27,7 @@ function testPipeline(pipeline, expectedResult, collection) { expectedResult.sort(compareId)); } -function runTest(coll, from) { +function runTest(coll, from, expectedErrorCode) { const db = null; // Using the db variable is banned in this function. from.drop(); @@ -81,29 +81,28 @@ function runTest(coll, from) { {$project: {_id: 1}} ]; - assertErrorCode(coll, pipeline, 4568); + assertErrorCode(coll, pipeline, expectedErrorCode); } -// Run tests on single node. +/** + * Run tests on single node. + */ const standalone = MongoRunner.runMongod(); const db = standalone.getDB("test"); -// TODO SERVER-65265 Remove 'if' block below. -if (checkSBEEnabled(db, ["featureFlagSBELookupPushdown"])) { - jsTest.log("Skipping test because SBE and SBE $lookup features are both enabled."); - MongoRunner.stopMongod(standalone); - return; -} - assert.commandWorked(db.adminCommand( {setParameter: 1, internalLookupStageIntermediateDocumentMaxSizeBytes: 30 * 1024 * 1024})); db.lookUp.drop(); -runTest(db.lookUp, db.from); +const expectedErrorCode = + (checkSBEEnabled(db, ["featureFlagSBELookupPushdown"])) ? ErrorCodes.ExceededMemoryLimit : 4568; +runTest(db.lookUp, db.from, expectedErrorCode); MongoRunner.stopMongod(standalone); -// Run tests in a sharded environment. +/** + * Run tests in a sharded environment. + */ const sharded = new ShardingTest({ mongos: 1, shards: 2, @@ -117,7 +116,10 @@ assert(sharded.adminCommand({enableSharding: "test"})); sharded.getDB('test').lookUp.drop(); assert(sharded.adminCommand({shardCollection: "test.lookUp", key: {_id: 'hashed'}})); -runTest(sharded.getDB('test').lookUp, sharded.getDB('test').from); + +// If foreign collection is sharded, $lookup isn't lowered into SBE, so the memory limit error will +// be coming from the classical aggregation pipeline. +runTest(sharded.getDB('test').lookUp, sharded.getDB('test').from, 4568); sharded.stop(); }()); diff --git a/jstests/noPassthrough/spill_to_disk_secondary_read.js b/jstests/noPassthrough/spill_to_disk_secondary_read.js index 1a3c0702da5..f4c2ef20104 100644 --- a/jstests/noPassthrough/spill_to_disk_secondary_read.js +++ b/jstests/noPassthrough/spill_to_disk_secondary_read.js @@ -16,113 +16,195 @@ const replTest = new ReplSetTest({ replTest.startSet(); replTest.initiate(); -// Test that spilling '$group' and '$lookup' pipeline on a secondary works with a writeConcern -// greater than w:1. +/** + * Setup the primary and secondary collections. + */ let primary = replTest.getPrimary(); const insertColl = primary.getDB("test").foo; -for (let i = 0; i < 500; ++i) { - assert.commandWorked(insertColl.insert({a: i, string: "test test test"})); +const cRecords = 50; +for (let i = 0; i < cRecords; ++i) { + // We'll be using a unique 'key' field for group & lookup, but we cannot use '_id' for this, + // because '_id' is indexed and would trigger Indexed Loop Join instead of Hash Join. + assert.commandWorked(insertColl.insert({key: i, string: "test test test"})); } let secondary = replTest.getSecondary(); -assert.commandWorked(secondary.adminCommand( - {setParameter: 1, internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: 1})); - -assert.commandWorked(secondary.adminCommand({ - setParameter: 1, - internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: 1 -})); - -const isSBELookupEnabled = - checkSBEEnabled(secondary.getDB("test"), ["featureFlagSBELookupPushdown"]); - const readColl = secondary.getDB("test").foo; -let pipeline = [{$group: {_id: '$a', s: {$addToSet: '$string'}, p: {$push: '$a'}}}]; - -if (isSBELookupEnabled) { - let explainRes = readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true}); - const hashAggGroups = getSbePlanStages(explainRes, 'group'); - assert.eq(hashAggGroups.length, 1, explainRes); - const hashAggGroup = hashAggGroups[0]; - assert(hashAggGroup, explainRes); - assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup); - assert(hashAggGroup.usedDisk, hashAggGroup); - assert.eq(hashAggGroup.spilledRecords, 500, hashAggGroup); - assert.eq(hashAggGroup.spilledBytesApprox, 27500, hashAggGroup); -} - -let res = - readColl - .aggregate( - pipeline, - {allowDiskUse: true, readConcern: {level: "majority"}, writeConcern: {"w": "majority"}}) - .toArray(); - -pipeline = - [{$lookup: {from: readColl.getName(), localField: "a", foreignField: "a", as: "results"}}]; - -if (isSBELookupEnabled) { - let explainRes = readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true}); - const hLookups = getSbePlanStages(explainRes, 'hash_lookup'); - assert.eq(hLookups.length, 1, explainRes); - const hLookup = hLookups[0]; - assert(hLookup, explainRes); - assert(hLookup.hasOwnProperty("usedDisk"), hLookup); - assert(hLookup.usedDisk, hLookup); - assert.eq(hLookup.spilledRecords, 1000, hLookup); - assert.eq(hLookup.spilledBytesApprox, 63000, hLookup); -} +/** + * Test spilling of $group, when explicitly run on a secondary. + */ +(function testGroupSpilling() { + if (!checkSBEEnabled(secondary.getDB("test"), ["featureFlagSBEGroupPushdown"])) { + jsTestLog("Skipping test for HashAgg stage: $group lowering into SBE isn't enabled"); + return; + } + + // Set memory limit so low that HashAgg has to spill all records it processes. + const oldSetting = + assert + .commandWorked(secondary.adminCommand({ + setParameter: 1, + internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: 1 + })) + .was; + try { + // The pipeline is silly -- because '$key' contains unique values, it will "group" exactly + // one record into each bucket and push a single '$string' value -- but it allows us to be + // more deterministic with spilling behaviour: each input record would create a record + // inside 'HashAgg' and because of the low memory limit, all of them will have to be + // spilled. For the spilled bytes, sanity test that the number is "reasonably large". + const pipeline = [{$group: {_id: '$key', s: {$push: '$string'}}}]; + const expectedSpilledRecords = cRecords; + const expectedSpilledBytesAtLeast = cRecords * 10; + + // Sanity check that the operation fails if cannot use disk and is successful otherwise. + const aggCommand = { + pipeline: pipeline, + allowDiskUse: false, + readConcern: {level: "majority"}, + writeConcern: {"w": "majority"}, + cursor: {} + }; + assert.commandFailedWithCode(readColl.runCommand("aggregate", aggCommand), + ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed); + let aggOptions = { + allowDiskUse: true, + readConcern: {level: "majority"}, + writeConcern: {"w": "majority"} + }; + const res = readColl.aggregate(pipeline, aggOptions).toArray(); + assert.eq(res.length, cRecords); // the group-by key is unique + + // In SBE also check the statistics for disk usage. Note: 'explain()' doesn't support the + // 'writeConcern' option so we test spilling on the secondary but without using the concern. + const explainRes = + readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true}); + const hashAggGroups = getSbePlanStages(explainRes, 'group'); + assert.eq(hashAggGroups.length, 1, explainRes); + const hashAggGroup = hashAggGroups[0]; + assert(hashAggGroup, explainRes); + assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup); + assert(hashAggGroup.usedDisk, hashAggGroup); + assert.eq(hashAggGroup.spilledRecords, expectedSpilledRecords, hashAggGroup); + assert.gte(hashAggGroup.spilledBytesApprox, expectedSpilledBytesAtLeast, hashAggGroup); + } finally { + assert.commandWorked(secondary.adminCommand({ + setParameter: 1, + internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: oldSetting + })); + } +})(); -res = - readColl - .aggregate( - pipeline, - {allowDiskUse: true, readConcern: {level: "majority"}, writeConcern: {"w": "majority"}}) - .toArray(); - -insertColl.drop(); - -// Test that spilling '$setWindowFields' pipeline on a secondary works with a writeConcern greater -// than w:1. -let avgDocSize = 274; -let smallPartitionSize = 6; -let largePartitionSize = 21; -const insertCollWFs = primary.getDB("test").bar; - -// Create small partition. -for (let i = 0; i < smallPartitionSize; i++) { - assert.commandWorked(insertCollWFs.insert({_id: i, val: i, partition: 1})); -} -// Create large partition. -for (let i = 0; i < largePartitionSize; i++) { - assert.commandWorked(insertCollWFs.insert({_id: i + smallPartitionSize, val: i, partition: 2})); -} +/** + * Test spilling of $lookup when explicitly run on a secondary. + */ +(function testLookupSpillingInSbe() { + if (!checkSBEEnabled(secondary.getDB("test"), ["featureFlagSBELookupPushdown"])) { + jsTestLog("Skipping test for HashLookup stage: $lookup lowering into SBE isn't enabled"); + return; + } + + // Set memory limit so low that HashLookup has to spill all records it processes. + const oldSetting = + assert + .commandWorked(secondary.adminCommand({ + setParameter: 1, + internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: 1 + })) + .was; + try { + // The pipeline is silly -- because '$key' contains unique values, it will self-join each + // record with itself and nothing else -- but it allows us to be more deterministic with + // the spilling behaviour. For the spilled bytes, sanity test that the number is "reasonably + // large". + const pipeline = [{ + $lookup: + {from: readColl.getName(), localField: "key", foreignField: "key", as: "results"} + }]; + const expectedSpilledRecordsAtLeast = cRecords; + const expectedSpilledBytesAtLeast = cRecords * 20; + + // Sanity check that the operation is successful. Note: we cannot test the operation to fail + // with 'allowDiskUse' set to "false" because that would block HashJoin and fall back to NLJ + // which doesn't spill. + let aggOptions = { + allowDiskUse: true, + readConcern: {level: "majority"}, + writeConcern: {"w": "majority"} + }; + const res = readColl.aggregate(pipeline, aggOptions).toArray(); + assert.eq(res.length, cRecords); // the key for self-join is unique + + // In SBE also check the statistics for disk usage. Note: 'explain()' doesn't support the + // 'writeConcern' option so we test spilling on the secondary but without using the concern. + const explainRes = + readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true}); + + const hLookups = getSbePlanStages(explainRes, 'hash_lookup'); + assert.eq(hLookups.length, 1, explainRes); + const hLookup = hLookups[0]; + assert(hLookup, explainRes); + assert(hLookup.hasOwnProperty("usedDisk"), hLookup); + assert(hLookup.usedDisk, hLookup); + assert.gte(hLookup.spilledRecords, expectedSpilledRecordsAtLeast, hLookup); + assert.gte(hLookup.spilledBytesApprox, expectedSpilledBytesAtLeast, hLookup); + } finally { + assert.commandWorked(secondary.adminCommand({ + setParameter: 1, + internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: oldSetting + })); + } +})(); -assert.commandWorked(secondary.adminCommand({ - setParameter: 1, - internalDocumentSourceSetWindowFieldsMaxMemoryBytes: largePartitionSize * avgDocSize + 1 -})); - -const readCollWFs = secondary.getDB("test").bar; - -pipeline = [ - { - $setWindowFields: { - partitionBy: "$partition", - sortBy: {partition: 1}, - output: {arr: {$push: "$val", window: {documents: [-25, 25]}}} - } - }, - {$sort: {_id: 1}} -]; - -res = - readCollWFs - .aggregate( - pipeline, - {allowDiskUse: true, readConcern: {level: "majority"}, writeConcern: {"w": "majority"}}) - .toArray(); +/** + * Test spilling of $setWindowFields. We only check that the operation is successful. Main tests for + * $setWindowFields can be found in jstests/aggregation/sources/setWindowFields/spill_to_disk.js. + */ +(function testSetWindowFields() { + // Test that spilling '$setWindowFields' pipeline on a secondary works with a writeConcern + // greater than w:1. + let avgDocSize = 274; + let smallPartitionSize = 6; + let largePartitionSize = 21; + const insertCollWFs = primary.getDB("test").bar; + + // Create small partition. + for (let i = 0; i < smallPartitionSize; i++) { + assert.commandWorked(insertCollWFs.insert({_id: i, val: i, partition: 1})); + } + // Create large partition. + for (let i = 0; i < largePartitionSize; i++) { + assert.commandWorked( + insertCollWFs.insert({_id: i + smallPartitionSize, val: i, partition: 2})); + } + + assert.commandWorked(secondary.adminCommand({ + setParameter: 1, + internalDocumentSourceSetWindowFieldsMaxMemoryBytes: largePartitionSize * avgDocSize + 1 + })); + + const readCollWFs = secondary.getDB("test").bar; + + let pipeline = [ + { + $setWindowFields: { + partitionBy: "$partition", + sortBy: {partition: 1}, + output: {arr: {$push: "$val", window: {documents: [-25, 25]}}} + } + }, + {$sort: {_id: 1}} + ]; + + let res = readCollWFs + .aggregate(pipeline, { + allowDiskUse: true, + readConcern: {level: "majority"}, + writeConcern: {"w": "majority"} + }) + .toArray(); +})(); replTest.stopSet(); })(); |