summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgalon1 <gil.alon@mongodb.com>2022-09-13 17:30:31 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-13 20:04:00 +0000
commit5135c8e1b640ac0e8ac5cda714ff17e084a107ec (patch)
treef087cde0be15802a8d1500f6a29c6ebe520a34a7
parent293d41288fa7ccfbc03b8448b44214b2cf84db76 (diff)
downloadmongo-5135c8e1b640ac0e8ac5cda714ff17e084a107ec.tar.gz
SERVER-62071 Add useDisk call to preserve intermediate pipeline values
-rw-r--r--jstests/aggregation/sources/setWindowFields/spill_to_disk.js37
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp4
2 files changed, 34 insertions, 7 deletions
diff --git a/jstests/aggregation/sources/setWindowFields/spill_to_disk.js b/jstests/aggregation/sources/setWindowFields/spill_to_disk.js
index b006c1e66d2..40eeb984f45 100644
--- a/jstests/aggregation/sources/setWindowFields/spill_to_disk.js
+++ b/jstests/aggregation/sources/setWindowFields/spill_to_disk.js
@@ -36,15 +36,15 @@ seedWithTickerData(coll, 10);
// Run $sum test with memory limits that cause spilling to disk.
testAccumAgainstGroup(coll, "$sum", 0);
-function checkProfilerForDiskWrite(dbToCheck) {
+function checkProfilerForDiskWrite(dbToCheck, expectedFirstStage) {
if (!FixtureHelpers.isMongos(dbToCheck)) {
const profileObj = getLatestProfilerEntry(dbToCheck, {usedDisk: true});
jsTestLog(profileObj);
// Verify that this was a $setWindowFields stage as expected.
if (profileObj.hasOwnProperty("originatingCommand")) {
- assert(profileObj.originatingCommand.pipeline[0].hasOwnProperty("$setWindowFields"));
+ assert(profileObj.originatingCommand.pipeline[0].hasOwnProperty(expectedFirstStage));
} else if (profileObj.hasOwnProperty("command")) {
- assert(profileObj.command.pipeline[0].hasOwnProperty("$setWindowFields"));
+ assert(profileObj.command.pipeline[0].hasOwnProperty(expectedFirstStage));
} else {
assert(false, "Profiler should have had command field", profileObj);
}
@@ -72,7 +72,7 @@ const wfResults =
{allowDiskUse: true, cursor: {batchSize: 1}})
.toArray();
assert.eq(wfResults.length, 20);
-checkProfilerForDiskWrite(db);
+checkProfilerForDiskWrite(db, "$setWindowFields");
// Test a small, in memory, partition and a larger partition that requires spilling to disk.
coll.drop();
@@ -113,7 +113,7 @@ for (let i = 0; i < results.length; i++) {
assert.eq(results[i].sum, 210, "Unexepcted result in second partition at position " + i);
}
}
-checkProfilerForDiskWrite(db);
+checkProfilerForDiskWrite(db, "$setWindowFields");
// We don't execute setWindowFields in a sharded explain.
if (!FixtureHelpers.isMongos(db)) {
@@ -172,7 +172,7 @@ results = coll.aggregate(
],
{allowDiskUse: true})
.toArray();
-checkProfilerForDiskWrite(db);
+checkProfilerForDiskWrite(db, "$setWindowFields");
for (let i = 0; i < results.length; i++) {
if (results[i].partition === 1) {
assert(arrayEq(results[i].arr, [0, 1, 2, 3, 4, 5]),
@@ -234,12 +234,35 @@ results =
],
{allowDiskUse: true})
.toArray();
-checkProfilerForDiskWrite(db);
+checkProfilerForDiskWrite(db, "$setWindowFields");
// Check that the command succeeded.
assert.eq(results.length, numDocs);
for (let i = 0; i < numDocs; i++) {
assert.eq(results[i].arr, 616605, results);
}
+
+// Test that usedDisk true is set when spilling occurs inside $lookup subpipline.
+// Lower the memory limit to ensure spilling occurs.
+setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()),
+ "internalDocumentSourceSetWindowFieldsMaxMemoryBytes",
+ 500);
+resetProfiler(db);
+coll.aggregate(
+ [
+ {$lookup: {
+ from: coll.getName(),
+ as: "same",
+ pipeline: [{
+ $setWindowFields: {
+ sortBy: {_id: 1},
+ output: {res: {$sum: "$price", window: {documents: ["unbounded", 5]}}}
+ },
+ }],
+ }}],
+ {allowDiskUse: true, cursor: {}})
+ .toArray();
+checkProfilerForDiskWrite(db, "$lookup");
+
// Reset limit for other tests.
setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()),
"internalDocumentSourceSetWindowFieldsMaxMemoryBytes",
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index e222d0ea7ce..11fe6ebdf4a 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -474,6 +474,10 @@ DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() {
}
accumulatePipelinePlanSummaryStats(*pipeline, _stats.planSummaryStats);
+
+ // Check if pipeline uses disk.
+ _stats.planSummaryStats.usedDisk = _stats.planSummaryStats.usedDisk || pipeline->usedDisk();
+
MutableDocument output(std::move(inputDoc));
output.setNestedField(_as, Value(std::move(results)));
return output.freeze();