54 files changed, 663 insertions(+), 286 deletions(-)
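The headline change in this commit: whether a split pipeline's merge half runs on mongoS is now derived from per-stage StageConstraints (stream type, host requirement, disk use) rather than a single host-type enum, so blocking stages such as $group may merge on mongoS only when they cannot spill to disk. A minimal mongo-shell sketch of the observable behaviour; 'coll' and the grouping field are illustrative, not taken from this diff:

```javascript
// Illustrative sharded collection 'coll'. The same pipeline merges on mongoS
// when the blocking $group cannot spill, but moves to a shard once
// allowDiskUse permits it to write temporary data (mongoS never can).
const pipeline = [{$match: {_id: {$gte: 0}}}, {$group: {_id: "$category"}}];

// allowDiskUse unset (or false): the merge half is eligible to run on mongoS.
assert.eq(coll.explain().aggregate(pipeline).mergeType, "mongos");

// allowDiskUse: true: the merging $group may spill, so merging is delegated
// to a shard instead.
assert.eq(coll.explain().aggregate(pipeline, {allowDiskUse: true}).mergeType,
          "anyShard");
```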
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js index fd49ef4c7e5..b8763a45545 100644 --- a/jstests/aggregation/mongos_merge.js +++ b/jstests/aggregation/mongos_merge.js @@ -1,19 +1,10 @@ /** * Tests that split aggregations whose merge pipelines are eligible to run on mongoS do so, and - * produce the expected results. + * produce the expected results. Stages which are eligible to merge on mongoS include: * - * Splittable stages whose merge components are eligible to run on mongoS include: - * - $sort (iff merging pre-sorted streams) - * - $skip - * - $limit - * - $sample - * - * Non-splittable stages such as those listed below are eligible to run in a mongoS merge pipeline: - * - $match - * - $project - * - $addFields - * - $unwind - * - $redact + * - Splitpoints whose merge components are non-blocking, e.g. $skip, $limit, $sort, $sample. + * - Non-splittable streaming stages, e.g. $match, $project, $unwind. + * - Blocking stages in cases where 'allowDiskUse' is false, e.g. $group, $bucketAuto. * * Because wrapping these aggregations in a $facet stage will affect how the pipeline can be merged, * and will therefore invalidate the results of the test cases below, we tag this test to prevent it @@ -23,12 +14,14 @@ */ (function() { - load("jstests/libs/profiler.js"); // For profilerHas*OrThrow helper functions. + load("jstests/libs/profiler.js"); // For profilerHas*OrThrow helper functions. + load('jstests/libs/geo_near_random.js'); // For GeoNearRandomTest. const st = new ShardingTest({shards: 2, mongos: 1, config: 1}); const mongosDB = st.s0.getDB(jsTestName()); const mongosColl = mongosDB[jsTestName()]; + const unshardedColl = mongosDB[jsTestName() + "_unsharded"]; const shard0DB = primaryShardDB = st.shard0.getDB(jsTestName()); const shard1DB = st.shard1.getDB(jsTestName()); @@ -52,6 +45,9 @@ assert.commandWorked( mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + // We will need to test $geoNear on this collection, so create a 2dsphere index. + assert.commandWorked(mongosColl.createIndex({geo: "2dsphere"})); + // Split the collection into 4 chunks: [MinKey, -100), [-100, 0), [0, 100), [100, MaxKey). assert.commandWorked( mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: -100}})); @@ -66,29 +62,47 @@ assert.commandWorked(mongosDB.adminCommand( {moveChunk: mongosColl.getFullName(), find: {_id: 150}, to: "shard0001"})); + // Create a random geo co-ord generator for testing. + var georng = new GeoNearRandomTest(mongosColl); + // Write 400 documents across the 4 chunks. for (let i = -200; i < 200; i++) { - assert.writeOK(mongosColl.insert({_id: i, a: [i], b: {redactThisDoc: true}, c: true})); + assert.writeOK(mongosColl.insert( + {_id: i, a: [i], b: {redactThisDoc: true}, c: true, geo: georng.mkPt()})); + assert.writeOK(unshardedColl.insert({_id: i, x: i})); } + let testNameHistory = new Set(); + /** * Runs the aggregation specified by 'pipeline', verifying that: * - The number of documents returned by the aggregation matches 'expectedCount'. * - The merge was performed on a mongoS if 'mergeType' is 'mongos', and on a shard otherwise. */ - function assertMergeBehaviour({testName, pipeline, mergeType, batchSize, expectedCount}) { - // Verify that the 'mergeOnMongoS' explain() output for this pipeline matches our - // expectation. + function assertMergeBehaviour( + {testName, pipeline, mergeType, batchSize, allowDiskUse, expectedCount}) { + // Ensure that this test has a unique name. 
+ assert(!testNameHistory.has(testName)); + testNameHistory.add(testName); + + // Create the aggregation options from the given arguments. + const opts = { + comment: testName, + cursor: (batchSize ? {batchSize: batchSize} : {}), + }; + + if (allowDiskUse !== undefined) { + opts.allowDiskUse = allowDiskUse; + } + + // Verify that the explain() output's 'mergeType' field matches our expectation. assert.eq( - assert.commandWorked(mongosColl.explain().aggregate(pipeline, {comment: testName})) + assert.commandWorked(mongosColl.explain().aggregate(pipeline, Object.extend({}, opts))) .mergeType, mergeType); - assert.eq( - mongosColl - .aggregate(pipeline, {comment: testName, cursor: {batchSize: (batchSize || 101)}}) - .itcount(), - expectedCount); + // Verify that the aggregation returns the expected number of results. + assert.eq(mongosColl.aggregate(pipeline, opts).itcount(), expectedCount); // Verify that a $mergeCursors aggregation ran on the primary shard if 'mergeType' is not // 'mongos', and that no such aggregation ran otherwise. @@ -107,12 +121,13 @@ * Throws an assertion if the aggregation specified by 'pipeline' does not produce * 'expectedCount' results, or if the merge phase is not performed on the mongoS. */ - function assertMergeOnMongoS({testName, pipeline, batchSize, expectedCount}) { + function assertMergeOnMongoS({testName, pipeline, batchSize, allowDiskUse, expectedCount}) { assertMergeBehaviour({ testName: testName, pipeline: pipeline, mergeType: "mongos", - batchSize: (batchSize || 101), + batchSize: (batchSize || 10), + allowDiskUse: allowDiskUse, expectedCount: expectedCount }); } @@ -121,94 +136,214 @@ * Throws an assertion if the aggregation specified by 'pipeline' does not produce * 'expectedCount' results, or if the merge phase was not performed on a shard. */ - function assertMergeOnMongoD({testName, pipeline, mergeType, batchSize, expectedCount}) { + function assertMergeOnMongoD( + {testName, pipeline, mergeType, batchSize, allowDiskUse, expectedCount}) { assertMergeBehaviour({ testName: testName, pipeline: pipeline, mergeType: (mergeType || "anyShard"), - batchSize: (batchSize || 101), + batchSize: (batchSize || 10), + allowDiskUse: allowDiskUse, expectedCount: expectedCount }); } - // - // Test cases. - // + /** + * Runs a series of test cases which will consistently merge on mongoS or mongoD regardless of + * whether 'allowDiskUse' is true, false or omitted. + */ + function runTestCasesWhoseMergeLocationIsConsistentRegardlessOfAllowDiskUse(allowDiskUse) { + // Test that a $match pipeline with an empty merge stage is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_match_only", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}], + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); - let testName; + // Test that a $sort stage which merges pre-sorted streams is run on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_sort_presorted", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}], + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); - // Test that a $match pipeline with an empty merge stage is merged on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_match_only", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}], - batchSize: 10, - expectedCount: 400 - }); + // Test that $skip is merged on mongoS. 
+ assertMergeOnMongoS({ + testName: "agg_mongos_merge_skip", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 300}], + allowDiskUse: allowDiskUse, + expectedCount: 100 + }); - // Test that a $sort stage which merges pre-sorted streams is run on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_sort_presorted", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}], - batchSize: 10, - expectedCount: 400 - }); + // Test that $limit is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_limit", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 300}], + allowDiskUse: allowDiskUse, + expectedCount: 300 + }); - // Test that a $sort stage which must sort the dataset from scratch is NOT run on mongoS. - assertMergeOnMongoD({ - testName: "agg_mongos_merge_sort_in_mem", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}], - batchSize: 10, - expectedCount: 400 - }); + // Test that $sample is merged on mongoS if it is the splitpoint, since this will result in + // a merging $sort of presorted streams in the merge pipeline. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_sample_splitpoint", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 300}}], + allowDiskUse: allowDiskUse, + expectedCount: 300 + }); - // Test that a merge pipeline which needs to run on the primary shard is NOT merged on mongoS. - assertMergeOnMongoD({ - testName: "agg_mongos_merge_primary_shard", - pipeline: [ - {$match: {_id: {$gte: -200, $lte: 200}}}, - {$_internalSplitPipeline: {mergeType: "primaryShard"}} - ], - mergeType: "primaryShard", - batchSize: 10, - expectedCount: 400 - }); + // Test that $geoNear is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_geo_near", + pipeline: [ + {$geoNear: {near: [0, 0], distanceField: "distance", spherical: true, limit: 300}} + ], + allowDiskUse: allowDiskUse, + expectedCount: 300 + }); - // Test that $skip is merged on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_skip", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 300}], - batchSize: 10, - expectedCount: 100 - }); + // Test that a pipeline whose merging half can be run on mongos using only the mongos + // execution machinery returns the correct results. + // TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_all_mongos_runnable_skip_and_limit_stages", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$sort: {_id: -1}}, + {$skip: 150}, + {$limit: 150}, + {$skip: 5}, + {$limit: 1}, + ], + allowDiskUse: allowDiskUse, + expectedCount: 1 + }); - // Test that $limit is merged on mongoS. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_limit", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 300}], - batchSize: 10, - expectedCount: 300 - }); + // Test that a merge pipeline which needs to run on a shard is NOT merged on mongoS + // regardless of 'allowDiskUse'. + assertMergeOnMongoD({ + testName: "agg_mongos_merge_primary_shard_disk_use_" + allowDiskUse, + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$_internalSplitPipeline: {mergeType: "anyShard"}} + ], + mergeType: "anyShard", + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); + } - // Test that $sample is merged on mongoS. 
- assertMergeOnMongoS({ - testName: "agg_mongos_merge_sample", - pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 300}}], - batchSize: 10, - expectedCount: 300 - }); + /** + * Runs a series of test cases which will always merge on mongoD when 'allowDiskUse' is true, + * and on mongoS when 'allowDiskUse' is false or omitted. + */ + function runTestCasesWhoseMergeLocationDependsOnAllowDiskUse(allowDiskUse) { + // All test cases should merge on mongoD if allowDiskUse is true, mongoS otherwise. + const assertMergeOnMongoX = (allowDiskUse ? assertMergeOnMongoD : assertMergeOnMongoS); + + // Test that a blocking $sort is only merged on mongoS if 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_blocking_sort_no_disk_use", + pipeline: + [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}], + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); + + // Test that $group is only merged on mongoS if 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_group_allow_disk_use", + pipeline: + [{$match: {_id: {$gte: -200, $lte: 200}}}, {$group: {_id: {$mod: ["$_id", 150]}}}], + allowDiskUse: allowDiskUse, + expectedCount: 299 + }); + + // Test that a blocking $sample is only merged on mongoS if 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_blocking_sample_allow_disk_use", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$sample: {size: 300}}, + {$sample: {size: 200}} + ], + allowDiskUse: allowDiskUse, + expectedCount: 200 + }); + + // Test that $bucketAuto is only merged on mongoS if 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_bucket_auto_allow_disk_use", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + {$bucketAuto: {groupBy: "$_id", buckets: 10}} + ], + allowDiskUse: allowDiskUse, + expectedCount: 10 + }); + + // + // Test composite stages. + // + + // Test that $bucket ($group->$sort) is merged on mongoS iff 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_bucket_allow_disk_use", + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $bucket: { + groupBy: "$_id", + boundaries: [-200, -150, -100, -50, 0, 50, 100, 150, 200] + } + } + ], + allowDiskUse: allowDiskUse, + expectedCount: 8 + }); + + // Test that $sortByCount ($group->$sort) is merged on mongoS iff 'allowDiskUse' isn't set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_sort_by_count_allow_disk_use", + pipeline: + [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sortByCount: {$mod: ["$_id", 150]}}], + allowDiskUse: allowDiskUse, + expectedCount: 299 + }); + + // Test that $count ($group->$project) is merged on mongoS iff 'allowDiskUse' is not set. + assertMergeOnMongoX({ + testName: "agg_mongos_merge_count_allow_disk_use", + pipeline: [{$match: {_id: {$gte: -150, $lte: 1500}}}, {$count: "doc_count"}], + allowDiskUse: allowDiskUse, + expectedCount: 1 + }); + } + + // Run all test cases for each potential value of 'allowDiskUse'. + for (let allowDiskUse of[false, undefined, true]) { + runTestCasesWhoseMergeLocationIsConsistentRegardlessOfAllowDiskUse(allowDiskUse); + runTestCasesWhoseMergeLocationDependsOnAllowDiskUse(allowDiskUse); + testNameHistory.clear(); + } // Test that merge pipelines containing all mongos-runnable stages produce the expected output. 
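The composite cases above behave like the stages they desugar to: $bucket and $sortByCount expand to a $group followed by a $sort, and $count to a $group followed by a $project, so each inherits $group's ability to spill (and therefore merges on a shard when 'allowDiskUse' is true). A rough shell illustration of the $sortByCount rewrite, with an illustrative 'status' field:

```javascript
// $sortByCount desugars to a counting $group followed by a descending $sort
// on the computed count; both forms yield the same buckets.
const desugared = [
    {$group: {_id: "$status", count: {$sum: 1}}},
    {$sort: {count: -1}}
];
assert.eq(coll.aggregate([{$sortByCount: "$status"}]).itcount(),
          coll.aggregate(desugared).itcount());
```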
assertMergeOnMongoS({ testName: "agg_mongos_merge_all_mongos_runnable_stages", pipeline: [ - {$match: {_id: {$gte: -200, $lte: 200}}}, - {$sort: {_id: 1}}, + {$geoNear: {near: [0, 0], distanceField: "distance", spherical: true, limit: 400}}, + {$sort: {a: 1}}, {$skip: 150}, {$limit: 150}, {$addFields: {d: true}}, {$unwind: "$a"}, {$sample: {size: 100}}, - {$project: {c: 0}}, + {$project: {c: 0, geo: 0, distance: 0}}, + {$group: {_id: "$_id", doc: {$push: "$$CURRENT"}}}, + {$unwind: "$doc"}, + {$replaceRoot: {newRoot: "$doc"}}, { $redact: { $cond: @@ -218,33 +353,17 @@ { $match: { _id: {$gte: -50, $lte: 100}, - a: {$gte: -50, $lte: 100}, + a: {$type: "number", $gte: -50, $lte: 100}, b: {$exists: false}, c: {$exists: false}, - d: true + d: true, + geo: {$exists: false}, + distance: {$exists: false} } } ], - batchSize: 10, expectedCount: 100 }); - // Test that a pipeline whose merging half can be run on mongos using only the mongos execution - // machinery returns the correct results. - assertMergeOnMongoS({ - testName: "agg_mongos_merge_all_mongos_runnable_skip_and_limit_stages", - pipeline: [ - {$match: {_id: {$gte: -5, $lte: 100}}}, - {$sort: {_id: -1}}, - {$skip: 5}, - {$limit: 10}, - {$skip: 5}, - {$limit: 1}, - ], - batchSize: 10, - expectedCount: 1 - }); - // TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos. - st.stop(); })(); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index eba4e2acc4b..934d9d29da2 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -132,20 +132,70 @@ public: * A HostTypeRequirement defines where this stage is permitted to be executed when the * pipeline is run on a sharded cluster. */ - enum class HostTypeRequirement { kPrimaryShard, kAnyShard, kAnyShardOrMongoS }; + enum class HostTypeRequirement { kNone, kPrimaryShard, kAnyShard }; - // Set if this stage needs to be in a particular position of the pipeline. - PositionRequirement requiredPosition = PositionRequirement::kNone; + /** + * A DiskUseRequirement indicates whether this stage writes to disk, or whether it may spill + * to disk if its memory usage exceeds a given threshold. Note that this only indicates the + * *ability* of the stage to spill; if 'allowDiskUse' is set to false, it will be prevented + * from doing so. + */ + enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData }; + + /** + * A FacetRequirement indicates whether this stage may be used within a $facet pipeline. + */ + enum class FacetRequirement { kAllowed, kNotAllowed }; - // Set if this stage can only be executed on specific components of a sharded cluster. - HostTypeRequirement hostRequirement = HostTypeRequirement::kAnyShard; + /** + * A StreamType defines whether this stage is streaming (can produce output based solely on + * the current input document) or blocking (must examine subsequent documents before + * producing an output document). + */ + enum class StreamType { kStreaming, kBlocking }; + + StageConstraints(StreamType streamType, + PositionRequirement requiredPosition, + HostTypeRequirement hostRequirement, + DiskUseRequirement diskRequirement, + FacetRequirement facetRequirement) + : requiredPosition(requiredPosition), + hostRequirement(hostRequirement), + diskRequirement(diskRequirement), + facetRequirement(facetRequirement), + streamType(streamType) { + // Stages which are allowed to run in $facet pipelines must not have any specific + // position requirements. 
+ invariant(!isAllowedInsideFacetStage() || + requiredPosition == PositionRequirement::kNone); + } - bool isAllowedInsideFacetStage = true; + // Indicates whether this stage needs to be at a particular position in the pipeline. + const PositionRequirement requiredPosition; + + // Indicates whether this stage can only be executed on specific components of a sharded + // cluster. + const HostTypeRequirement hostRequirement; + + // Indicates whether this stage may write persistent data to disk, or may spill to temporary + // files if its memory usage becomes excessive. + const DiskUseRequirement diskRequirement; + + // Indicates whether this stage may run inside a $facet stage. + const FacetRequirement facetRequirement; + + // Indicates whether this is a streaming or blocking stage. + const StreamType streamType; // True if this stage does not generate results itself, and instead pulls inputs from an // input DocumentSource (via 'pSource'). bool requiresInputDocSource = true; + // True if this stage should be permitted to run in a $facet pipeline. + bool isAllowedInsideFacetStage() const { + return facetRequirement == FacetRequirement::kAllowed; + } + // True if this stage operates on a global or database level, like $currentOp. bool isIndependentOfAnyCollection = false; @@ -165,6 +215,9 @@ public: using HostTypeRequirement = StageConstraints::HostTypeRequirement; using PositionRequirement = StageConstraints::PositionRequirement; + using DiskUseRequirement = StageConstraints::DiskUseRequirement; + using FacetRequirement = StageConstraints::FacetRequirement; + using StreamType = StageConstraints::StreamType; /** * This is what is returned from the main DocumentSource API: getNext(). It is essentially a @@ -260,9 +313,7 @@ public: * Returns a struct containing information about any special constraints imposed on using this * stage. */ - virtual StageConstraints constraints() const { - return StageConstraints{}; - } + virtual StageConstraints constraints() const = 0; /** * Informs the stage that it is no longer needed and can release its resources. After dispose() diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp index e874fd5d426..a8123f5f2fc 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp @@ -91,7 +91,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() { if (!_sorter) { SortOptions opts; opts.maxMemoryUsageBytes = _maxMemoryUsageBytes; - if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) { + if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) { opts.extSortAllowed = true; opts.tempDir = pExpCtx->tempDir; } diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h index 2a40b092c1f..60c5ed02501 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto.h +++ b/src/mongo/db/pipeline/document_source_bucket_auto.h @@ -48,6 +48,14 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; + StageConstraints constraints() const final { + return {StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed}; + } + /** * The $bucketAuto stage must be run on the merging shard. 
*/ diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp index 08926254480..df19b4eb497 100644 --- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp @@ -350,7 +350,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToCorrectlySpillToDisk) { auto expCtx = getExpCtx(); unittest::TempDir tempDir("DocumentSourceBucketAutoTest"); expCtx->tempDir = tempDir.path(); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; const size_t maxMemoryUsageBytes = 1000; VariablesParseState vps = expCtx->variablesParseState; @@ -386,7 +386,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToPauseLoadingWhileSpilled) { // Allow the $sort stage to spill to disk. unittest::TempDir tempDir("DocumentSourceBucketAutoTest"); expCtx->tempDir = tempDir.path(); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; const size_t maxMemoryUsageBytes = 1000; VariablesParseState vps = expCtx->variablesParseState; @@ -647,22 +647,22 @@ void assertCannotSpillToDisk(const boost::intrusive_ptr<ExpressionContext>& expC TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) { auto expCtx = getExpCtx(); - expCtx->extSortAllowed = false; + expCtx->allowDiskUse = false; expCtx->inMongos = false; assertCannotSpillToDisk(expCtx); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; expCtx->inMongos = true; assertCannotSpillToDisk(expCtx); - expCtx->extSortAllowed = false; + expCtx->allowDiskUse = false; expCtx->inMongos = true; assertCannotSpillToDisk(expCtx); } TEST_F(BucketAutoTests, ShouldCorrectlyTrackMemoryUsageBetweenPauses) { auto expCtx = getExpCtx(); - expCtx->extSortAllowed = false; + expCtx->allowDiskUse = false; const size_t maxMemoryUsageBytes = 1000; VariablesParseState vps = expCtx->variablesParseState; diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index a38a1aef2ce..40ceaa5b1b7 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -97,10 +97,11 @@ const char* DocumentSourceOplogMatch::getSourceName() const { } DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints() const { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; - constraints.isAllowedInsideFacetStage = false; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; } /** @@ -142,6 +143,14 @@ public: return "$changeStream"; } + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { // This stage is created by the DocumentSourceChangeStream stage, so serializing it // here would result in it being created twice. 
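The extSortAllowed → allowDiskUse renames above make the expression context mirror the user-facing option name, and the `allowDiskUse && !inMongos` guard captures the rule that a stage may spill only when the user opted in and the stage is not executing on mongoS. The user-visible contract, sketched from the shell for $group (assuming 'coll' holds enough data for the $push accumulator to exhaust the stage's memory budget; 16945 is the $group memory-limit error code that appears later in this diff):

```javascript
const bigGroup = [{$group: {_id: "$key", docs: {$push: "$$ROOT"}}}];

// Without allowDiskUse, $group fails once its memory budget is exceeded:
// "Exceeded memory limit for $group, but didn't allow external sort."
assert.commandFailedWithCode(
    db.runCommand({aggregate: coll.getName(), pipeline: bigGroup, cursor: {}}),
    16945);

// With allowDiskUse: true, the stage may instead spill temporary data to disk.
assert.commandWorked(db.runCommand(
    {aggregate: coll.getName(), pipeline: bigGroup, cursor: {}, allowDiskUse: true}));
```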
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h index 22441f2a03a..cfbef21088d 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.h +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -60,6 +60,14 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create( @@ -87,6 +95,14 @@ public: GetNextResult getNext() final; const char* getSourceName() const final; + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + /** * SplittableDocumentSource methods; this has to run on the merger, since the resume point could * be at any shard. diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 2903241d1a7..0c319729328 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -75,10 +75,13 @@ public: const char* getSourceName() const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index d5138e74d2a..6fa869712b1 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -79,11 +79,14 @@ public: const char* getSourceName() const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.isIndependentOfAnyCollection = true; + constraints.requiresInputDocSource = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index f9c9345baf6..674bbaa85ee 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -52,8 +52,12 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + 
FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index c7f82678eb1..6c58124387b 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -238,21 +238,29 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx) } DocumentSource::StageConstraints DocumentSourceFacet::constraints() const { - StageConstraints constraints; - constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets. - - for (auto&& facet : _facets) { - for (auto&& nestedStage : facet.pipeline->getSources()) { - if (nestedStage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard) { - // Currently we don't split $facet to have a merger part and a shards part (see - // SERVER-24154). This means that if any stage in any of the $facet pipelines - // requires the primary shard, then the entire $facet must happen on the merger, and - // the merger must be the primary shard. - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; - } - } - } - return constraints; + const bool mayUseDisk = std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) { + const auto sources = facet.pipeline->getSources(); + return std::any_of(sources.begin(), sources.end(), [&](const auto source) { + return source->constraints().diskRequirement == DiskUseRequirement::kWritesTmpData; + }); + }); + + // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154). + // This means that if any stage in any of the $facet pipelines requires the primary shard, then + // the entire $facet must happen on the merger, and the merger must be the primary shard. + const bool needsPrimaryShard = + std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) { + const auto sources = facet.pipeline->getSources(); + return std::any_of(sources.begin(), sources.end(), [&](const auto source) { + return source->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard; + }); + }); + + return {StreamType::kBlocking, + PositionRequirement::kNone, + needsPrimaryShard ? HostTypeRequirement::kPrimaryShard : HostTypeRequirement::kAnyShard, + mayUseDisk ? 
DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; } DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const { diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index be05268fb7b..61d112906c7 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -186,9 +186,11 @@ public: DocumentSourcePassthrough() : DocumentSourceMock({}) {} StageConstraints constraints() const override { - StageConstraints constraints; - constraints.isAllowedInsideFacetStage = true; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } DocumentSource::GetNextResult getNext() final { @@ -625,9 +627,11 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough { public: StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() { diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp index c971509e373..f7b7a5ca741 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.cpp +++ b/src/mongo/db/pipeline/document_source_geo_near.cpp @@ -61,7 +61,7 @@ DocumentSource::GetNextResult DocumentSourceGeoNear::getNext() { if (!resultsIterator->more()) return GetNextResult::makeEOF(); - // each result from the geoNear command is wrapped in a wrapper object with "obj", + // Each result from the geoNear command is wrapped in a wrapper object with "obj", // "dis" and maybe "loc" fields. We want to take the object from "obj" and inject the // other fields into it. Document result(resultsIterator->next().embeddedObject()); @@ -70,6 +70,11 @@ DocumentSource::GetNextResult DocumentSourceGeoNear::getNext() { if (includeLocs) output.setNestedField(*includeLocs, result["loc"]); + // In a cluster, $geoNear output will be merged via $sort, so add the sort key. + if (pExpCtx->needsMerge) { + output.setSortKeyMetaField(BSON("" << result["dis"])); + } + return output.freeze(); } @@ -89,12 +94,13 @@ Pipeline::SourceContainer::iterator DocumentSourceGeoNear::doOptimizeAt( } // This command is sent as-is to the shards. -// On router this becomes a sort by distance (nearest-first) with limit. intrusive_ptr<DocumentSource> DocumentSourceGeoNear::getShardSource() { return this; } +// On mongoS this becomes a merge sort by distance (nearest-first) with limit. 
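Because every shard returns its $geoNear results nearest-first, the merge half only needs a merging $sort over pre-sorted streams (hence the $mergePresorted flag below) rather than a blocking re-sort, which is what keeps $geoNear eligible to merge on mongoS. A small shell check of the merged ordering, reusing the pipeline shape from the test above:

```javascript
const results =
    mongosColl
        .aggregate([{
            $geoNear:
                {near: [0, 0], distanceField: "distance", spherical: true, limit: 50}
        }])
        .toArray();

// Each shard's stream arrives pre-sorted by distance and the merging $sort
// preserves that order, so the merged output is globally nearest-first.
for (let i = 1; i < results.length; i++) {
    assert.lte(results[i - 1].distance, results[i].distance);
}
```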
intrusive_ptr<DocumentSource> DocumentSourceGeoNear::getMergeSource() { - return DocumentSourceSort::create(pExpCtx, BSON(distanceField->fullPath() << 1), limit); + return DocumentSourceSort::create( + pExpCtx, BSON(distanceField->fullPath() << 1 << "$mergePresorted" << true), limit); } Value DocumentSourceGeoNear::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 315dcea6dd4..23e7c4e901c 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -48,10 +48,13 @@ public: Pipeline::SourceContainer* container) final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 53394d3d0ab..afd633d46bc 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -54,9 +54,13 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints() const final { - StageConstraints constraints; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); + constraints.canSwapWithMatch = true; - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index ba2c48680f2..41b2546e331 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -266,7 +266,7 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>& _initialized(false), _groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()), _spilled(false), - _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inMongos) {} + _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) {} void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) { _accumulatedFields.push_back(accumulationStatement); @@ -485,7 +485,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { uassert(16945, "Exceeded memory limit for $group, but didn't allow external sort." " Pass allowDiskUse:true to opt in.", - _extSortAllowed); + _allowDiskUse); _sortedFiles.push_back(spill()); _memoryUsageBytes = 0; } @@ -531,7 +531,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() { // In debug mode, spill every time we have a duplicate id to stress merge logic. 
if (!inserted && // is a dup !pExpCtx->inMongos && // can't spill to disk in mongos - !_extSortAllowed && // don't change behavior when testing external sort + !_allowDiskUse && // don't change behavior when testing external sort _sortedFiles.size() < 20) { // don't open too many FDs _sortedFiles.push_back(spill()); diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h index cf108c15f24..057eb58164a 100644 --- a/src/mongo/db/pipeline/document_source_group.h +++ b/src/mongo/db/pipeline/document_source_group.h @@ -69,6 +69,14 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + StageConstraints constraints() const final { + return {StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed}; + } + /** * Add an accumulator, which will become a field in each Document that results from grouping. */ @@ -176,7 +184,7 @@ private: // Only used when '_spilled' is true. std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator; - const bool _extSortAllowed; + const bool _allowDiskUse; std::pair<Value, Value> _firstPartOfNextGroup; // Only used when '_sorted' is true. diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp index 5cc5ba93be4..8e3fc90521b 100644 --- a/src/mongo/db/pipeline/document_source_group_test.cpp +++ b/src/mongo/db/pipeline/document_source_group_test.cpp @@ -103,7 +103,7 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) { // Allow the $group stage to spill to disk. TempDir tempDir("DocumentSourceGroupTest"); expCtx->tempDir = tempDir.path(); - expCtx->extSortAllowed = true; + expCtx->allowDiskUse = true; const size_t maxMemoryUsageBytes = 1000; VariablesParseState vps = expCtx->variablesParseState; diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index 83546654361..c5d4d7c2762 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -70,10 +70,13 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index d1967b6625f..839eebecc3f 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -53,9 +53,11 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } GetNextResult getNext() final; diff 
--git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp index a49514cf628..276cf3005d8 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp @@ -47,7 +47,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create auto specObj = elem.embeddedObject(); - HostTypeRequirement mergeType = HostTypeRequirement::kAnyShard; + HostTypeRequirement mergeType = HostTypeRequirement::kNone; for (auto&& elt : specObj) { if (elt.fieldNameStringData() == "mergeType"_sd) { @@ -62,7 +62,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create } else if ("primaryShard"_sd == mergeTypeString) { mergeType = HostTypeRequirement::kPrimaryShard; } else if ("mongos"_sd == mergeTypeString) { - mergeType = HostTypeRequirement::kAnyShardOrMongoS; + mergeType = HostTypeRequirement::kNone; } else { uasserted(ErrorCodes::BadValue, str::stream() << "unrecognized field while parsing mergeType: '" @@ -90,8 +90,8 @@ Value DocumentSourceInternalSplitPipeline::serialize( std::string mergeTypeString; switch (_mergeType) { - case HostTypeRequirement::kAnyShardOrMongoS: - mergeTypeString = "mongos"; + case HostTypeRequirement::kAnyShard: + mergeTypeString = "anyShard"; break; case HostTypeRequirement::kPrimaryShard: @@ -99,7 +99,7 @@ Value DocumentSourceInternalSplitPipeline::serialize( break; default: - mergeTypeString = "anyShard"; + mergeTypeString = "mongos"; break; } diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h index f9ac84c555f..811de4b7e23 100644 --- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -36,10 +36,10 @@ namespace mongo { * An internal stage available for testing. Acts as a simple passthrough of intermediate results * from the source stage, but forces the pipeline to split at the point where this stage appears * (assuming that no earlier splitpoints exist). Takes a single parameter, 'mergeType', which can be - * one of 'anyShard', 'primaryShard' or 'mongos' to control where the merge may occur. Omitting this - * parameter or specifying 'anyShard' produces the default merging behaviour; the merge half of the - * pipeline will be sent to a random participating shard, subject to the requirements of any - * subsequent splittable stages in the pipeline. + * one of 'primaryShard', 'anyShard' or 'mongos' to control where the merge may occur. Omitting this + * parameter or specifying 'mongos' produces the default merging behaviour; the merge half of the + * pipeline will be executed on mongoS if all other stages are eligible, and will be sent to a + * random participating shard otherwise. 
*/ class DocumentSourceInternalSplitPipeline final : public DocumentSource, public SplittableDocumentSource { @@ -67,9 +67,11 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = _mergeType; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + _mergeType, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } GetNextResult getNext() final; @@ -80,7 +82,7 @@ private: : DocumentSource(expCtx), _mergeType(mergeType) {} Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; - HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard; + HostTypeRequirement _mergeType = HostTypeRequirement::kNone; }; } // namesace mongo diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index 7bf47638b21..b74a053028f 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -48,6 +48,14 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; + } + GetNextResult getNext() final; const char* getSourceName() const final { return kStageName.rawData(); @@ -58,12 +66,6 @@ public: : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); } - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; - } - /** * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately. 
*/ diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h index 23e7ce97592..01321d596a6 100644 --- a/src/mongo/db/pipeline/document_source_list_local_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -94,13 +94,15 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.hostRequirement = StageConstraints::HostTypeRequirement::kAnyShardOrMongoS; - constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.isIndependentOfAnyCollection = true; constraints.allowedToForwardFromMongos = false; + constraints.requiresInputDocSource = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h index 47c08af9d73..4d050239e4c 100644 --- a/src/mongo/db/pipeline/document_source_list_sessions.h +++ b/src/mongo/db/pipeline/document_source_list_sessions.h @@ -86,11 +86,11 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; - constraints.requiresInputDocSource = true; - constraints.isAllowedInsideFacetStage = false; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; } static boost::intrusive_ptr<DocumentSource> createFromBson( diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 21f3151f59c..3a8de609c9f 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -118,6 +118,8 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs, Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState), _variablesParseState.defineVariable(varName)); } + + initializeIntrospectionPipeline(); } std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse( @@ -659,16 +661,14 @@ void DocumentSourceLookUp::serializeToArray( DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const { if (wasConstructedWithPipelineSyntax()) { - // Copy all 'let' variables into the foreign pipeline's expression context. - copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); - - auto pipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); + // We will use the introspection pipeline which we prebuilt during construction. + invariant(_parsedIntrospectionPipeline); DepsTracker subDeps(deps->getMetadataAvailable()); // Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables // declared by this $lookup and variables declared externally. 
- for (auto&& source : pipeline->getSources()) { + for (auto&& source : _parsedIntrospectionPipeline->getSources()) { source->getDependencies(&subDeps); } diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 9727d0e92db..7cf136b0cf9 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -96,9 +96,22 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints() const final { - StageConstraints constraints; + const bool mayUseDisk = wasConstructedWithPipelineSyntax() && + std::any_of(_parsedIntrospectionPipeline->getSources().begin(), + _parsedIntrospectionPipeline->getSources().end(), + [](const auto& source) { + return source->constraints().diskRequirement == + DiskUseRequirement::kWritesTmpData; + }); + + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kPrimaryShard, + mayUseDisk ? DiskUseRequirement::kWritesTmpData + : DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); + constraints.canSwapWithMatch = true; - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } @@ -243,6 +256,16 @@ private: void resolveLetVariables(const Document& localDoc, Variables* variables); /** + * Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup + * pipelines will be built recursively. + */ + void initializeIntrospectionPipeline() { + copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get()); + _parsedIntrospectionPipeline = + uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx)); + } + + /** * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a * cursor and/or cache source as appropriate. */ @@ -296,6 +319,9 @@ private: // The aggregation pipeline defined with the user request, prior to optimization and view // resolution. std::vector<BSONObj> _userPipeline; + // A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective + // functions. If sub-$lookup stages are present, their pipelines are constructed recursively. 
+ std::unique_ptr<Pipeline, Pipeline::Deleter> _parsedIntrospectionPipeline; std::vector<LetVariable> _letVariables; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index c51653ddb8d..02cfa7435d4 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -62,9 +62,13 @@ public: } StageConstraints constraints() const final { - StageConstraints constraints; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.canSwapWithMatch = true; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index dcbc2b55814..03ea80c5d7d 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -162,7 +162,7 @@ TEST_F(DocumentSourceLookUpTest, AcceptsPipelineWithLetSyntax) { << "pipeline" << BSON_ARRAY(BSON("$project" << BSON("hasX" << "$$var1")) - << BSON("$match" << BSON("$hasX" << true))) + << BSON("$match" << BSON("hasX" << true))) << "as" << "as")) .firstElement(), @@ -448,9 +448,9 @@ public: Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { - if (pipeline->popFrontStageWithName("$match") || - pipeline->popFrontStageWithName("$sort") || - pipeline->popFrontStageWithName("$project")) { + if (pipeline->popFrontWithCriteria("$match") || + pipeline->popFrontWithCriteria("$sort") || + pipeline->popFrontWithCriteria("$project")) { continue; } break; diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 6b2fc653d43..26d912928eb 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -51,9 +51,11 @@ public: const char* getSourceName() const override; StageConstraints constraints() const override { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } Value serialize( diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 63521e2c742..2e334d5e0ce 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -59,11 +59,13 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShard; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git 
a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 22f7a1aa24d..6cc7f5aed69 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -50,10 +50,13 @@ public: boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; StageConstraints constraints() const override { - StageConstraints constraints; - constraints.requiredPosition = PositionRequirement::kFirst; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + constraints.requiresInputDocSource = false; - constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 6756cd4df2a..ec8c188c0b7 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -45,11 +45,11 @@ public: GetDepsReturn getDependencies(DepsTracker* deps) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; - constraints.isAllowedInsideFacetStage = false; - constraints.requiredPosition = PositionRequirement::kLast; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kLast, + HostTypeRequirement::kPrimaryShard, + DiskUseRequirement::kWritesPersistentData, + FacetRequirement::kNotAllowed}; } // Virtuals for SplittableDocumentSource diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index bedc434d61e..db1698fe776 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -41,9 +41,11 @@ public: boost::intrusive_ptr<DocumentSource> optimize() final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; } /** diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index 07a85c77e33..d5a86d9c008 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -44,9 +44,11 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; + return {StreamType::kBlocking, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kWritesTmpData, + FacetRequirement::kAllowed}; } GetDepsReturn getDependencies(DepsTracker* deps) const final { diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h index 19b7106b03d..8d10664f6ff 100644 --- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h +++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h @@ -44,6 +44,14 @@ public: Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; GetDepsReturn 
getDependencies(DepsTracker* deps) const final; + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kFirst, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed}; + } + static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create( const boost::intrusive_ptr<ExpressionContext>& expCtx, long long size, diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h index af96ae8e2ab..1d2474235f2 100644 --- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h +++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h @@ -50,13 +50,14 @@ public: } StageConstraints constraints() const { - StageConstraints constraints; - - if (_cache->isServing()) { - constraints.requiredPosition = PositionRequirement::kFirst; - constraints.requiresInputDocSource = false; - } - + StageConstraints constraints(StreamType::kStreaming, + _cache->isServing() ? PositionRequirement::kFirst + : PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed); + + constraints.requiresInputDocSource = (_cache->isBuilding()); return constraints; } diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 188e9864310..bdf6eac8d05 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -102,8 +102,11 @@ public: GetModPathsReturn getModifiedPaths() const final; StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index fc87d7e1eaa..4e10f9ac852 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -50,18 +50,20 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + StageConstraints constraints() const final { + return {StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed}; + } + GetNextResult getNext() final; const char* getSourceName() const final { return kStageName.rawData(); } - StageConstraints constraints() const final { - StageConstraints constraints; - constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; - return constraints; - } - /** * Attempts to move a subsequent $limit before the skip, potentially allowing for forther * optimizations earlier in the pipeline. 
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 97923ff8118..e32c7d418a6 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -304,7 +304,7 @@ SortOptions DocumentSourceSort::makeSortOptions() const {
         opts.limit = limitSrc->getLimit();
 
     opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
-    if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) {
+    if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
         opts.extSortAllowed = true;
         opts.tempDir = pExpCtx->tempDir;
     }
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 2494400eaae..c4ecc799a5f 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -63,14 +63,15 @@ public:
     }
 
     StageConstraints constraints() const final {
-        StageConstraints constraints;
-        // Can't swap with a $match if a limit has been absorbed, since in general match can't swap
-        // with limit.
+        StageConstraints constraints(
+            _mergingPresorted ? StreamType::kStreaming : StreamType::kBlocking,
+            PositionRequirement::kNone,
+            HostTypeRequirement::kNone,
+            _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData,
+            FacetRequirement::kAllowed);
+
+        // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit.
         constraints.canSwapWithMatch = !limitSrc;
-
-        // Can run on mongoS only if this stage is merging presorted streams.
-        constraints.hostRequirement = (_mergingPresorted ? HostTypeRequirement::kAnyShardOrMongoS
-                                                         : HostTypeRequirement::kAnyShard);
         return constraints;
     }
@@ -104,6 +105,13 @@ public:
         uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
 
     /**
+     * Returns true if this $sort stage is merging presorted streams.
+     */
+    bool mergingPresorted() const {
+        return _mergingPresorted;
+    }
+
+    /**
      * Returns -1 for no limit.
     */
    long long getLimit() const;
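The $sort change above gives the stage two personalities, and the new mergingPresorted() accessor exposes which one applies. A condensed sketch of the decision follows; the names are taken from the diff, but the types here are simplified stand-ins.

    #include <utility>

    enum class StreamType { kStreaming, kBlocking };
    enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData };

    // When merging streams that each shard has already sorted, $sort only ever
    // looks at the head of each input, so it is streaming and never spills; a
    // full sort must consume all input first and may write temporary data.
    std::pair<StreamType, DiskUseRequirement> sortProfile(bool mergingPresorted) {
        return mergingPresorted
            ? std::make_pair(StreamType::kStreaming, DiskUseRequirement::kNoDiskUse)
            : std::make_pair(StreamType::kBlocking, DiskUseRequirement::kWritesTmpData);
    }

    int main() {
        return sortProfile(true).first == StreamType::kStreaming ? 0 : 1;
    }

This distinction is what later lets the router treat a merge-$sort as safe to run on mongoS while a blocking $sort is not.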
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 25362138a9c..2bc39427a37 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -402,7 +402,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled)
     // Allow the $sort stage to spill to disk.
     unittest::TempDir tempDir("DocumentSourceSortTest");
     expCtx->tempDir = tempDir.path();
-    expCtx->extSortAllowed = true;
+    expCtx->allowDiskUse = true;
     const size_t maxMemoryUsageBytes = 1000;
     auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
@@ -436,7 +436,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled)
 TEST_F(DocumentSourceSortExecutionTest,
        ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) {
     auto expCtx = getExpCtx();
-    expCtx->extSortAllowed = false;
+    expCtx->allowDiskUse = false;
     const size_t maxMemoryUsageBytes = 1000;
     auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
@@ -451,7 +451,7 @@ TEST_F(DocumentSourceSortExecutionTest,
 TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
     auto expCtx = getExpCtx();
-    expCtx->extSortAllowed = false;
+    expCtx->allowDiskUse = false;
     const size_t maxMemoryUsageBytes = 1000;
     auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h
index 826967a5c48..e20232457a1 100644
--- a/src/mongo/db/pipeline/document_source_tee_consumer.h
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.h
@@ -53,6 +53,14 @@ public:
         size_t facetId,
         const boost::intrusive_ptr<TeeBuffer>& bufferSource);
 
+    StageConstraints constraints() const final {
+        return {StreamType::kStreaming,
+                PositionRequirement::kNone,
+                HostTypeRequirement::kNone,
+                DiskUseRequirement::kNoDiskUse,
+                FacetRequirement::kAllowed};
+    }
+
     GetNextResult getNext() final;
 
     /**
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index 5bc9a91afdb..c95a979dc56 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -47,8 +47,12 @@ public:
     GetModPathsReturn getModifiedPaths() const final;
 
     StageConstraints constraints() const final {
-        StageConstraints constraints;
-        constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+        StageConstraints constraints(StreamType::kStreaming,
+                                     PositionRequirement::kNone,
+                                     HostTypeRequirement::kNone,
+                                     DiskUseRequirement::kNoDiskUse,
+                                     FacetRequirement::kAllowed);
+
         constraints.canSwapWithMatch = true;
         return constraints;
     }
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 8f15fb5a3fe..db80c5f7234 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -46,7 +46,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
     : explain(request.getExplain()),
       fromMongos(request.isFromMongos()),
       needsMerge(request.needsMerge()),
-      extSortAllowed(request.shouldAllowDiskUse()),
+      allowDiskUse(request.shouldAllowDiskUse()),
       bypassDocumentValidation(request.shouldBypassDocumentValidation()),
       from34Mongos(request.isFrom34Mongos()),
       ns(request.getNamespaceString()),
@@ -84,7 +84,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns,
     expCtx->fromMongos = fromMongos;
     expCtx->from34Mongos = from34Mongos;
     expCtx->inMongos = inMongos;
-    expCtx->extSortAllowed = extSortAllowed;
+    expCtx->allowDiskUse = allowDiskUse;
     expCtx->bypassDocumentValidation = bypassDocumentValidation;
 
     expCtx->tempDir = tempDir;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 4fe6bc8e541..d580644eeba 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -117,7 +117,7 @@ public:
     bool fromMongos = false;
     bool needsMerge = false;
     bool inMongos = false;
-    bool extSortAllowed = false;
+    bool allowDiskUse = false;
     bool bypassDocumentValidation = false;
 
     // We track whether the aggregation request came from a 3.4 mongos. If so, the merge may occur
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 34e04912e49..631228a8ddb 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -64,6 +64,9 @@ namespace dps = ::mongo::dotted_path_support;
 
 using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
 using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
+using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
+using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement;
+using StreamType = DocumentSource::StageConstraints::StreamType;
 
 Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
@@ -169,15 +172,14 @@ void Pipeline::validateFacetPipeline() const {
     }
     for (auto&& stage : _sources) {
         auto stageConstraints = stage->constraints();
-        if (!stageConstraints.isAllowedInsideFacetStage) {
+        if (!stageConstraints.isAllowedInsideFacetStage()) {
             uasserted(40600,
                       str::stream() << stage->getSourceName()
                                     << " is not allowed to be used within a $facet stage");
         }
         // We expect a stage within a $facet stage to have these properties.
-        invariant(stageConstraints.requiresInputDocSource);
-        invariant(!stageConstraints.isIndependentOfAnyCollection);
         invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
+        invariant(!stageConstraints.isIndependentOfAnyCollection);
     }
 
     // Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -434,8 +436,18 @@ bool Pipeline::needsPrimaryShardMerger() const {
 }
 
 bool Pipeline::canRunOnMongos() const {
-    return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) {
-        return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS;
+    return std::all_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
+        auto constraints = stage->constraints();
+        const bool doesNotNeedShard = (constraints.hostRequirement == HostTypeRequirement::kNone);
+        const bool doesNotNeedDisk =
+            (constraints.diskRequirement == DiskUseRequirement::kNoDiskUse ||
+             (constraints.diskRequirement == DiskUseRequirement::kWritesTmpData &&
+              !pCtx->allowDiskUse));
+        const bool doesNotBlockOrBlockingIsPermitted =
+            (constraints.streamType == StreamType::kStreaming ||
+             !internalQueryProhibitBlockingMergeOnMongoS.load());
+
+        return doesNotNeedShard && doesNotNeedDisk && doesNotBlockOrBlockingIsPermitted;
     });
 }
@@ -579,11 +591,17 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva
     return deps;
 }
 
-boost::intrusive_ptr<DocumentSource> Pipeline::popFrontStageWithName(StringData targetStageName) {
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
+    StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
     if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
         return nullptr;
     }
     auto targetStage = _sources.front();
+
+    if (predicate && !predicate(targetStage.get())) {
+        return nullptr;
+    }
+
     _sources.pop_front();
     stitch();
     return targetStage;
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 0aa142c36c8..29321861ce8 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -38,6 +38,8 @@
 #include "mongo/db/pipeline/dependencies.h"
 #include "mongo/db/pipeline/value.h"
 #include "mongo/db/query/explain_options.h"
+#include "mongo/db/query/query_knobs.h"
+#include "mongo/stdx/functional.h"
 #include "mongo/util/intrusive_counter.h"
 #include "mongo/util/timer.h"
@@ -281,10 +283,13 @@ public:
     }
 
     /**
-     * Removes and returns the first stage of the pipeline if its name is 'targetStageName'. Returns
-     * nullptr if there is no first stage, or if the stage's name is not 'targetStageName'.
+     * Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the
+     * given 'predicate' function, if present, returns 'true' when called with a pointer to the
+     * stage. Returns nullptr if there is no first stage which meets these criteria.
     */
-    boost::intrusive_ptr<DocumentSource> popFrontStageWithName(StringData targetStageName);
+    boost::intrusive_ptr<DocumentSource> popFrontWithCriteria(
+        StringData targetStageName,
+        stdx::function<bool(const DocumentSource* const)> predicate = nullptr);
 
     /**
     * PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists
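To make the new canRunOnMongos() predicate concrete, here is a worked, self-contained example of the three checks from the hunk above. The Stage struct and the sample merge pipeline are hypothetical stand-ins for the real DocumentSource machinery.

    #include <iostream>
    #include <vector>

    enum class StreamType { kStreaming, kBlocking };
    enum class HostTypeRequirement { kNone, kAnyShard };
    enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData };

    struct Stage {
        const char* name;
        StreamType streamType;
        HostTypeRequirement hostRequirement;
        DiskUseRequirement diskRequirement;
    };

    bool canRunOnMongos(const std::vector<Stage>& stages, bool allowDiskUse,
                        bool prohibitBlockingMerge) {
        for (const auto& s : stages) {
            const bool noShardNeeded = s.hostRequirement == HostTypeRequirement::kNone;
            // A temp-data writer is acceptable on mongoS only while allowDiskUse is
            // off, since in that case it will never actually spill.
            const bool noDiskNeeded = s.diskRequirement == DiskUseRequirement::kNoDiskUse ||
                (s.diskRequirement == DiskUseRequirement::kWritesTmpData && !allowDiskUse);
            const bool blockingOk =
                s.streamType == StreamType::kStreaming || !prohibitBlockingMerge;
            if (!(noShardNeeded && noDiskNeeded && blockingOk))
                return false;
        }
        return true;
    }

    int main() {
        std::vector<Stage> merge{{"$match", StreamType::kStreaming, HostTypeRequirement::kNone,
                                  DiskUseRequirement::kNoDiskUse},
                                 {"$group", StreamType::kBlocking, HostTypeRequirement::kNone,
                                  DiskUseRequirement::kWritesTmpData}};
        // Eligible with allowDiskUse:false and the knob unset; ineligible once
        // allowDiskUse is true, because $group could then spill on the mongoS.
        std::cout << canRunOnMongos(merge, false, false) << "\n";  // 1
        std::cout << canRunOnMongos(merge, true, false) << "\n";   // 0
        return 0;
    }

This matches the commit summary at the top of the patch: blocking stages such as $group become mongoS-mergeable exactly when 'allowDiskUse' is false.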
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index e84f30dca74..6c2c3fbba73 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1791,7 +1791,7 @@ public:
     DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {}
 
     StageConstraints constraints() const final {
-        StageConstraints constraints;
+        auto constraints = DocumentSourceMock::constraints();
         constraints.isIndependentOfAnyCollection = true;
         return constraints;
     }
@@ -1906,7 +1906,12 @@ public:
     DocumentSourceDependencyDummy() : DocumentSourceMock({}) {}
 
     StageConstraints constraints() const final {
-        return StageConstraints{};  // Overrides DocumentSourceMock's required position.
+        // Overrides DocumentSourceMock's required position.
+        return {StreamType::kStreaming,
+                PositionRequirement::kNone,
+                HostTypeRequirement::kNone,
+                DiskUseRequirement::kNoDiskUse,
+                FacetRequirement::kAllowed};
     }
 };
diff --git a/src/mongo/db/query/query_knobs.cpp b/src/mongo/db/query/query_knobs.cpp
index 69936020c51..73f7e618769 100644
--- a/src/mongo/db/query/query_knobs.cpp
+++ b/src/mongo/db/query/query_knobs.cpp
@@ -79,4 +79,6 @@ MONGO_EXPORT_SERVER_PARAMETER(internalDocumentSourceLookupCacheSizeBytes, int, 1
 MONGO_EXPORT_SERVER_PARAMETER(internalQueryPlannerGenerateCoveredWholeIndexScans, bool, false);
 
 MONGO_EXPORT_SERVER_PARAMETER(internalQueryIgnoreUnknownJSONSchemaKeywords, bool, false);
+
+MONGO_EXPORT_SERVER_PARAMETER(internalQueryProhibitBlockingMergeOnMongoS, bool, false);
 }  // namespace mongo
diff --git a/src/mongo/db/query/query_knobs.h b/src/mongo/db/query/query_knobs.h
index 651d39c5c23..5b4d759b26a 100644
--- a/src/mongo/db/query/query_knobs.h
+++ b/src/mongo/db/query/query_knobs.h
@@ -122,4 +122,5 @@ extern AtomicInt32 internalDocumentSourceCursorBatchSizeBytes;
 
 extern AtomicInt32 internalDocumentSourceLookupCacheSizeBytes;
 
+extern AtomicBool internalQueryProhibitBlockingMergeOnMongoS;
 }  // namespace mongo
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index f3298662b02..a70227b99a4 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -622,7 +622,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
     invariant(pipelineForMerging);
 
     // First, check whether we can merge on the mongoS.
-    if (pipelineForMerging->canRunOnMongos() && !internalQueryProhibitMergingOnMongoS.load()) {
+    if (!internalQueryProhibitMergingOnMongoS.load() && pipelineForMerging->canRunOnMongos()) {
         // Register the new mongoS cursor, and retrieve the initial batch of results.
         auto cursorResponse = establishMergingMongosCursor(opCtx,
                                                            request,
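The new knob is an AtomicBool server parameter. A minimal sketch of how such a flag gates a code path follows; the real MONGO_EXPORT_SERVER_PARAMETER macro additionally registers the parameter name so that (typically) it can be flipped at runtime via setParameter, which this stand-in does not model.

    #include <atomic>
    #include <iostream>

    // Stand-in for the AtomicBool knob declared in query_knobs.h above.
    std::atomic<bool> internalQueryProhibitBlockingMergeOnMongoS{false};

    bool blockingMergePermitted() {
        return !internalQueryProhibitBlockingMergeOnMongoS.load();
    }

    int main() {
        std::cout << blockingMergePermitted() << "\n";  // 1: blocking merges allowed by default
        internalQueryProhibitBlockingMergeOnMongoS.store(true);
        std::cout << blockingMergePermitted() << "\n";  // 0: operator has prohibited them
        return 0;
    }

Note that the cluster_aggregate.cpp hunk only reorders the existing check so the cheap atomic load happens before walking the pipeline's constraints; the per-stage blocking check lives inside canRunOnMongos() itself.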
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 7f86b1cadab..bccdc369705 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -153,7 +153,13 @@ namespace {
 * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
 */
 boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) {
-    if (auto frontSort = mergePipeline->popFrontStageWithName(DocumentSourceSort::kStageName)) {
+    // Remove a leading $sort iff it is a mergesort, since the ARM cannot handle blocking $sort.
+    auto frontSort = mergePipeline->popFrontWithCriteria(
+        DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
+            return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
+        });
+
+    if (frontSort) {
         auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
         if (auto sortLimit = sortStage->getLimitSrc()) {
             // There was a limit stage absorbed into the sort stage, so we need to preserve that.
@@ -198,10 +204,10 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu
     // instead.
     while (!pipeline->getSources().empty()) {
         invariant(isSkipOrLimit(pipeline->getSources().front()));
-        if (auto skip = pipeline->popFrontStageWithName(DocumentSourceSkip::kStageName)) {
+        if (auto skip = pipeline->popFrontWithCriteria(DocumentSourceSkip::kStageName)) {
             root = stdx::make_unique<RouterStageSkip>(
                 opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
-        } else if (auto limit = pipeline->popFrontStageWithName(DocumentSourceLimit::kStageName)) {
+        } else if (auto limit = pipeline->popFrontWithCriteria(DocumentSourceLimit::kStageName)) {
             root = stdx::make_unique<RouterStageLimit>(
                 opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
         }
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index 329f400dd7a..fef4b106513 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -68,6 +68,14 @@ private:
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         std::unique_ptr<RouterExecStage> childStage);
 
+    StageConstraints constraints() const final {
+        return {StreamType::kStreaming,
+                PositionRequirement::kFirst,
+                HostTypeRequirement::kNone,
+                DiskUseRequirement::kNoDiskUse,
+                FacetRequirement::kNotAllowed};
+    }
+
     GetNextResult getNext() final;
 
     void doDispose() final;
     void reattachToOperationContext(OperationContext* opCtx) final;
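Finally, a simplified stand-alone model of the popFrontWithCriteria() contract introduced in pipeline.h and used by extractLeadingSort() above. The container and element types are stand-ins for the pipeline's list of intrusive_ptr<DocumentSource>.

    #include <deque>
    #include <functional>
    #include <memory>
    #include <string>

    struct Source {
        std::string name;
    };

    // Pop the front element only when the name matches and the optional predicate,
    // if supplied, approves it; otherwise leave the pipeline untouched.
    std::shared_ptr<Source> popFrontWithCriteria(
        std::deque<std::shared_ptr<Source>>& sources, const std::string& targetName,
        std::function<bool(const Source*)> predicate = nullptr) {
        if (sources.empty() || sources.front()->name != targetName)
            return nullptr;
        auto target = sources.front();
        // The predicate lets callers pop conditionally, e.g. only a $sort that is
        // merging presorted streams, rather than any stage with a matching name.
        if (predicate && !predicate(target.get()))
            return nullptr;
        sources.pop_front();
        return target;
    }

    int main() {
        std::deque<std::shared_ptr<Source>> sources{std::make_shared<Source>(Source{"$sort"})};
        // Name matches but the predicate rejects: the stage stays in place.
        auto rejected = popFrontWithCriteria(sources, "$sort", [](const Source*) { return false; });
        // No predicate: a plain name-based pop, mirroring the old popFrontStageWithName().
        auto popped = popFrontWithCriteria(sources, "$sort");
        return (rejected == nullptr && popped != nullptr) ? 0 : 1;
    }

Because the predicate defaults to nullptr, the existing name-only call sites (the $skip and $limit pops in buildPipelinePlan above) keep their behaviour unchanged.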