diff options
-rw-r--r-- | jstests/aggregation/sources/setWindowFields/spill_to_disk.js | 37 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 4 |
2 files changed, 34 insertions, 7 deletions
diff --git a/jstests/aggregation/sources/setWindowFields/spill_to_disk.js b/jstests/aggregation/sources/setWindowFields/spill_to_disk.js index b006c1e66d2..40eeb984f45 100644 --- a/jstests/aggregation/sources/setWindowFields/spill_to_disk.js +++ b/jstests/aggregation/sources/setWindowFields/spill_to_disk.js @@ -36,15 +36,15 @@ seedWithTickerData(coll, 10); // Run $sum test with memory limits that cause spilling to disk. testAccumAgainstGroup(coll, "$sum", 0); -function checkProfilerForDiskWrite(dbToCheck) { +function checkProfilerForDiskWrite(dbToCheck, expectedFirstStage) { if (!FixtureHelpers.isMongos(dbToCheck)) { const profileObj = getLatestProfilerEntry(dbToCheck, {usedDisk: true}); jsTestLog(profileObj); // Verify that this was a $setWindowFields stage as expected. if (profileObj.hasOwnProperty("originatingCommand")) { - assert(profileObj.originatingCommand.pipeline[0].hasOwnProperty("$setWindowFields")); + assert(profileObj.originatingCommand.pipeline[0].hasOwnProperty(expectedFirstStage)); } else if (profileObj.hasOwnProperty("command")) { - assert(profileObj.command.pipeline[0].hasOwnProperty("$setWindowFields")); + assert(profileObj.command.pipeline[0].hasOwnProperty(expectedFirstStage)); } else { assert(false, "Profiler should have had command field", profileObj); } @@ -72,7 +72,7 @@ const wfResults = {allowDiskUse: true, cursor: {batchSize: 1}}) .toArray(); assert.eq(wfResults.length, 20); -checkProfilerForDiskWrite(db); +checkProfilerForDiskWrite(db, "$setWindowFields"); // Test a small, in memory, partition and a larger partition that requires spilling to disk. coll.drop(); @@ -113,7 +113,7 @@ for (let i = 0; i < results.length; i++) { assert.eq(results[i].sum, 210, "Unexepcted result in second partition at position " + i); } } -checkProfilerForDiskWrite(db); +checkProfilerForDiskWrite(db, "$setWindowFields"); // We don't execute setWindowFields in a sharded explain. if (!FixtureHelpers.isMongos(db)) { @@ -172,7 +172,7 @@ results = coll.aggregate( ], {allowDiskUse: true}) .toArray(); -checkProfilerForDiskWrite(db); +checkProfilerForDiskWrite(db, "$setWindowFields"); for (let i = 0; i < results.length; i++) { if (results[i].partition === 1) { assert(arrayEq(results[i].arr, [0, 1, 2, 3, 4, 5]), @@ -234,12 +234,35 @@ results = ], {allowDiskUse: true}) .toArray(); -checkProfilerForDiskWrite(db); +checkProfilerForDiskWrite(db, "$setWindowFields"); // Check that the command succeeded. assert.eq(results.length, numDocs); for (let i = 0; i < numDocs; i++) { assert.eq(results[i].arr, 616605, results); } + +// Test that usedDisk true is set when spilling occurs inside $lookup subpipline. +// Lower the memory limit to ensure spilling occurs. +setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()), + "internalDocumentSourceSetWindowFieldsMaxMemoryBytes", + 500); +resetProfiler(db); +coll.aggregate( + [ + {$lookup: { + from: coll.getName(), + as: "same", + pipeline: [{ + $setWindowFields: { + sortBy: {_id: 1}, + output: {res: {$sum: "$price", window: {documents: ["unbounded", 5]}}} + }, + }], + }}], + {allowDiskUse: true, cursor: {}}) + .toArray(); +checkProfilerForDiskWrite(db, "$lookup"); + // Reset limit for other tests. setParameterOnAllHosts(DiscoverTopology.findNonConfigNodes(db.getMongo()), "internalDocumentSourceSetWindowFieldsMaxMemoryBytes", diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index e222d0ea7ce..11fe6ebdf4a 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -474,6 +474,10 @@ DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() { } accumulatePipelinePlanSummaryStats(*pipeline, _stats.planSummaryStats); + + // Check if pipeline uses disk. + _stats.planSummaryStats.usedDisk = _stats.planSummaryStats.usedDisk || pipeline->usedDisk(); + MutableDocument output(std::move(inputDoc)); output.setNestedField(_as, Value(std::move(results))); return output.freeze(); |