summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough
diff options
context:
space:
mode:
authorIrina Yatsenko <irina.yatsenko@mongodb.com>2022-04-14 20:07:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-14 21:26:28 +0000
commit668b505cbdeced9e6de7ff1ed3dcf76068ae375d (patch)
tree6ff1e15c3763ec6b6f0a53e09e27d2276f3b6c21 /jstests/noPassthrough
parent37deca965953bf21b38d5e3572e46e102b3badc6 (diff)
downloadmongo-668b505cbdeced9e6de7ff1ed3dcf76068ae375d.tar.gz
SERVER-65265 Track memory for unbounded accumulators in SBE
Diffstat (limited to 'jstests/noPassthrough')
-rw-r--r--jstests/noPassthrough/agg_configurable_memory_limits.js104
-rw-r--r--jstests/noPassthrough/lookup_max_intermediate_size.js28
-rw-r--r--jstests/noPassthrough/spill_to_disk_secondary_read.js278
3 files changed, 257 insertions, 153 deletions
diff --git a/jstests/noPassthrough/agg_configurable_memory_limits.js b/jstests/noPassthrough/agg_configurable_memory_limits.js
index 5a66fb0f018..6ccb3e1434e 100644
--- a/jstests/noPassthrough/agg_configurable_memory_limits.js
+++ b/jstests/noPassthrough/agg_configurable_memory_limits.js
@@ -7,60 +7,80 @@ assert.neq(null, conn, "mongod was unable to start up");
const db = conn.getDB("test");
const coll = db.agg_configurable_memory_limit;
+// The approximate size of the strings below is 22-25 bytes, so configure one memory limit such that
+// 100 of these strings will surely exceed it but 24 of them won't, and another such that 24 will
+// exceed as well.
+const stringSize = 22;
+const nSetBaseline = 20;
+const memLimitArray = nSetBaseline * 5 * stringSize / 2;
+const memLimitSet = (nSetBaseline + 4) * stringSize / 2;
+
const bulk = coll.initializeUnorderedBulkOp();
-for (let i = 0; i < 100; i++) {
- bulk.insert({_id: i, x: i, y: ["string 1", "string 2", "string 3", "string 4", "string " + i]});
+for (let i = 0; i < nSetBaseline; i++) {
+ bulk.insert({_id: 5 * i + 0, y: "string 0"});
+ bulk.insert({_id: 5 * i + 1, y: "string 1"});
+ bulk.insert({_id: 5 * i + 2, y: "string 2"});
+ bulk.insert({_id: 5 * i + 3, y: "string 3"});
+ bulk.insert({_id: 5 * i + 4, y: "string " + 5 * i + 4});
}
assert.commandWorked(bulk.execute());
-// Test that pushing a bunch of strings to an array does not exceed the default 100MB memory limit.
-assert.doesNotThrow(
- () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {$push: "$y"}}}]));
+(function testInternalQueryMaxPushBytesSetting() {
+ // Test that the default 100MB memory limit isn't reached with our data.
+ assert.doesNotThrow(() => coll.aggregate([{$group: {_id: null, strings: {$push: "$y"}}}]));
-// Now lower the limit to test that it's configuration is obeyed.
-assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryMaxPushBytes: 100}));
-assert.throwsWithCode(
- () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {$push: "$y"}}}]),
- ErrorCodes.ExceededMemoryLimit);
+    // Now lower the limit to test that its configuration is obeyed.
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, internalQueryMaxPushBytes: memLimitArray}));
+ assert.throwsWithCode(() => coll.aggregate([{$group: {_id: null, strings: {$push: "$y"}}}]),
+ ErrorCodes.ExceededMemoryLimit);
+}());
-// Test that using $addToSet behaves similarly.
-assert.doesNotThrow(
- () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {$addToSet: "$y"}}}]));
+(function testInternalQueryMaxAddToSetBytesSetting() {
+ // Test that the default 100MB memory limit isn't reached with our data.
+ assert.doesNotThrow(() => coll.aggregate([{$group: {_id: null, strings: {$addToSet: "$y"}}}]));
-assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryMaxAddToSetBytes: 100}));
-assert.throwsWithCode(
- () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {$addToSet: "$y"}}}]),
- ErrorCodes.ExceededMemoryLimit);
+ // Test that $addToSet needs a tighter limit than $push (because some of the strings are the
+ // same).
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, internalQueryMaxAddToSetBytes: memLimitArray}));
+ assert.doesNotThrow(() => coll.aggregate([{$group: {_id: null, strings: {$addToSet: "$y"}}}]));
-// Capture the default value of 'internalQueryTopNAccumulatorBytes' to reset in between runs.
-const res =
- assert.commandWorked(db.adminCommand({getParameter: 1, internalQueryTopNAccumulatorBytes: 1}));
-const topNDefault = res["internalQueryTopNAccumulatorBytes"];
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, internalQueryMaxAddToSetBytes: memLimitSet}));
+ assert.throwsWithCode(() => coll.aggregate([{$group: {_id: null, strings: {$addToSet: "$y"}}}]),
+ ErrorCodes.ExceededMemoryLimit);
+}());
-// Test that the 'n' family of accumulators behaves similarly.
-for (const op of ["$firstN", "$lastN", "$minN", "$maxN", "$topN", "$bottomN"]) {
- let spec = {n: 200};
+(function testInternalQueryTopNAccumulatorBytesSetting() {
+ // Capture the default value of 'internalQueryTopNAccumulatorBytes' to reset in between runs.
+ const res = assert.commandWorked(
+ db.adminCommand({getParameter: 1, internalQueryTopNAccumulatorBytes: 1}));
+ const topNDefault = res["internalQueryTopNAccumulatorBytes"];
- // $topN/$bottomN both require a sort specification.
- if (op === "$topN" || op === "$bottomN") {
- spec["sortBy"] = {y: 1};
- spec["output"] = "$y";
- } else {
- // $firstN/$lastN/$minN/$maxN accept 'input'.
- spec["input"] = "$y";
- }
+ // Test that the 'n' family of accumulators behaves similarly.
+ for (const op of ["$firstN", "$lastN", "$minN", "$maxN", "$topN", "$bottomN"]) {
+ let spec = {n: 200};
- // First, verify that the accumulator doesn't throw.
- db.adminCommand({setParameter: 1, internalQueryTopNAccumulatorBytes: topNDefault});
- assert.doesNotThrow(
- () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {[op]: spec}}}]));
+ // $topN/$bottomN both require a sort specification.
+ if (op === "$topN" || op === "$bottomN") {
+ spec["sortBy"] = {y: 1};
+ spec["output"] = "$y";
+ } else {
+ // $firstN/$lastN/$minN/$maxN accept 'input'.
+ spec["input"] = "$y";
+ }
- // Then, verify that the memory limit throws when lowered.
- db.adminCommand({setParameter: 1, internalQueryTopNAccumulatorBytes: 100});
- assert.throwsWithCode(
- () => coll.aggregate([{$unwind: "$y"}, {$group: {_id: null, strings: {[op]: spec}}}]),
- ErrorCodes.ExceededMemoryLimit);
-}
+ // First, verify that the accumulator doesn't throw.
+ db.adminCommand({setParameter: 1, internalQueryTopNAccumulatorBytes: topNDefault});
+ assert.doesNotThrow(() => coll.aggregate([{$group: {_id: null, strings: {[op]: spec}}}]));
+
+ // Then, verify that the memory limit throws when lowered.
+ db.adminCommand({setParameter: 1, internalQueryTopNAccumulatorBytes: 100});
+ assert.throwsWithCode(() => coll.aggregate([{$group: {_id: null, strings: {[op]: spec}}}]),
+ ErrorCodes.ExceededMemoryLimit);
+ }
+}());
MongoRunner.stopMongod(conn);
}());
diff --git a/jstests/noPassthrough/lookup_max_intermediate_size.js b/jstests/noPassthrough/lookup_max_intermediate_size.js
index d3a53039fff..b9534b29d38 100644
--- a/jstests/noPassthrough/lookup_max_intermediate_size.js
+++ b/jstests/noPassthrough/lookup_max_intermediate_size.js
@@ -27,7 +27,7 @@ function testPipeline(pipeline, expectedResult, collection) {
expectedResult.sort(compareId));
}
-function runTest(coll, from) {
+function runTest(coll, from, expectedErrorCode) {
const db = null; // Using the db variable is banned in this function.
from.drop();
@@ -81,29 +81,28 @@ function runTest(coll, from) {
{$project: {_id: 1}}
];
- assertErrorCode(coll, pipeline, 4568);
+ assertErrorCode(coll, pipeline, expectedErrorCode);
}
-// Run tests on single node.
+/**
+ * Run tests on single node.
+ */
const standalone = MongoRunner.runMongod();
const db = standalone.getDB("test");
-// TODO SERVER-65265 Remove 'if' block below.
-if (checkSBEEnabled(db, ["featureFlagSBELookupPushdown"])) {
- jsTest.log("Skipping test because SBE and SBE $lookup features are both enabled.");
- MongoRunner.stopMongod(standalone);
- return;
-}
-
assert.commandWorked(db.adminCommand(
{setParameter: 1, internalLookupStageIntermediateDocumentMaxSizeBytes: 30 * 1024 * 1024}));
db.lookUp.drop();
-runTest(db.lookUp, db.from);
+const expectedErrorCode =
+ (checkSBEEnabled(db, ["featureFlagSBELookupPushdown"])) ? ErrorCodes.ExceededMemoryLimit : 4568;
+runTest(db.lookUp, db.from, expectedErrorCode);
MongoRunner.stopMongod(standalone);
-// Run tests in a sharded environment.
+/**
+ * Run tests in a sharded environment.
+ */
const sharded = new ShardingTest({
mongos: 1,
shards: 2,
@@ -117,7 +116,10 @@ assert(sharded.adminCommand({enableSharding: "test"}));
sharded.getDB('test').lookUp.drop();
assert(sharded.adminCommand({shardCollection: "test.lookUp", key: {_id: 'hashed'}}));
-runTest(sharded.getDB('test').lookUp, sharded.getDB('test').from);
+
+// If foreign collection is sharded, $lookup isn't lowered into SBE, so the memory limit error will
+// be coming from the classical aggregation pipeline.
+runTest(sharded.getDB('test').lookUp, sharded.getDB('test').from, 4568);
sharded.stop();
}());
diff --git a/jstests/noPassthrough/spill_to_disk_secondary_read.js b/jstests/noPassthrough/spill_to_disk_secondary_read.js
index 1a3c0702da5..f4c2ef20104 100644
--- a/jstests/noPassthrough/spill_to_disk_secondary_read.js
+++ b/jstests/noPassthrough/spill_to_disk_secondary_read.js
@@ -16,113 +16,195 @@ const replTest = new ReplSetTest({
replTest.startSet();
replTest.initiate();
-// Test that spilling '$group' and '$lookup' pipeline on a secondary works with a writeConcern
-// greater than w:1.
+/**
+ * Setup the primary and secondary collections.
+ */
let primary = replTest.getPrimary();
const insertColl = primary.getDB("test").foo;
-for (let i = 0; i < 500; ++i) {
- assert.commandWorked(insertColl.insert({a: i, string: "test test test"}));
+const cRecords = 50;
+for (let i = 0; i < cRecords; ++i) {
+ // We'll be using a unique 'key' field for group & lookup, but we cannot use '_id' for this,
+ // because '_id' is indexed and would trigger Indexed Loop Join instead of Hash Join.
+ assert.commandWorked(insertColl.insert({key: i, string: "test test test"}));
}
let secondary = replTest.getSecondary();
-assert.commandWorked(secondary.adminCommand(
- {setParameter: 1, internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: 1}));
-
-assert.commandWorked(secondary.adminCommand({
- setParameter: 1,
- internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: 1
-}));
-
-const isSBELookupEnabled =
- checkSBEEnabled(secondary.getDB("test"), ["featureFlagSBELookupPushdown"]);
-
const readColl = secondary.getDB("test").foo;
-let pipeline = [{$group: {_id: '$a', s: {$addToSet: '$string'}, p: {$push: '$a'}}}];
-
-if (isSBELookupEnabled) {
- let explainRes = readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true});
- const hashAggGroups = getSbePlanStages(explainRes, 'group');
- assert.eq(hashAggGroups.length, 1, explainRes);
- const hashAggGroup = hashAggGroups[0];
- assert(hashAggGroup, explainRes);
- assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup);
- assert(hashAggGroup.usedDisk, hashAggGroup);
- assert.eq(hashAggGroup.spilledRecords, 500, hashAggGroup);
- assert.eq(hashAggGroup.spilledBytesApprox, 27500, hashAggGroup);
-}
-
-let res =
- readColl
- .aggregate(
- pipeline,
- {allowDiskUse: true, readConcern: {level: "majority"}, writeConcern: {"w": "majority"}})
- .toArray();
-
-pipeline =
- [{$lookup: {from: readColl.getName(), localField: "a", foreignField: "a", as: "results"}}];
-
-if (isSBELookupEnabled) {
- let explainRes = readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true});
- const hLookups = getSbePlanStages(explainRes, 'hash_lookup');
- assert.eq(hLookups.length, 1, explainRes);
- const hLookup = hLookups[0];
- assert(hLookup, explainRes);
- assert(hLookup.hasOwnProperty("usedDisk"), hLookup);
- assert(hLookup.usedDisk, hLookup);
- assert.eq(hLookup.spilledRecords, 1000, hLookup);
- assert.eq(hLookup.spilledBytesApprox, 63000, hLookup);
-}
+/**
+ * Test spilling of $group, when explicitly run on a secondary.
+ */
+(function testGroupSpilling() {
+ if (!checkSBEEnabled(secondary.getDB("test"), ["featureFlagSBEGroupPushdown"])) {
+ jsTestLog("Skipping test for HashAgg stage: $group lowering into SBE isn't enabled");
+ return;
+ }
+
+ // Set memory limit so low that HashAgg has to spill all records it processes.
+ const oldSetting =
+ assert
+ .commandWorked(secondary.adminCommand({
+ setParameter: 1,
+ internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: 1
+ }))
+ .was;
+ try {
+ // The pipeline is silly -- because '$key' contains unique values, it will "group" exactly
+ // one record into each bucket and push a single '$string' value -- but it allows us to be
+ // more deterministic with spilling behaviour: each input record would create a record
+ // inside 'HashAgg' and because of the low memory limit, all of them will have to be
+ // spilled. For the spilled bytes, sanity test that the number is "reasonably large".
+ const pipeline = [{$group: {_id: '$key', s: {$push: '$string'}}}];
+ const expectedSpilledRecords = cRecords;
+ const expectedSpilledBytesAtLeast = cRecords * 10;
+
+ // Sanity check that the operation fails if cannot use disk and is successful otherwise.
+ const aggCommand = {
+ pipeline: pipeline,
+ allowDiskUse: false,
+ readConcern: {level: "majority"},
+ writeConcern: {"w": "majority"},
+ cursor: {}
+ };
+ assert.commandFailedWithCode(readColl.runCommand("aggregate", aggCommand),
+ ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed);
+ let aggOptions = {
+ allowDiskUse: true,
+ readConcern: {level: "majority"},
+ writeConcern: {"w": "majority"}
+ };
+ const res = readColl.aggregate(pipeline, aggOptions).toArray();
+ assert.eq(res.length, cRecords); // the group-by key is unique
+
+ // In SBE also check the statistics for disk usage. Note: 'explain()' doesn't support the
+ // 'writeConcern' option so we test spilling on the secondary but without using the concern.
+ const explainRes =
+ readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true});
+ const hashAggGroups = getSbePlanStages(explainRes, 'group');
+ assert.eq(hashAggGroups.length, 1, explainRes);
+ const hashAggGroup = hashAggGroups[0];
+ assert(hashAggGroup, explainRes);
+ assert(hashAggGroup.hasOwnProperty("usedDisk"), hashAggGroup);
+ assert(hashAggGroup.usedDisk, hashAggGroup);
+ assert.eq(hashAggGroup.spilledRecords, expectedSpilledRecords, hashAggGroup);
+ assert.gte(hashAggGroup.spilledBytesApprox, expectedSpilledBytesAtLeast, hashAggGroup);
+ } finally {
+ assert.commandWorked(secondary.adminCommand({
+ setParameter: 1,
+ internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: oldSetting
+ }));
+ }
+})();
-res =
- readColl
- .aggregate(
- pipeline,
- {allowDiskUse: true, readConcern: {level: "majority"}, writeConcern: {"w": "majority"}})
- .toArray();
-
-insertColl.drop();
-
-// Test that spilling '$setWindowFields' pipeline on a secondary works with a writeConcern greater
-// than w:1.
-let avgDocSize = 274;
-let smallPartitionSize = 6;
-let largePartitionSize = 21;
-const insertCollWFs = primary.getDB("test").bar;
-
-// Create small partition.
-for (let i = 0; i < smallPartitionSize; i++) {
- assert.commandWorked(insertCollWFs.insert({_id: i, val: i, partition: 1}));
-}
-// Create large partition.
-for (let i = 0; i < largePartitionSize; i++) {
- assert.commandWorked(insertCollWFs.insert({_id: i + smallPartitionSize, val: i, partition: 2}));
-}
+/**
+ * Test spilling of $lookup when explicitly run on a secondary.
+ */
+(function testLookupSpillingInSbe() {
+ if (!checkSBEEnabled(secondary.getDB("test"), ["featureFlagSBELookupPushdown"])) {
+ jsTestLog("Skipping test for HashLookup stage: $lookup lowering into SBE isn't enabled");
+ return;
+ }
+
+ // Set memory limit so low that HashLookup has to spill all records it processes.
+ const oldSetting =
+ assert
+ .commandWorked(secondary.adminCommand({
+ setParameter: 1,
+ internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: 1
+ }))
+ .was;
+ try {
+ // The pipeline is silly -- because '$key' contains unique values, it will self-join each
+ // record with itself and nothing else -- but it allows us to be more deterministic with
+ // the spilling behaviour. For the spilled bytes, sanity test that the number is "reasonably
+ // large".
+ const pipeline = [{
+ $lookup:
+ {from: readColl.getName(), localField: "key", foreignField: "key", as: "results"}
+ }];
+ const expectedSpilledRecordsAtLeast = cRecords;
+ const expectedSpilledBytesAtLeast = cRecords * 20;
+
+ // Sanity check that the operation is successful. Note: we cannot test the operation to fail
+ // with 'allowDiskUse' set to "false" because that would block HashJoin and fall back to NLJ
+ // which doesn't spill.
+ let aggOptions = {
+ allowDiskUse: true,
+ readConcern: {level: "majority"},
+ writeConcern: {"w": "majority"}
+ };
+ const res = readColl.aggregate(pipeline, aggOptions).toArray();
+ assert.eq(res.length, cRecords); // the key for self-join is unique
+
+ // In SBE also check the statistics for disk usage. Note: 'explain()' doesn't support the
+ // 'writeConcern' option so we test spilling on the secondary but without using the concern.
+ const explainRes =
+ readColl.explain('executionStats').aggregate(pipeline, {allowDiskUse: true});
+
+ const hLookups = getSbePlanStages(explainRes, 'hash_lookup');
+ assert.eq(hLookups.length, 1, explainRes);
+ const hLookup = hLookups[0];
+ assert(hLookup, explainRes);
+ assert(hLookup.hasOwnProperty("usedDisk"), hLookup);
+ assert(hLookup.usedDisk, hLookup);
+ assert.gte(hLookup.spilledRecords, expectedSpilledRecordsAtLeast, hLookup);
+ assert.gte(hLookup.spilledBytesApprox, expectedSpilledBytesAtLeast, hLookup);
+ } finally {
+ assert.commandWorked(secondary.adminCommand({
+ setParameter: 1,
+ internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: oldSetting
+ }));
+ }
+})();
-assert.commandWorked(secondary.adminCommand({
- setParameter: 1,
- internalDocumentSourceSetWindowFieldsMaxMemoryBytes: largePartitionSize * avgDocSize + 1
-}));
-
-const readCollWFs = secondary.getDB("test").bar;
-
-pipeline = [
- {
- $setWindowFields: {
- partitionBy: "$partition",
- sortBy: {partition: 1},
- output: {arr: {$push: "$val", window: {documents: [-25, 25]}}}
- }
- },
- {$sort: {_id: 1}}
-];
-
-res =
- readCollWFs
- .aggregate(
- pipeline,
- {allowDiskUse: true, readConcern: {level: "majority"}, writeConcern: {"w": "majority"}})
- .toArray();
+/**
+ * Test spilling of $setWindowFields. We only check that the operation is successful. Main tests for
+ * $setWindowFields can be found in jstests/aggregation/sources/setWindowFields/spill_to_disk.js.
+ */
+(function testSetWindowFields() {
+ // Test that spilling '$setWindowFields' pipeline on a secondary works with a writeConcern
+ // greater than w:1.
+ let avgDocSize = 274;
+ let smallPartitionSize = 6;
+ let largePartitionSize = 21;
+ const insertCollWFs = primary.getDB("test").bar;
+
+ // Create small partition.
+ for (let i = 0; i < smallPartitionSize; i++) {
+ assert.commandWorked(insertCollWFs.insert({_id: i, val: i, partition: 1}));
+ }
+ // Create large partition.
+ for (let i = 0; i < largePartitionSize; i++) {
+ assert.commandWorked(
+ insertCollWFs.insert({_id: i + smallPartitionSize, val: i, partition: 2}));
+ }
+
+ assert.commandWorked(secondary.adminCommand({
+ setParameter: 1,
+ internalDocumentSourceSetWindowFieldsMaxMemoryBytes: largePartitionSize * avgDocSize + 1
+ }));
+
+ const readCollWFs = secondary.getDB("test").bar;
+
+ let pipeline = [
+ {
+ $setWindowFields: {
+ partitionBy: "$partition",
+ sortBy: {partition: 1},
+ output: {arr: {$push: "$val", window: {documents: [-25, 25]}}}
+ }
+ },
+ {$sort: {_id: 1}}
+ ];
+
+ let res = readCollWFs
+ .aggregate(pipeline, {
+ allowDiskUse: true,
+ readConcern: {level: "majority"},
+ writeConcern: {"w": "majority"}
+ })
+ .toArray();
+})();
replTest.stopSet();
})();