-rw-r--r--  jstests/aggregation/mongos_merge.js | 335
-rw-r--r--  src/mongo/db/pipeline/document_source.h | 69
-rw-r--r--  src/mongo/db/pipeline/document_source_bucket_auto.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_bucket_auto.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_bucket_auto_test.cpp | 12
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp | 17
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.h | 16
-rw-r--r--  src/mongo/db/pipeline/document_source_coll_stats.h | 9
-rw-r--r--  src/mongo/db/pipeline/document_source_current_op.h | 11
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_facet.cpp | 38
-rw-r--r--  src/mongo/db/pipeline/document_source_facet_test.cpp | 16
-rw-r--r--  src/mongo/db/pipeline/document_source_geo_near.cpp | 12
-rw-r--r--  src/mongo/db/pipeline/document_source_geo_near.h | 9
-rw-r--r--  src/mongo/db/pipeline/document_source_graph_lookup.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_group.cpp | 6
-rw-r--r--  src/mongo/db/pipeline/document_source_group.h | 10
-rw-r--r--  src/mongo/db/pipeline/document_source_group_test.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_index_stats.h | 9
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp | 10
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_split_pipeline.h | 18
-rw-r--r--  src/mongo/db/pipeline/document_source_limit.h | 14
-rw-r--r--  src/mongo/db/pipeline/document_source_list_local_sessions.h | 12
-rw-r--r--  src/mongo/db/pipeline/document_source_list_sessions.h | 10
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.cpp | 10
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.h | 30
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_change_post_image.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_test.cpp | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_match.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors.h | 10
-rw-r--r--  src/mongo/db/pipeline/document_source_mock.h | 9
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h | 10
-rw-r--r--  src/mongo/db/pipeline/document_source_redact.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_sample.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_sample_from_random_cursor.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_sequential_document_cache.h | 15
-rw-r--r--  src/mongo/db/pipeline/document_source_single_document_transformation.h | 7
-rw-r--r--  src/mongo/db/pipeline/document_source_skip.h | 14
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.h | 22
-rw-r--r--  src/mongo/db/pipeline/document_source_sort_test.cpp | 6
-rw-r--r--  src/mongo/db/pipeline/document_source_tee_consumer.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_unwind.h | 8
-rw-r--r--  src/mongo/db/pipeline/expression_context.cpp | 4
-rw-r--r--  src/mongo/db/pipeline/expression_context.h | 2
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp | 30
-rw-r--r--  src/mongo/db/pipeline/pipeline.h | 11
-rw-r--r--  src/mongo/db/pipeline/pipeline_test.cpp | 9
-rw-r--r--  src/mongo/db/query/query_knobs.cpp | 2
-rw-r--r--  src/mongo/db/query/query_knobs.h | 1
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp | 2
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp | 12
-rw-r--r--  src/mongo/s/query/router_stage_pipeline.h | 8
54 files changed, 663 insertions(+), 286 deletions(-)
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js
index fd49ef4c7e5..b8763a45545 100644
--- a/jstests/aggregation/mongos_merge.js
+++ b/jstests/aggregation/mongos_merge.js
@@ -1,19 +1,10 @@
/**
* Tests that split aggregations whose merge pipelines are eligible to run on mongoS do so, and
- * produce the expected results.
+ * produce the expected results. Stages which are eligible to merge on mongoS include:
*
- * Splittable stages whose merge components are eligible to run on mongoS include:
- * - $sort (iff merging pre-sorted streams)
- * - $skip
- * - $limit
- * - $sample
- *
- * Non-splittable stages such as those listed below are eligible to run in a mongoS merge pipeline:
- * - $match
- * - $project
- * - $addFields
- * - $unwind
- * - $redact
+ * - Splitpoints whose merge components are non-blocking, e.g. $skip, $limit, $sort, $sample.
+ * - Non-splittable streaming stages, e.g. $match, $project, $unwind.
+ * - Blocking stages in cases where 'allowDiskUse' is false, e.g. $group, $bucketAuto.
*
* Because wrapping these aggregations in a $facet stage will affect how the pipeline can be merged,
* and will therefore invalidate the results of the test cases below, we tag this test to prevent it
@@ -23,12 +14,14 @@
*/
(function() {
- load("jstests/libs/profiler.js"); // For profilerHas*OrThrow helper functions.
+ load("jstests/libs/profiler.js"); // For profilerHas*OrThrow helper functions.
+ load('jstests/libs/geo_near_random.js'); // For GeoNearRandomTest.
const st = new ShardingTest({shards: 2, mongos: 1, config: 1});
const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];
+ const unshardedColl = mongosDB[jsTestName() + "_unsharded"];
const shard0DB = primaryShardDB = st.shard0.getDB(jsTestName());
const shard1DB = st.shard1.getDB(jsTestName());
@@ -52,6 +45,9 @@
assert.commandWorked(
mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+ // We will need to test $geoNear on this collection, so create a 2dsphere index.
+ assert.commandWorked(mongosColl.createIndex({geo: "2dsphere"}));
+
// Split the collection into 4 chunks: [MinKey, -100), [-100, 0), [0, 100), [100, MaxKey).
assert.commandWorked(
mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: -100}}));
@@ -66,29 +62,47 @@
assert.commandWorked(mongosDB.adminCommand(
{moveChunk: mongosColl.getFullName(), find: {_id: 150}, to: "shard0001"}));
+ // Create a random geo coordinate generator for testing.
+ var georng = new GeoNearRandomTest(mongosColl);
+
// Write 400 documents across the 4 chunks.
for (let i = -200; i < 200; i++) {
- assert.writeOK(mongosColl.insert({_id: i, a: [i], b: {redactThisDoc: true}, c: true}));
+ assert.writeOK(mongosColl.insert(
+ {_id: i, a: [i], b: {redactThisDoc: true}, c: true, geo: georng.mkPt()}));
+ assert.writeOK(unshardedColl.insert({_id: i, x: i}));
}
+ let testNameHistory = new Set();
+
/**
* Runs the aggregation specified by 'pipeline', verifying that:
* - The number of documents returned by the aggregation matches 'expectedCount'.
* - The merge was performed on a mongoS if 'mergeType' is 'mongos', and on a shard otherwise.
*/
- function assertMergeBehaviour({testName, pipeline, mergeType, batchSize, expectedCount}) {
- // Verify that the 'mergeOnMongoS' explain() output for this pipeline matches our
- // expectation.
+ function assertMergeBehaviour(
+ {testName, pipeline, mergeType, batchSize, allowDiskUse, expectedCount}) {
+ // Ensure that this test has a unique name.
+ assert(!testNameHistory.has(testName));
+ testNameHistory.add(testName);
+
+ // Create the aggregation options from the given arguments.
+ const opts = {
+ comment: testName,
+ cursor: (batchSize ? {batchSize: batchSize} : {}),
+ };
+
+ if (allowDiskUse !== undefined) {
+ opts.allowDiskUse = allowDiskUse;
+ }
+
+ // Verify that the explain() output's 'mergeType' field matches our expectation.
assert.eq(
- assert.commandWorked(mongosColl.explain().aggregate(pipeline, {comment: testName}))
+ assert.commandWorked(mongosColl.explain().aggregate(pipeline, Object.extend({}, opts)))
.mergeType,
mergeType);
- assert.eq(
- mongosColl
- .aggregate(pipeline, {comment: testName, cursor: {batchSize: (batchSize || 101)}})
- .itcount(),
- expectedCount);
+ // Verify that the aggregation returns the expected number of results.
+ assert.eq(mongosColl.aggregate(pipeline, opts).itcount(), expectedCount);
// Verify that a $mergeCursors aggregation ran on the primary shard if 'mergeType' is not
// 'mongos', and that no such aggregation ran otherwise.
@@ -107,12 +121,13 @@
* Throws an assertion if the aggregation specified by 'pipeline' does not produce
* 'expectedCount' results, or if the merge phase is not performed on the mongoS.
*/
- function assertMergeOnMongoS({testName, pipeline, batchSize, expectedCount}) {
+ function assertMergeOnMongoS({testName, pipeline, batchSize, allowDiskUse, expectedCount}) {
assertMergeBehaviour({
testName: testName,
pipeline: pipeline,
mergeType: "mongos",
- batchSize: (batchSize || 101),
+ batchSize: (batchSize || 10),
+ allowDiskUse: allowDiskUse,
expectedCount: expectedCount
});
}
@@ -121,94 +136,214 @@
* Throws an assertion if the aggregation specified by 'pipeline' does not produce
* 'expectedCount' results, or if the merge phase was not performed on a shard.
*/
- function assertMergeOnMongoD({testName, pipeline, mergeType, batchSize, expectedCount}) {
+ function assertMergeOnMongoD(
+ {testName, pipeline, mergeType, batchSize, allowDiskUse, expectedCount}) {
assertMergeBehaviour({
testName: testName,
pipeline: pipeline,
mergeType: (mergeType || "anyShard"),
- batchSize: (batchSize || 101),
+ batchSize: (batchSize || 10),
+ allowDiskUse: allowDiskUse,
expectedCount: expectedCount
});
}
- //
- // Test cases.
- //
+ /**
+ * Runs a series of test cases which will consistently merge on mongoS or mongoD regardless of
+ * whether 'allowDiskUse' is true, false or omitted.
+ */
+ function runTestCasesWhoseMergeLocationIsConsistentRegardlessOfAllowDiskUse(allowDiskUse) {
+ // Test that a $match pipeline with an empty merge stage is merged on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_match_only",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 400
+ });
- let testName;
+ // Test that a $sort stage which merges pre-sorted streams is run on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_sort_presorted",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 400
+ });
- // Test that a $match pipeline with an empty merge stage is merged on mongoS.
- assertMergeOnMongoS({
- testName: "agg_mongos_merge_match_only",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}],
- batchSize: 10,
- expectedCount: 400
- });
+ // Test that $skip is merged on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_skip",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 300}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 100
+ });
- // Test that a $sort stage which merges pre-sorted streams is run on mongoS.
- assertMergeOnMongoS({
- testName: "agg_mongos_merge_sort_presorted",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}],
- batchSize: 10,
- expectedCount: 400
- });
+ // Test that $limit is merged on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_limit",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 300}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 300
+ });
- // Test that a $sort stage which must sort the dataset from scratch is NOT run on mongoS.
- assertMergeOnMongoD({
- testName: "agg_mongos_merge_sort_in_mem",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}],
- batchSize: 10,
- expectedCount: 400
- });
+ // Test that $sample is merged on mongoS if it is the splitpoint, since this will result in
+ // a merging $sort of presorted streams in the merge pipeline.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_sample_splitpoint",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 300}}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 300
+ });
- // Test that a merge pipeline which needs to run on the primary shard is NOT merged on mongoS.
- assertMergeOnMongoD({
- testName: "agg_mongos_merge_primary_shard",
- pipeline: [
- {$match: {_id: {$gte: -200, $lte: 200}}},
- {$_internalSplitPipeline: {mergeType: "primaryShard"}}
- ],
- mergeType: "primaryShard",
- batchSize: 10,
- expectedCount: 400
- });
+ // Test that $geoNear is merged on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_geo_near",
+ pipeline: [
+ {$geoNear: {near: [0, 0], distanceField: "distance", spherical: true, limit: 300}}
+ ],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 300
+ });
- // Test that $skip is merged on mongoS.
- assertMergeOnMongoS({
- testName: "agg_mongos_merge_skip",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 300}],
- batchSize: 10,
- expectedCount: 100
- });
+ // Test that a pipeline whose merging half can be run on mongos using only the mongos
+ // execution machinery returns the correct results.
+ // TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_all_mongos_runnable_skip_and_limit_stages",
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {$sort: {_id: -1}},
+ {$skip: 150},
+ {$limit: 150},
+ {$skip: 5},
+ {$limit: 1},
+ ],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 1
+ });
- // Test that $limit is merged on mongoS.
- assertMergeOnMongoS({
- testName: "agg_mongos_merge_limit",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 300}],
- batchSize: 10,
- expectedCount: 300
- });
+ // Test that a merge pipeline which needs to run on a shard is NOT merged on mongoS
+ // regardless of 'allowDiskUse'.
+ assertMergeOnMongoD({
+ testName: "agg_mongos_merge_primary_shard_disk_use_" + allowDiskUse,
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {$_internalSplitPipeline: {mergeType: "anyShard"}}
+ ],
+ mergeType: "anyShard",
+ allowDiskUse: allowDiskUse,
+ expectedCount: 400
+ });
+ }
- // Test that $sample is merged on mongoS.
- assertMergeOnMongoS({
- testName: "agg_mongos_merge_sample",
- pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 300}}],
- batchSize: 10,
- expectedCount: 300
- });
+ /**
+ * Runs a series of test cases which will always merge on mongoD when 'allowDiskUse' is true,
+ * and on mongoS when 'allowDiskUse' is false or omitted.
+ */
+ function runTestCasesWhoseMergeLocationDependsOnAllowDiskUse(allowDiskUse) {
+ // All test cases should merge on mongoD if allowDiskUse is true, mongoS otherwise.
+ const assertMergeOnMongoX = (allowDiskUse ? assertMergeOnMongoD : assertMergeOnMongoS);
+
+ // Test that a blocking $sort is only merged on mongoS if 'allowDiskUse' is not set.
+ assertMergeOnMongoX({
+ testName: "agg_mongos_merge_blocking_sort_no_disk_use",
+ pipeline:
+ [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 400
+ });
+
+ // Test that $group is only merged on mongoS if 'allowDiskUse' is not set.
+ assertMergeOnMongoX({
+ testName: "agg_mongos_merge_group_allow_disk_use",
+ pipeline:
+ [{$match: {_id: {$gte: -200, $lte: 200}}}, {$group: {_id: {$mod: ["$_id", 150]}}}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 299
+ });
+
+ // Test that a blocking $sample is only merged on mongoS if 'allowDiskUse' is not set.
+ assertMergeOnMongoX({
+ testName: "agg_mongos_merge_blocking_sample_allow_disk_use",
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {$sample: {size: 300}},
+ {$sample: {size: 200}}
+ ],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 200
+ });
+
+ // Test that $bucketAuto is only merged on mongoS if 'allowDiskUse' is not set.
+ assertMergeOnMongoX({
+ testName: "agg_mongos_merge_bucket_auto_allow_disk_use",
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {$bucketAuto: {groupBy: "$_id", buckets: 10}}
+ ],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 10
+ });
+
+ //
+ // Test composite stages.
+ //
+
+ // Test that $bucket ($group->$sort) is merged on mongoS iff 'allowDiskUse' is not set.
+ assertMergeOnMongoX({
+ testName: "agg_mongos_merge_bucket_allow_disk_use",
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {
+ $bucket: {
+ groupBy: "$_id",
+ boundaries: [-200, -150, -100, -50, 0, 50, 100, 150, 200]
+ }
+ }
+ ],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 8
+ });
+
+ // Test that $sortByCount ($group->$sort) is merged on mongoS iff 'allowDiskUse' isn't set.
+ assertMergeOnMongoX({
+ testName: "agg_mongos_merge_sort_by_count_allow_disk_use",
+ pipeline:
+ [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sortByCount: {$mod: ["$_id", 150]}}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 299
+ });
+
+ // Test that $count ($group->$project) is merged on mongoS iff 'allowDiskUse' is not set.
+ assertMergeOnMongoX({
+ testName: "agg_mongos_merge_count_allow_disk_use",
+ pipeline: [{$match: {_id: {$gte: -150, $lte: 1500}}}, {$count: "doc_count"}],
+ allowDiskUse: allowDiskUse,
+ expectedCount: 1
+ });
+ }
+
+ // Run all test cases for each potential value of 'allowDiskUse'.
+ for (let allowDiskUse of[false, undefined, true]) {
+ runTestCasesWhoseMergeLocationIsConsistentRegardlessOfAllowDiskUse(allowDiskUse);
+ runTestCasesWhoseMergeLocationDependsOnAllowDiskUse(allowDiskUse);
+ testNameHistory.clear();
+ }
// Test that merge pipelines containing all mongos-runnable stages produce the expected output.
assertMergeOnMongoS({
testName: "agg_mongos_merge_all_mongos_runnable_stages",
pipeline: [
- {$match: {_id: {$gte: -200, $lte: 200}}},
- {$sort: {_id: 1}},
+ {$geoNear: {near: [0, 0], distanceField: "distance", spherical: true, limit: 400}},
+ {$sort: {a: 1}},
{$skip: 150},
{$limit: 150},
{$addFields: {d: true}},
{$unwind: "$a"},
{$sample: {size: 100}},
- {$project: {c: 0}},
+ {$project: {c: 0, geo: 0, distance: 0}},
+ {$group: {_id: "$_id", doc: {$push: "$$CURRENT"}}},
+ {$unwind: "$doc"},
+ {$replaceRoot: {newRoot: "$doc"}},
{
$redact: {
$cond:
@@ -218,33 +353,17 @@
{
$match: {
_id: {$gte: -50, $lte: 100},
- a: {$gte: -50, $lte: 100},
+ a: {$type: "number", $gte: -50, $lte: 100},
b: {$exists: false},
c: {$exists: false},
- d: true
+ d: true,
+ geo: {$exists: false},
+ distance: {$exists: false}
}
}
],
- batchSize: 10,
expectedCount: 100
});
- // Test that a pipeline whose merging half can be run on mongos using only the mongos execution
- // machinery returns the correct results.
- assertMergeOnMongoS({
- testName: "agg_mongos_merge_all_mongos_runnable_skip_and_limit_stages",
- pipeline: [
- {$match: {_id: {$gte: -5, $lte: 100}}},
- {$sort: {_id: -1}},
- {$skip: 5},
- {$limit: 10},
- {$skip: 5},
- {$limit: 1},
- ],
- batchSize: 10,
- expectedCount: 1
- });
- // TODO SERVER-30882 Find a way to assert that all stages get absorbed by mongos.
-
st.stop();
})();
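
[Editor's note] The merge-location rule these tests exercise can be distilled into a short predicate. The following is an illustrative C++ sketch only; the helper name and signature are hypothetical, not the server's actual routing code, but it is written in terms of the StageConstraints fields introduced in document_source.h below:

    // Hypothetical helper: may the merge half of a split pipeline run on mongoS?
    bool mergeHalfMayRunOnMongoS(const Pipeline& mergePipeline, bool allowDiskUse) {
        for (auto&& stage : mergePipeline.getSources()) {
            const auto constraints = stage->constraints();

            // A stage pinned to a shard (kAnyShard or kPrimaryShard) forces
            // the merge half onto a mongoD.
            if (constraints.hostRequirement !=
                DocumentSource::StageConstraints::HostTypeRequirement::kNone) {
                return false;
            }

            // mongoS cannot spill to disk, so when the user opts in via
            // 'allowDiskUse', any stage which may write to disk must merge on
            // a shard. With allowDiskUse false or omitted, the same stage may
            // merge on mongoS, which is why the blocking test cases above
            // flip location based on 'allowDiskUse'.
            if (allowDiskUse &&
                constraints.diskRequirement !=
                    DocumentSource::StageConstraints::DiskUseRequirement::kNoDiskUse) {
                return false;
            }
        }
        return true;
    }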
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index eba4e2acc4b..934d9d29da2 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -132,20 +132,70 @@ public:
* A HostTypeRequirement defines where this stage is permitted to be executed when the
* pipeline is run on a sharded cluster.
*/
- enum class HostTypeRequirement { kPrimaryShard, kAnyShard, kAnyShardOrMongoS };
+ enum class HostTypeRequirement { kNone, kPrimaryShard, kAnyShard };
- // Set if this stage needs to be in a particular position of the pipeline.
- PositionRequirement requiredPosition = PositionRequirement::kNone;
+ /**
+ * A DiskUseRequirement indicates whether this stage writes to disk, or whether it may spill
+ * to disk if its memory usage exceeds a given threshold. Note that this only indicates the
+ * *ability* of the stage to spill; if 'allowDiskUse' is set to false, it will be prevented
+ * from doing so.
+ */
+ enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
+
+ /**
+ * A FacetRequirement indicates whether this stage may be used within a $facet pipeline.
+ */
+ enum class FacetRequirement { kAllowed, kNotAllowed };
- // Set if this stage can only be executed on specific components of a sharded cluster.
- HostTypeRequirement hostRequirement = HostTypeRequirement::kAnyShard;
+ /**
+ * A StreamType defines whether this stage is streaming (can produce output based solely on
+ * the current input document) or blocking (must examine subsequent documents before
+ * producing an output document).
+ */
+ enum class StreamType { kStreaming, kBlocking };
+
+ StageConstraints(StreamType streamType,
+ PositionRequirement requiredPosition,
+ HostTypeRequirement hostRequirement,
+ DiskUseRequirement diskRequirement,
+ FacetRequirement facetRequirement)
+ : requiredPosition(requiredPosition),
+ hostRequirement(hostRequirement),
+ diskRequirement(diskRequirement),
+ facetRequirement(facetRequirement),
+ streamType(streamType) {
+ // Stages which are allowed to run in $facet pipelines must not have any specific
+ // position requirements.
+ invariant(!isAllowedInsideFacetStage() ||
+ requiredPosition == PositionRequirement::kNone);
+ }
- bool isAllowedInsideFacetStage = true;
+ // Indicates whether this stage needs to be at a particular position in the pipeline.
+ const PositionRequirement requiredPosition;
+
+ // Indicates whether this stage can only be executed on specific components of a sharded
+ // cluster.
+ const HostTypeRequirement hostRequirement;
+
+ // Indicates whether this stage may write persistent data to disk, or may spill to temporary
+ // files if its memory usage becomes excessive.
+ const DiskUseRequirement diskRequirement;
+
+ // Indicates whether this stage may run inside a $facet stage.
+ const FacetRequirement facetRequirement;
+
+ // Indicates whether this is a streaming or blocking stage.
+ const StreamType streamType;
// True if this stage does not generate results itself, and instead pulls inputs from an
// input DocumentSource (via 'pSource').
bool requiresInputDocSource = true;
+ // True if this stage should be permitted to run in a $facet pipeline.
+ bool isAllowedInsideFacetStage() const {
+ return facetRequirement == FacetRequirement::kAllowed;
+ }
+
// True if this stage operates on a global or database level, like $currentOp.
bool isIndependentOfAnyCollection = false;
@@ -165,6 +215,9 @@ public:
using HostTypeRequirement = StageConstraints::HostTypeRequirement;
using PositionRequirement = StageConstraints::PositionRequirement;
+ using DiskUseRequirement = StageConstraints::DiskUseRequirement;
+ using FacetRequirement = StageConstraints::FacetRequirement;
+ using StreamType = StageConstraints::StreamType;
/**
* This is what is returned from the main DocumentSource API: getNext(). It is essentially a
@@ -260,9 +313,7 @@ public:
* Returns a struct containing information about any special constraints imposed on using this
* stage.
*/
- virtual StageConstraints constraints() const {
- return StageConstraints{};
- }
+ virtual StageConstraints constraints() const = 0;
/**
* Informs the stage that it is no longer needed and can release its resources. After dispose()
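
[Editor's note] To illustrate the new contract, here is a sketch of a hypothetical blocking stage (not one from this patch; serialize() and the other pure virtuals are elided). It advertises only the *ability* to spill via DiskUseRequirement::kWritesTmpData; whether it actually spills is gated at runtime on the user's allowDiskUse opt-in and on not running in mongoS, mirroring the $group and $bucketAuto changes below:

    class DocumentSourceExampleBlocking final : public DocumentSource {
    public:
        StageConstraints constraints() const final {
            return {StreamType::kBlocking,
                    PositionRequirement::kNone,
                    HostTypeRequirement::kNone,
                    DiskUseRequirement::kWritesTmpData,
                    FacetRequirement::kAllowed};
        }

        GetNextResult getNext() final {
            // When the memory budget is exceeded, spilling is permitted only
            // if the user opted in and we are not running in mongoS.
            const bool maySpill = pExpCtx->allowDiskUse && !pExpCtx->inMongos;
            // ... accumulate input; spill if 'maySpill', otherwise uassert ...
            return GetNextResult::makeEOF();
        }
    };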
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.cpp b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
index e874fd5d426..a8123f5f2fc 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.cpp
@@ -91,7 +91,7 @@ DocumentSource::GetNextResult DocumentSourceBucketAuto::populateSorter() {
if (!_sorter) {
SortOptions opts;
opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
- if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) {
+ if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
opts.extSortAllowed = true;
opts.tempDir = pExpCtx->tempDir;
}
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index 2a40b092c1f..60c5ed02501 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -48,6 +48,14 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
+ StageConstraints constraints() const final {
+ return {StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed};
+ }
+
/**
* The $bucketAuto stage must be run on the merging shard.
*/
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
index 08926254480..df19b4eb497 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_auto_test.cpp
@@ -350,7 +350,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToCorrectlySpillToDisk) {
auto expCtx = getExpCtx();
unittest::TempDir tempDir("DocumentSourceBucketAutoTest");
expCtx->tempDir = tempDir.path();
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
VariablesParseState vps = expCtx->variablesParseState;
@@ -386,7 +386,7 @@ TEST_F(BucketAutoTests, ShouldBeAbleToPauseLoadingWhileSpilled) {
// Allow the $sort stage to spill to disk.
unittest::TempDir tempDir("DocumentSourceBucketAutoTest");
expCtx->tempDir = tempDir.path();
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
VariablesParseState vps = expCtx->variablesParseState;
@@ -647,22 +647,22 @@ void assertCannotSpillToDisk(const boost::intrusive_ptr<ExpressionContext>& expC
TEST_F(BucketAutoTests, ShouldFailIfBufferingTooManyDocuments) {
auto expCtx = getExpCtx();
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
expCtx->inMongos = false;
assertCannotSpillToDisk(expCtx);
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
expCtx->inMongos = true;
assertCannotSpillToDisk(expCtx);
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
expCtx->inMongos = true;
assertCannotSpillToDisk(expCtx);
}
TEST_F(BucketAutoTests, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
auto expCtx = getExpCtx();
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
const size_t maxMemoryUsageBytes = 1000;
VariablesParseState vps = expCtx->variablesParseState;
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index a38a1aef2ce..40ceaa5b1b7 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -97,10 +97,11 @@ const char* DocumentSourceOplogMatch::getSourceName() const {
}
DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints() const {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
}
/**
@@ -142,6 +143,14 @@ public:
return "$changeStream";
}
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
// This stage is created by the DocumentSourceChangeStream stage, so serializing it
// here would result in it being created twice.
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index 22441f2a03a..cfbef21088d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -60,6 +60,14 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
static boost::intrusive_ptr<DocumentSourceShardCheckResumability> create(
@@ -87,6 +95,14 @@ public:
GetNextResult getNext() final;
const char* getSourceName() const final;
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
/**
* SplittableDocumentSource methods; this has to run on the merger, since the resume point could
* be at any shard.
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 2903241d1a7..0c319729328 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -75,10 +75,13 @@ public:
const char* getSourceName() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index d5138e74d2a..6fa869712b1 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -79,11 +79,14 @@ public:
const char* getSourceName() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.isIndependentOfAnyCollection = true;
+ constraints.requiresInputDocSource = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index f9c9345baf6..674bbaa85ee 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -52,8 +52,12 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index c7f82678eb1..6c58124387b 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -238,21 +238,29 @@ void DocumentSourceFacet::doReattachToOperationContext(OperationContext* opCtx)
}
DocumentSource::StageConstraints DocumentSourceFacet::constraints() const {
- StageConstraints constraints;
- constraints.isAllowedInsideFacetStage = false; // Disallow nested $facets.
-
- for (auto&& facet : _facets) {
- for (auto&& nestedStage : facet.pipeline->getSources()) {
- if (nestedStage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard) {
- // Currently we don't split $facet to have a merger part and a shards part (see
- // SERVER-24154). This means that if any stage in any of the $facet pipelines
- // requires the primary shard, then the entire $facet must happen on the merger, and
- // the merger must be the primary shard.
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
- }
- }
- }
- return constraints;
+ const bool mayUseDisk = std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
+ const auto sources = facet.pipeline->getSources();
+ return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
+ return source->constraints().diskRequirement == DiskUseRequirement::kWritesTmpData;
+ });
+ });
+
+ // Currently we don't split $facet to have a merger part and a shards part (see SERVER-24154).
+ // This means that if any stage in any of the $facet pipelines requires the primary shard, then
+ // the entire $facet must happen on the merger, and the merger must be the primary shard.
+ const bool needsPrimaryShard =
+ std::any_of(_facets.begin(), _facets.end(), [&](const auto& facet) {
+ const auto sources = facet.pipeline->getSources();
+ return std::any_of(sources.begin(), sources.end(), [&](const auto source) {
+ return source->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard;
+ });
+ });
+
+ return {StreamType::kBlocking,
+ PositionRequirement::kNone,
+ needsPrimaryShard ? HostTypeRequirement::kPrimaryShard : HostTypeRequirement::kAnyShard,
+ mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
}
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index be05268fb7b..61d112906c7 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -186,9 +186,11 @@ public:
DocumentSourcePassthrough() : DocumentSourceMock({}) {}
StageConstraints constraints() const override {
- StageConstraints constraints;
- constraints.isAllowedInsideFacetStage = true;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
DocumentSource::GetNextResult getNext() final {
@@ -625,9 +627,11 @@ TEST_F(DocumentSourceFacetTest, ShouldThrowIfAnyPipelineRequiresTextScoreButItIs
class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough {
public:
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.cpp b/src/mongo/db/pipeline/document_source_geo_near.cpp
index c971509e373..f7b7a5ca741 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.cpp
+++ b/src/mongo/db/pipeline/document_source_geo_near.cpp
@@ -61,7 +61,7 @@ DocumentSource::GetNextResult DocumentSourceGeoNear::getNext() {
if (!resultsIterator->more())
return GetNextResult::makeEOF();
- // each result from the geoNear command is wrapped in a wrapper object with "obj",
+ // Each result from the geoNear command is wrapped in a wrapper object with "obj",
// "dis" and maybe "loc" fields. We want to take the object from "obj" and inject the
// other fields into it.
Document result(resultsIterator->next().embeddedObject());
@@ -70,6 +70,11 @@ DocumentSource::GetNextResult DocumentSourceGeoNear::getNext() {
if (includeLocs)
output.setNestedField(*includeLocs, result["loc"]);
+ // In a cluster, $geoNear output will be merged via $sort, so add the sort key.
+ if (pExpCtx->needsMerge) {
+ output.setSortKeyMetaField(BSON("" << result["dis"]));
+ }
+
return output.freeze();
}
@@ -89,12 +94,13 @@ Pipeline::SourceContainer::iterator DocumentSourceGeoNear::doOptimizeAt(
}
// This command is sent as-is to the shards.
-// On router this becomes a sort by distance (nearest-first) with limit.
intrusive_ptr<DocumentSource> DocumentSourceGeoNear::getShardSource() {
return this;
}
+// On mongoS this becomes a merge sort by distance (nearest-first) with limit.
intrusive_ptr<DocumentSource> DocumentSourceGeoNear::getMergeSource() {
- return DocumentSourceSort::create(pExpCtx, BSON(distanceField->fullPath() << 1), limit);
+ return DocumentSourceSort::create(
+ pExpCtx, BSON(distanceField->fullPath() << 1 << "$mergePresorted" << true), limit);
}
Value DocumentSourceGeoNear::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 315dcea6dd4..23e7c4e901c 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -48,10 +48,13 @@ public:
Pipeline::SourceContainer* container) final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 53394d3d0ab..afd633d46bc 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -54,9 +54,13 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
constraints.canSwapWithMatch = true;
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index ba2c48680f2..41b2546e331 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -266,7 +266,7 @@ DocumentSourceGroup::DocumentSourceGroup(const intrusive_ptr<ExpressionContext>&
_initialized(false),
_groups(pExpCtx->getValueComparator().makeUnorderedValueMap<Accumulators>()),
_spilled(false),
- _extSortAllowed(pExpCtx->extSortAllowed && !pExpCtx->inMongos) {}
+ _allowDiskUse(pExpCtx->allowDiskUse && !pExpCtx->inMongos) {}
void DocumentSourceGroup::addAccumulator(AccumulationStatement accumulationStatement) {
_accumulatedFields.push_back(accumulationStatement);
@@ -485,7 +485,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
uassert(16945,
"Exceeded memory limit for $group, but didn't allow external sort."
" Pass allowDiskUse:true to opt in.",
- _extSortAllowed);
+ _allowDiskUse);
_sortedFiles.push_back(spill());
_memoryUsageBytes = 0;
}
@@ -531,7 +531,7 @@ DocumentSource::GetNextResult DocumentSourceGroup::initialize() {
// In debug mode, spill every time we have a duplicate id to stress merge logic.
if (!inserted && // is a dup
!pExpCtx->inMongos && // can't spill to disk in mongos
- !_extSortAllowed && // don't change behavior when testing external sort
+ !_allowDiskUse && // don't change behavior when testing external sort
_sortedFiles.size() < 20) { // don't open too many FDs
_sortedFiles.push_back(spill());
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index cf108c15f24..057eb58164a 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -69,6 +69,14 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ StageConstraints constraints() const final {
+ return {StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed};
+ }
+
/**
* Add an accumulator, which will become a field in each Document that results from grouping.
*/
@@ -176,7 +184,7 @@ private:
// Only used when '_spilled' is true.
std::unique_ptr<Sorter<Value, Value>::Iterator> _sorterIterator;
- const bool _extSortAllowed;
+ const bool _allowDiskUse;
std::pair<Value, Value> _firstPartOfNextGroup;
// Only used when '_sorted' is true.
diff --git a/src/mongo/db/pipeline/document_source_group_test.cpp b/src/mongo/db/pipeline/document_source_group_test.cpp
index 5cc5ba93be4..8e3fc90521b 100644
--- a/src/mongo/db/pipeline/document_source_group_test.cpp
+++ b/src/mongo/db/pipeline/document_source_group_test.cpp
@@ -103,7 +103,7 @@ TEST_F(DocumentSourceGroupTest, ShouldBeAbleToPauseLoadingWhileSpilled) {
// Allow the $group stage to spill to disk.
TempDir tempDir("DocumentSourceGroupTest");
expCtx->tempDir = tempDir.path();
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
VariablesParseState vps = expCtx->variablesParseState;
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index 83546654361..c5d4d7c2762 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -70,10 +70,13 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index d1967b6625f..839eebecc3f 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -53,9 +53,11 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
index a49514cf628..276cf3005d8 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
@@ -47,7 +47,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create
auto specObj = elem.embeddedObject();
- HostTypeRequirement mergeType = HostTypeRequirement::kAnyShard;
+ HostTypeRequirement mergeType = HostTypeRequirement::kNone;
for (auto&& elt : specObj) {
if (elt.fieldNameStringData() == "mergeType"_sd) {
@@ -62,7 +62,7 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::create
} else if ("primaryShard"_sd == mergeTypeString) {
mergeType = HostTypeRequirement::kPrimaryShard;
} else if ("mongos"_sd == mergeTypeString) {
- mergeType = HostTypeRequirement::kAnyShardOrMongoS;
+ mergeType = HostTypeRequirement::kNone;
} else {
uasserted(ErrorCodes::BadValue,
str::stream() << "unrecognized field while parsing mergeType: '"
@@ -90,8 +90,8 @@ Value DocumentSourceInternalSplitPipeline::serialize(
std::string mergeTypeString;
switch (_mergeType) {
- case HostTypeRequirement::kAnyShardOrMongoS:
- mergeTypeString = "mongos";
+ case HostTypeRequirement::kAnyShard:
+ mergeTypeString = "anyShard";
break;
case HostTypeRequirement::kPrimaryShard:
@@ -99,7 +99,7 @@ Value DocumentSourceInternalSplitPipeline::serialize(
break;
default:
- mergeTypeString = "anyShard";
+ mergeTypeString = "mongos";
break;
}
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index f9ac84c555f..811de4b7e23 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -36,10 +36,10 @@ namespace mongo {
* An internal stage available for testing. Acts as a simple passthrough of intermediate results
* from the source stage, but forces the pipeline to split at the point where this stage appears
* (assuming that no earlier splitpoints exist). Takes a single parameter, 'mergeType', which can be
- * one of 'anyShard', 'primaryShard' or 'mongos' to control where the merge may occur. Omitting this
- * parameter or specifying 'anyShard' produces the default merging behaviour; the merge half of the
- * pipeline will be sent to a random participating shard, subject to the requirements of any
- * subsequent splittable stages in the pipeline.
+ * one of 'primaryShard', 'anyShard' or 'mongos' to control where the merge may occur. Omitting this
+ * parameter or specifying 'mongos' produces the default merging behaviour; the merge half of the
+ * pipeline will be executed on mongoS if all other stages are eligible, and will be sent to a
+ * random participating shard otherwise.
*/
class DocumentSourceInternalSplitPipeline final : public DocumentSource,
public SplittableDocumentSource {
@@ -67,9 +67,11 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = _mergeType;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ _mergeType,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
GetNextResult getNext() final;
@@ -80,7 +82,7 @@ private:
: DocumentSource(expCtx), _mergeType(mergeType) {}
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
- HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard;
+ HostTypeRequirement _mergeType = HostTypeRequirement::kNone;
};
} // namespace mongo
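
[Editor's note] A hedged usage sketch of the behaviour described in the updated comment above, assuming a C++ unit-test fixture with an 'expCtx' in scope and the fromjson() test helper:

    // Force a split with an explicit merge location. With 'mongos' (or with
    // 'mergeType' omitted) the stage reports HostTypeRequirement::kNone, so
    // the merge half may remain on mongoS when every other stage is eligible.
    auto spec = fromjson("{$_internalSplitPipeline: {mergeType: 'mongos'}}");
    auto split = DocumentSourceInternalSplitPipeline::createFromBson(
        spec.firstElement(), expCtx);
    invariant(split->constraints().hostRequirement ==
              DocumentSource::StageConstraints::HostTypeRequirement::kNone);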
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index 7bf47638b21..b74a053028f 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -48,6 +48,14 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
+ }
+
GetNextResult getNext() final;
const char* getSourceName() const final {
return kStageName.rawData();
@@ -58,12 +66,6 @@ public:
: SimpleBSONObjComparator::kInstance.makeBSONObjSet();
}
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
- }
-
/**
* Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately.
*/
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index 23e7ce97592..01321d596a6 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -94,13 +94,15 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.hostRequirement = StageConstraints::HostTypeRequirement::kAnyShardOrMongoS;
- constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.isIndependentOfAnyCollection = true;
constraints.allowedToForwardFromMongos = false;
+ constraints.requiresInputDocSource = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h
index 47c08af9d73..4d050239e4c 100644
--- a/src/mongo/db/pipeline/document_source_list_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_sessions.h
@@ -86,11 +86,11 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
- constraints.requiresInputDocSource = true;
- constraints.isAllowedInsideFacetStage = false;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 21f3151f59c..3a8de609c9f 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -118,6 +118,8 @@ DocumentSourceLookUp::DocumentSourceLookUp(NamespaceString fromNs,
Expression::parseOperand(pExpCtx, varElem, pExpCtx->variablesParseState),
_variablesParseState.defineVariable(varName));
}
+
+ initializeIntrospectionPipeline();
}
std::unique_ptr<DocumentSourceLookUp::LiteParsed> DocumentSourceLookUp::LiteParsed::parse(
@@ -659,16 +661,14 @@ void DocumentSourceLookUp::serializeToArray(
DocumentSource::GetDepsReturn DocumentSourceLookUp::getDependencies(DepsTracker* deps) const {
if (wasConstructedWithPipelineSyntax()) {
- // Copy all 'let' variables into the foreign pipeline's expression context.
- copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
-
- auto pipeline = uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
+ // We will use the introspection pipeline which we prebuilt during construction.
+ invariant(_parsedIntrospectionPipeline);
DepsTracker subDeps(deps->getMetadataAvailable());
// Get the subpipeline dependencies. Subpipeline stages may reference both 'let' variables
// declared by this $lookup and variables declared externally.
- for (auto&& source : pipeline->getSources()) {
+ for (auto&& source : _parsedIntrospectionPipeline->getSources()) {
source->getDependencies(&subDeps);
}
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 9727d0e92db..7cf136b0cf9 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -96,9 +96,22 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
+ const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
+ std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
+ _parsedIntrospectionPipeline->getSources().end(),
+ [](const auto& source) {
+ return source->constraints().diskRequirement ==
+ DiskUseRequirement::kWritesTmpData;
+ });
+
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kPrimaryShard,
+ mayUseDisk ? DiskUseRequirement::kWritesTmpData
+ : DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
constraints.canSwapWithMatch = true;
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
@@ -243,6 +256,16 @@ private:
void resolveLetVariables(const Document& localDoc, Variables* variables);
/**
+ * Builds a parsed pipeline for introspection (e.g. constraints, dependencies). Any sub-$lookup
+ * pipelines will be built recursively.
+ */
+ void initializeIntrospectionPipeline() {
+ copyVariablesToExpCtx(_variables, _variablesParseState, _fromExpCtx.get());
+ _parsedIntrospectionPipeline =
+ uassertStatusOK(Pipeline::parse(_resolvedPipeline, _fromExpCtx));
+ }
+
+ /**
* Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
* cursor and/or cache source as appropriate.
*/
@@ -296,6 +319,9 @@ private:
// The aggregation pipeline defined with the user request, prior to optimization and view
// resolution.
std::vector<BSONObj> _userPipeline;
+ // A pipeline parsed from _resolvedPipeline at creation time, intended to support introspective
+ // functions. If sub-$lookup stages are present, their pipelines are constructed recursively.
+ std::unique_ptr<Pipeline, Pipeline::Deleter> _parsedIntrospectionPipeline;
std::vector<LetVariable> _letVariables;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index c51653ddb8d..02cfa7435d4 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -62,9 +62,13 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.canSwapWithMatch = true;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index dcbc2b55814..03ea80c5d7d 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -162,7 +162,7 @@ TEST_F(DocumentSourceLookUpTest, AcceptsPipelineWithLetSyntax) {
<< "pipeline"
<< BSON_ARRAY(BSON("$project" << BSON("hasX"
<< "$$var1"))
- << BSON("$match" << BSON("$hasX" << true)))
+ << BSON("$match" << BSON("hasX" << true)))
<< "as"
<< "as"))
.firstElement(),
@@ -448,9 +448,9 @@ public:
Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
- if (pipeline->popFrontStageWithName("$match") ||
- pipeline->popFrontStageWithName("$sort") ||
- pipeline->popFrontStageWithName("$project")) {
+ if (pipeline->popFrontWithCriteria("$match") ||
+ pipeline->popFrontWithCriteria("$sort") ||
+ pipeline->popFrontWithCriteria("$project")) {
continue;
}
break;
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 6b2fc653d43..26d912928eb 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -51,9 +51,11 @@ public:
const char* getSourceName() const override;
StageConstraints constraints() const override {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
Value serialize(
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 63521e2c742..2e334d5e0ce 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -59,11 +59,13 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShard;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 22f7a1aa24d..6cc7f5aed69 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -50,10 +50,13 @@ public:
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
StageConstraints constraints() const override {
- StageConstraints constraints;
- constraints.requiredPosition = PositionRequirement::kFirst;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
constraints.requiresInputDocSource = false;
- constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 6756cd4df2a..ec8c188c0b7 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -45,11 +45,11 @@ public:
GetDepsReturn getDependencies(DepsTracker* deps) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
- constraints.isAllowedInsideFacetStage = false;
- constraints.requiredPosition = PositionRequirement::kLast;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kLast,
+ HostTypeRequirement::kPrimaryShard,
+ DiskUseRequirement::kWritesPersistentData,
+ FacetRequirement::kNotAllowed};
}
// Virtuals for SplittableDocumentSource
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index bedc434d61e..db1698fe776 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -41,9 +41,11 @@ public:
boost::intrusive_ptr<DocumentSource> optimize() final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
/**
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index 07a85c77e33..d5a86d9c008 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -44,9 +44,11 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
+ return {StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed};
}
GetDepsReturn getDependencies(DepsTracker* deps) const final {
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
index 19b7106b03d..8d10664f6ff 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
@@ -44,6 +44,14 @@ public:
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
GetDepsReturn getDependencies(DepsTracker* deps) const final;
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kAnyShard,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
long long size,
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index af96ae8e2ab..1d2474235f2 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -50,13 +50,14 @@ public:
}
StageConstraints constraints() const {
- StageConstraints constraints;
-
- if (_cache->isServing()) {
- constraints.requiredPosition = PositionRequirement::kFirst;
- constraints.requiresInputDocSource = false;
- }
-
+ StageConstraints constraints(StreamType::kStreaming,
+ _cache->isServing() ? PositionRequirement::kFirst
+ : PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed);
+
+ constraints.requiresInputDocSource = (_cache->isBuilding());
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 188e9864310..bdf6eac8d05 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -102,8 +102,11 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index fc87d7e1eaa..4e10f9ac852 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -50,18 +50,20 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
+ }
+
GetNextResult getNext() final;
const char* getSourceName() const final {
return kStageName.rawData();
}
- StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
- return constraints;
- }
-
/**
     * Attempts to move a subsequent $limit before the skip, potentially allowing for further
* optimizations earlier in the pipeline.
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 97923ff8118..e32c7d418a6 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -304,7 +304,7 @@ SortOptions DocumentSourceSort::makeSortOptions() const {
opts.limit = limitSrc->getLimit();
opts.maxMemoryUsageBytes = _maxMemoryUsageBytes;
- if (pExpCtx->extSortAllowed && !pExpCtx->inMongos) {
+ if (pExpCtx->allowDiskUse && !pExpCtx->inMongos) {
opts.extSortAllowed = true;
opts.tempDir = pExpCtx->tempDir;
}
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 2494400eaae..c4ecc799a5f 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -63,14 +63,15 @@ public:
}
StageConstraints constraints() const final {
- StageConstraints constraints;
- // Can't swap with a $match if a limit has been absorbed, since in general match can't swap
- // with limit.
+ StageConstraints constraints(
+ _mergingPresorted ? StreamType::kStreaming : StreamType::kBlocking,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ _mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData,
+ FacetRequirement::kAllowed);
+
+ // Can't swap with a $match if a limit has been absorbed, as $match can't swap with $limit.
constraints.canSwapWithMatch = !limitSrc;
-
- // Can run on mongoS only if this stage is merging presorted streams.
- constraints.hostRequirement = (_mergingPresorted ? HostTypeRequirement::kAnyShardOrMongoS
- : HostTypeRequirement::kAnyShard);
return constraints;
}
@@ -104,6 +105,13 @@ public:
uint64_t maxMemoryUsageBytes = kMaxMemoryUsageBytes);
/**
+ * Returns true if this $sort stage is merging presorted streams.
+ */
+ bool mergingPresorted() const {
+ return _mergingPresorted;
+ }
+
+ /**
* Returns -1 for no limit.
*/
long long getLimit() const;
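
Worth noting in the $sort hunks above: the constraints now depend on _mergingPresorted. A merge of presorted shard streams can emit documents as they arrive and never spills, while a true sort must consume all of its input and may write temporary data. A self-contained illustration of that split, with simplified names rather than the server's actual types:

    #include <cassert>

    enum class StreamType { kStreaming, kBlocking };
    enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData };

    struct SortProfile {
        StreamType stream;
        DiskUseRequirement disk;
    };

    // Mirrors the two ternaries in DocumentSourceSort::constraints().
    SortProfile sortProfile(bool mergingPresorted) {
        return mergingPresorted
            ? SortProfile{StreamType::kStreaming, DiskUseRequirement::kNoDiskUse}
            : SortProfile{StreamType::kBlocking, DiskUseRequirement::kWritesTmpData};
    }

    int main() {
        assert(sortProfile(true).stream == StreamType::kStreaming);
        assert(sortProfile(false).disk == DiskUseRequirement::kWritesTmpData);
        return 0;
    }
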
diff --git a/src/mongo/db/pipeline/document_source_sort_test.cpp b/src/mongo/db/pipeline/document_source_sort_test.cpp
index 25362138a9c..2bc39427a37 100644
--- a/src/mongo/db/pipeline/document_source_sort_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_test.cpp
@@ -402,7 +402,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled)
// Allow the $sort stage to spill to disk.
unittest::TempDir tempDir("DocumentSourceSortTest");
expCtx->tempDir = tempDir.path();
- expCtx->extSortAllowed = true;
+ expCtx->allowDiskUse = true;
const size_t maxMemoryUsageBytes = 1000;
auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
@@ -436,7 +436,7 @@ TEST_F(DocumentSourceSortExecutionTest, ShouldBeAbleToPauseLoadingWhileSpilled)
TEST_F(DocumentSourceSortExecutionTest,
ShouldErrorIfNotAllowedToSpillToDiskAndResultSetIsTooLarge) {
auto expCtx = getExpCtx();
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
const size_t maxMemoryUsageBytes = 1000;
auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
@@ -451,7 +451,7 @@ TEST_F(DocumentSourceSortExecutionTest,
TEST_F(DocumentSourceSortExecutionTest, ShouldCorrectlyTrackMemoryUsageBetweenPauses) {
auto expCtx = getExpCtx();
- expCtx->extSortAllowed = false;
+ expCtx->allowDiskUse = false;
const size_t maxMemoryUsageBytes = 1000;
auto sort = DocumentSourceSort::create(expCtx, BSON("_id" << -1), -1, maxMemoryUsageBytes);
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h
index 826967a5c48..e20232457a1 100644
--- a/src/mongo/db/pipeline/document_source_tee_consumer.h
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.h
@@ -53,6 +53,14 @@ public:
size_t facetId,
const boost::intrusive_ptr<TeeBuffer>& bufferSource);
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
+ }
+
GetNextResult getNext() final;
/**
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index 5bc9a91afdb..c95a979dc56 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -47,8 +47,12 @@ public:
GetModPathsReturn getModifiedPaths() const final;
StageConstraints constraints() const final {
- StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 8f15fb5a3fe..db80c5f7234 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -46,7 +46,7 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
: explain(request.getExplain()),
fromMongos(request.isFromMongos()),
needsMerge(request.needsMerge()),
- extSortAllowed(request.shouldAllowDiskUse()),
+ allowDiskUse(request.shouldAllowDiskUse()),
bypassDocumentValidation(request.shouldBypassDocumentValidation()),
from34Mongos(request.isFrom34Mongos()),
ns(request.getNamespaceString()),
@@ -84,7 +84,7 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns,
expCtx->fromMongos = fromMongos;
expCtx->from34Mongos = from34Mongos;
expCtx->inMongos = inMongos;
- expCtx->extSortAllowed = extSortAllowed;
+ expCtx->allowDiskUse = allowDiskUse;
expCtx->bypassDocumentValidation = bypassDocumentValidation;
expCtx->tempDir = tempDir;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 4fe6bc8e541..d580644eeba 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -117,7 +117,7 @@ public:
bool fromMongos = false;
bool needsMerge = false;
bool inMongos = false;
- bool extSortAllowed = false;
+ bool allowDiskUse = false;
bool bypassDocumentValidation = false;
// We track whether the aggregation request came from a 3.4 mongos. If so, the merge may occur
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 34e04912e49..631228a8ddb 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -64,6 +64,9 @@ namespace dps = ::mongo::dotted_path_support;
using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
+using DiskUseRequirement = DocumentSource::StageConstraints::DiskUseRequirement;
+using FacetRequirement = DocumentSource::StageConstraints::FacetRequirement;
+using StreamType = DocumentSource::StageConstraints::StreamType;
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
@@ -169,15 +172,14 @@ void Pipeline::validateFacetPipeline() const {
}
for (auto&& stage : _sources) {
auto stageConstraints = stage->constraints();
- if (!stageConstraints.isAllowedInsideFacetStage) {
+ if (!stageConstraints.isAllowedInsideFacetStage()) {
uasserted(40600,
str::stream() << stage->getSourceName()
<< " is not allowed to be used within a $facet stage");
}
// We expect a stage within a $facet stage to have these properties.
- invariant(stageConstraints.requiresInputDocSource);
- invariant(!stageConstraints.isIndependentOfAnyCollection);
invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
+ invariant(!stageConstraints.isIndependentOfAnyCollection);
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -434,8 +436,18 @@ bool Pipeline::needsPrimaryShardMerger() const {
}
bool Pipeline::canRunOnMongos() const {
- return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) {
- return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS;
+ return std::all_of(_sources.begin(), _sources.end(), [&](const auto& stage) {
+ auto constraints = stage->constraints();
+ const bool doesNotNeedShard = (constraints.hostRequirement == HostTypeRequirement::kNone);
+ const bool doesNotNeedDisk =
+ (constraints.diskRequirement == DiskUseRequirement::kNoDiskUse ||
+ (constraints.diskRequirement == DiskUseRequirement::kWritesTmpData &&
+ !pCtx->allowDiskUse));
+ const bool doesNotBlockOrBlockingIsPermitted =
+ (constraints.streamType == StreamType::kStreaming ||
+ !internalQueryProhibitBlockingMergeOnMongoS.load());
+
+ return doesNotNeedShard && doesNotNeedDisk && doesNotBlockOrBlockingIsPermitted;
});
}
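
The new canRunOnMongos() above combines three checks: the stage must not require a shard, must not need disk (a kWritesTmpData stage only disqualifies the pipeline when the client actually requested allowDiskUse, since mongoS can never spill), and must not block unless the internalQueryProhibitBlockingMergeOnMongoS knob permits it. A standalone model of that decision, with simplified names in place of the server's types:

    #include <algorithm>
    #include <vector>

    enum class HostTypeRequirement { kNone, kAnyShard, kPrimaryShard };
    enum class DiskUseRequirement { kNoDiskUse, kWritesTmpData, kWritesPersistentData };
    enum class StreamType { kStreaming, kBlocking };

    struct StageModel {
        HostTypeRequirement host;
        DiskUseRequirement disk;
        StreamType stream;
    };

    bool canRunOnMongos(const std::vector<StageModel>& stages,
                        bool allowDiskUse,
                        bool prohibitBlockingMerge) {
        return std::all_of(stages.begin(), stages.end(), [&](const StageModel& s) {
            const bool doesNotNeedShard = (s.host == HostTypeRequirement::kNone);
            // kWritesTmpData only disqualifies when the client asked for
            // allowDiskUse; without it the stage runs in memory, which mongoS can do.
            const bool doesNotNeedDisk =
                (s.disk == DiskUseRequirement::kNoDiskUse ||
                 (s.disk == DiskUseRequirement::kWritesTmpData && !allowDiskUse));
            const bool blockingPermitted =
                (s.stream == StreamType::kStreaming || !prohibitBlockingMerge);
            return doesNotNeedShard && doesNotNeedDisk && blockingPermitted;
        });
    }

    int main() {
        // A blocking merge-side $sort that may spill: eligible for a mongoS
        // merge only while allowDiskUse is off and the blocking-merge knob is unset.
        StageModel blockingSort{HostTypeRequirement::kNone,
                                DiskUseRequirement::kWritesTmpData,
                                StreamType::kBlocking};
        return canRunOnMongos({blockingSort}, false, false) ? 0 : 1;
    }
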
@@ -579,11 +591,17 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva
return deps;
}
-boost::intrusive_ptr<DocumentSource> Pipeline::popFrontStageWithName(StringData targetStageName) {
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
+ StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
return nullptr;
}
auto targetStage = _sources.front();
+
+ if (predicate && !predicate(targetStage.get())) {
+ return nullptr;
+ }
+
_sources.pop_front();
stitch();
return targetStage;
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 0aa142c36c8..29321861ce8 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -38,6 +38,8 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/explain_options.h"
+#include "mongo/db/query/query_knobs.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/timer.h"
@@ -281,10 +283,13 @@ public:
}
/**
- * Removes and returns the first stage of the pipeline if its name is 'targetStageName'. Returns
- * nullptr if there is no first stage, or if the stage's name is not 'targetStageName'.
+ * Removes and returns the first stage of the pipeline if its name is 'targetStageName' and the
+ * given 'predicate' function, if present, returns 'true' when called with a pointer to the
+ * stage. Returns nullptr if there is no first stage which meets these criteria.
*/
- boost::intrusive_ptr<DocumentSource> popFrontStageWithName(StringData targetStageName);
+ boost::intrusive_ptr<DocumentSource> popFrontWithCriteria(
+ StringData targetStageName,
+ stdx::function<bool(const DocumentSource* const)> predicate = nullptr);
/**
* PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists
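
popFrontStageWithName() is generalized here into popFrontWithCriteria(), which additionally accepts an optional predicate that can veto the pop without mutating the pipeline; the extractLeadingSort() hunk below uses this to strip a leading $sort only when it is merging presorted streams. A toy simulation of the contract, using a plain struct in place of DocumentSource (illustration only):

    #include <functional>
    #include <list>
    #include <memory>
    #include <string>

    struct Stage {
        std::string name;
        bool mergingPresorted = false;
    };

    std::shared_ptr<Stage> popFrontWithCriteria(
        std::list<std::shared_ptr<Stage>>& sources,
        const std::string& targetStageName,
        std::function<bool(const Stage*)> predicate = nullptr) {
        if (sources.empty() || sources.front()->name != targetStageName) {
            return nullptr;
        }
        auto target = sources.front();
        // The optional predicate can reject the stage without modifying the pipeline.
        if (predicate && !predicate(target.get())) {
            return nullptr;
        }
        sources.pop_front();
        return target;
    }

    int main() {
        std::list<std::shared_ptr<Stage>> pipeline{
            std::make_shared<Stage>(Stage{"$sort", false})};
        auto popped = popFrontWithCriteria(pipeline, "$sort",
                                           [](const Stage* s) { return s->mergingPresorted; });
        // The predicate rejected the blocking $sort, so nothing was removed.
        return (popped == nullptr && pipeline.size() == 1) ? 0 : 1;
    }
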
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index e84f30dca74..6c2c3fbba73 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1791,7 +1791,7 @@ public:
DocumentSourceCollectionlessMock() : DocumentSourceMock({}) {}
StageConstraints constraints() const final {
- StageConstraints constraints;
+ auto constraints = DocumentSourceMock::constraints();
constraints.isIndependentOfAnyCollection = true;
return constraints;
}
@@ -1906,7 +1906,12 @@ public:
DocumentSourceDependencyDummy() : DocumentSourceMock({}) {}
StageConstraints constraints() const final {
- return StageConstraints{}; // Overrides DocumentSourceMock's required position.
+ // Overrides DocumentSourceMock's required position.
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed};
}
};
diff --git a/src/mongo/db/query/query_knobs.cpp b/src/mongo/db/query/query_knobs.cpp
index 69936020c51..73f7e618769 100644
--- a/src/mongo/db/query/query_knobs.cpp
+++ b/src/mongo/db/query/query_knobs.cpp
@@ -79,4 +79,6 @@ MONGO_EXPORT_SERVER_PARAMETER(internalDocumentSourceLookupCacheSizeBytes, int, 1
MONGO_EXPORT_SERVER_PARAMETER(internalQueryPlannerGenerateCoveredWholeIndexScans, bool, false);
MONGO_EXPORT_SERVER_PARAMETER(internalQueryIgnoreUnknownJSONSchemaKeywords, bool, false);
+
+MONGO_EXPORT_SERVER_PARAMETER(internalQueryProhibitBlockingMergeOnMongoS, bool, false);
} // namespace mongo
diff --git a/src/mongo/db/query/query_knobs.h b/src/mongo/db/query/query_knobs.h
index 651d39c5c23..5b4d759b26a 100644
--- a/src/mongo/db/query/query_knobs.h
+++ b/src/mongo/db/query/query_knobs.h
@@ -122,4 +122,5 @@ extern AtomicInt32 internalDocumentSourceCursorBatchSizeBytes;
extern AtomicInt32 internalDocumentSourceLookupCacheSizeBytes;
+extern AtomicBool internalQueryProhibitBlockingMergeOnMongoS;
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index f3298662b02..a70227b99a4 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -622,7 +622,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
invariant(pipelineForMerging);
// First, check whether we can merge on the mongoS.
- if (pipelineForMerging->canRunOnMongos() && !internalQueryProhibitMergingOnMongoS.load()) {
+ if (!internalQueryProhibitMergingOnMongoS.load() && pipelineForMerging->canRunOnMongos()) {
// Register the new mongoS cursor, and retrieve the initial batch of results.
auto cursorResponse = establishMergingMongosCursor(opCtx,
request,
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 7f86b1cadab..bccdc369705 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -153,7 +153,13 @@ namespace {
* sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
*/
boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) {
- if (auto frontSort = mergePipeline->popFrontStageWithName(DocumentSourceSort::kStageName)) {
+    // Remove a leading $sort iff it is merging presorted streams, since the AsyncResultsMerger (ARM) cannot handle a blocking $sort.
+ auto frontSort = mergePipeline->popFrontWithCriteria(
+ DocumentSourceSort::kStageName, [](const DocumentSource* const source) {
+ return static_cast<const DocumentSourceSort* const>(source)->mergingPresorted();
+ });
+
+ if (frontSort) {
auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
if (auto sortLimit = sortStage->getLimitSrc()) {
// There was a limit stage absorbed into the sort stage, so we need to preserve that.
@@ -198,10 +204,10 @@ std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* execu
// instead.
while (!pipeline->getSources().empty()) {
invariant(isSkipOrLimit(pipeline->getSources().front()));
- if (auto skip = pipeline->popFrontStageWithName(DocumentSourceSkip::kStageName)) {
+ if (auto skip = pipeline->popFrontWithCriteria(DocumentSourceSkip::kStageName)) {
root = stdx::make_unique<RouterStageSkip>(
opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
- } else if (auto limit = pipeline->popFrontStageWithName(DocumentSourceLimit::kStageName)) {
+ } else if (auto limit = pipeline->popFrontWithCriteria(DocumentSourceLimit::kStageName)) {
root = stdx::make_unique<RouterStageLimit>(
opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
}
diff --git a/src/mongo/s/query/router_stage_pipeline.h b/src/mongo/s/query/router_stage_pipeline.h
index 329f400dd7a..fef4b106513 100644
--- a/src/mongo/s/query/router_stage_pipeline.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -68,6 +68,14 @@ private:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
std::unique_ptr<RouterExecStage> childStage);
+ StageConstraints constraints() const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kFirst,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed};
+ }
+
GetNextResult getNext() final;
void doDispose() final;
void reattachToOperationContext(OperationContext* opCtx) final;