-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml | 1
-rw-r--r--  jstests/aggregation/mongos_merge.js | 206
-rw-r--r--  jstests/aggregation/shard_targeting.js | 22
-rw-r--r--  src/mongo/db/pipeline/SConscript | 1
-rw-r--r--  src/mongo/db/pipeline/document_source.h | 16
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_coll_stats.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_current_op.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_facet.cpp | 4
-rw-r--r--  src/mongo/db/pipeline/document_source_facet_test.cpp | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_geo_near.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_graph_lookup.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_index_stats.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h | 6
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp | 109
-rw-r--r--  src/mongo/db/pipeline/document_source_internal_split_pipeline.h | 81
-rw-r--r--  src/mongo/db/pipeline/document_source_limit.h | 6
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_match.h | 7
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors.cpp | 6
-rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors.h | 8
-rw-r--r--  src/mongo/db/pipeline/document_source_mock.h | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h | 4
-rw-r--r--  src/mongo/db/pipeline/document_source_redact.h | 6
-rw-r--r--  src/mongo/db/pipeline/document_source_sample.cpp | 7
-rw-r--r--  src/mongo/db/pipeline/document_source_sample.h | 6
-rw-r--r--  src/mongo/db/pipeline/document_source_single_document_transformation.h | 1
-rw-r--r--  src/mongo/db/pipeline/document_source_skip.h | 7
-rw-r--r--  src/mongo/db/pipeline/document_source_sort.h | 4
-rw-r--r--  src/mongo/db/pipeline/document_source_unwind.h | 1
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp | 23
-rw-r--r--  src/mongo/db/pipeline/pipeline.h | 5
-rw-r--r--  src/mongo/db/repl/SConscript | 7
-rw-r--r--  src/mongo/s/cluster_cursor_stats.cpp | 9
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp | 129
-rw-r--r--  src/mongo/s/commands/commands_public.cpp | 2
-rw-r--r--  src/mongo/s/query/SConscript | 2
-rw-r--r--  src/mongo/s/query/cluster_client_cursor.h | 13
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.cpp | 22
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl.h | 6
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_impl_test.cpp | 27
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.cpp | 2
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_mock.h | 6
-rw-r--r--  src/mongo/s/query/cluster_client_cursor_params.h | 4
-rw-r--r--  src/mongo/s/query/cluster_cursor_cleanup_job.cpp | 3
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.cpp | 36
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager.h | 37
-rw-r--r--  src/mongo/s/query/cluster_cursor_manager_test.cpp | 645
-rw-r--r--  src/mongo/s/query/cluster_find.cpp | 22
-rw-r--r--  src/mongo/s/query/cluster_query_knobs.cpp | 1
-rw-r--r--  src/mongo/s/query/cluster_query_knobs.h | 10
-rw-r--r--  src/mongo/s/query/router_exec_stage.h | 50
-rw-r--r--  src/mongo/s/query/router_stage_aggregation_merge.cpp | 78
-rw-r--r--  src/mongo/s/query/router_stage_aggregation_merge.h | 62
-rw-r--r--  src/mongo/s/query/router_stage_limit.cpp | 4
-rw-r--r--  src/mongo/s/query/router_stage_limit.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_limit_test.cpp | 37
-rw-r--r--  src/mongo/s/query/router_stage_merge.cpp | 4
-rw-r--r--  src/mongo/s/query/router_stage_merge.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_mock.cpp | 2
-rw-r--r--  src/mongo/s/query/router_stage_mock.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey.cpp | 4
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_remove_sortkey_test.cpp | 35
-rw-r--r--  src/mongo/s/query/router_stage_skip.cpp | 6
-rw-r--r--  src/mongo/s/query/router_stage_skip.h | 2
-rw-r--r--  src/mongo/s/query/router_stage_skip_test.cpp | 41
-rw-r--r--  src/mongo/s/query/store_possible_cursor.cpp | 2
-rw-r--r--  src/mongo/s/server.cpp | 2
71 files changed, 1371 insertions, 514 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml
index 5b214e065c1..78f46d7e866 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml
@@ -8,6 +8,8 @@ selector:
- jstests/aggregation/sources/*/*.js
exclude_files:
- jstests/aggregation/bugs/server18198.js # Uses a mocked mongo client to test read preference.
+ - jstests/aggregation/mongos_merge.js # Cannot specify write concern when
+ # secondaryThrottle is not set.
- jstests/aggregation/mongos_slaveok.js # Majority read on secondary requires afterOpTime.
- jstests/aggregation/shard_targeting.js # Cannot specify write concern when
# secondaryThrottle is not set.
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
index d1127fefcc5..16fcbfe29ad 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
@@ -11,6 +11,7 @@ selector:
- jstests/aggregation/bugs/server6118.js
- jstests/aggregation/bugs/server6179.js
- jstests/aggregation/bugs/server7781.js
+ - jstests/aggregation/mongos_merge.js
- jstests/aggregation/mongos_slaveok.js
- jstests/aggregation/shard_targeting.js
- jstests/aggregation/sources/addFields/use_cases.js
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js
new file mode 100644
index 00000000000..9d7becfb87f
--- /dev/null
+++ b/jstests/aggregation/mongos_merge.js
@@ -0,0 +1,206 @@
+/**
+ * Tests that split aggregations whose merge pipelines are eligible to run on mongoS do so, and
+ * produce the expected results.
+ *
+ * Splittable stages whose merge components are eligible to run on mongoS include:
+ * - $sort (iff merging pre-sorted streams)
+ * - $skip
+ * - $limit
+ * - $sample
+ *
+ * Non-splittable stages such as those listed below are eligible to run in a mongoS merge pipeline:
+ * - $match
+ * - $project
+ * - $addFields
+ * - $unwind
+ * - $redact
+ *
+ * Because wrapping these aggregations in a $facet stage will affect how the pipeline can be merged,
+ * and will therefore invalidate the results of the test cases below, we tag this test to prevent it
+ * from running under the 'aggregation_facet_unwind' passthrough.
+ *
+ * @tags: [do_not_wrap_aggregations_in_facets]
+ */
+
+(function() {
+ load("jstests/libs/profiler.js"); // For profilerHas*OrThrow helper functions.
+
+ const st = new ShardingTest({shards: 2, mongos: 1, config: 1});
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ const shard0DB = primaryShardDB = st.shard0.getDB(jsTestName());
+ const shard1DB = st.shard1.getDB(jsTestName());
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable profiling on each shard so that we can check whether a $mergeCursors aggregation ran.
+ assert.commandWorked(shard0DB.setProfilingLevel(2));
+ assert.commandWorked(shard1DB.setProfilingLevel(2));
+
+ // Force pipelines which cannot merge on mongoS to merge on the primary shard instead, so we
+ // know where to check for $mergeCursors.
+ assert.commandWorked(
+ mongosDB.adminCommand({setParameter: 1, internalQueryAlwaysMergeOnPrimaryShard: true}));
+
+ // Enable sharding on the test DB and ensure its primary is shard0000.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), "shard0000");
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 4 chunks: [MinKey, -100), [-100, 0), [0, 100), [100, MaxKey).
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: -100}}));
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 100}}));
+
+ // Move the [0, 100) and [100, MaxKey) chunks to shard0001.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 50}, to: "shard0001"}));
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 150}, to: "shard0001"}));
+
+ // Write 400 documents across the 4 chunks.
+ for (let i = -200; i < 200; i++) {
+ assert.writeOK(mongosColl.insert({_id: i, a: [i], b: {redactThisDoc: true}, c: true}));
+ }
+
+ /**
+ * Runs the aggregation specified by 'pipeline', verifying that:
+ * - The number of documents returned by the aggregation matches 'expectedCount'.
+ * - The merge was performed on a mongoS if 'mergeOnMongoS' is true, and on a shard otherwise.
+ */
+ function assertMergeBehaviour({testName, pipeline, mergeOnMongoS, expectedCount}) {
+ // Verify that the 'mergeOnMongoS' explain() output for this pipeline matches our
+ // expectation.
+ assert.eq(
+ assert.commandWorked(mongosColl.explain().aggregate(pipeline, {comment: testName}))
+ .mergeOnMongoS,
+ mergeOnMongoS);
+
+ assert.eq(mongosColl.aggregate(pipeline, {comment: testName}).itcount(), expectedCount);
+
+ // Verify that a $mergeCursors aggregation ran on the primary shard if 'mergeOnMongoS' is
+ // false, and that no such aggregation ran if 'mergeOnMongoS' is true.
+ profilerHasNumMatchingEntriesOrThrow({
+ profileDB: primaryShardDB,
+ numExpectedMatches: (mergeOnMongoS ? 0 : 1),
+ filter: {
+ "command.aggregate": mongosColl.getName(),
+ "command.comment": testName,
+ "command.pipeline.$mergeCursors": {$exists: 1}
+ }
+ });
+ }
+
+ /**
+ * Throws an assertion if the aggregation specified by 'pipeline' does not produce
+ * 'expectedCount' results, or if the merge phase is not performed on the mongoS.
+ */
+ function assertMergeOnMongoS({testName, pipeline, expectedCount}) {
+ assertMergeBehaviour({
+ testName: testName,
+ pipeline: pipeline,
+ mergeOnMongoS: true,
+ expectedCount: expectedCount
+ });
+ }
+
+ /**
+ * Throws an assertion if the aggregation specified by 'pipeline' does not produce
+ * 'expectedCount' results, or if the merge phase was not performed on a shard.
+ */
+ function assertMergeOnMongoD({testName, pipeline, expectedCount}) {
+ assertMergeBehaviour({
+ testName: testName,
+ pipeline: pipeline,
+ mergeOnMongoS: false,
+ expectedCount: expectedCount
+ });
+ }
+
+ //
+ // Test cases.
+ //
+
+ let testName;
+
+ // Test that a $match-only pipeline, which leaves the merge pipeline empty, is merged on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_match_only",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}],
+ expectedCount: 400
+ });
+
+ // Test that a $sort stage which merges pre-sorted streams is run on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_sort_presorted",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}],
+ expectedCount: 400
+ });
+
+ // Test that a $sort stage which must sort the dataset from scratch is NOT run on mongoS.
+ assertMergeOnMongoD({
+ testName: "agg_mongos_merge_sort_in_mem",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}],
+ expectedCount: 400
+ });
+
+ // Test that $skip is merged on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_skip",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 100}],
+ expectedCount: 300
+ });
+
+ // Test that $limit is merged on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_limit",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 50}],
+ expectedCount: 50
+ });
+
+ // Test that $sample is merged on mongoS.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_sample",
+ pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 50}}],
+ expectedCount: 50
+ });
+
+ // Test that merge pipelines containing all mongos-runnable stages produce the expected output.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_all_mongos_runnable_stages",
+ pipeline: [
+ {$match: {_id: {$gte: -5, $lte: 100}}},
+ {$sort: {_id: -1}},
+ {$skip: 95},
+ {$limit: 10},
+ {$addFields: {d: true}},
+ {$unwind: "$a"},
+ {$sample: {size: 5}},
+ {$project: {c: 0}},
+ {
+ $redact: {
+ $cond:
+ {if: {$eq: ["$redactThisDoc", true]}, then: "$$PRUNE", else: "$$DESCEND"}
+ }
+ },
+ {
+ $match: {
+ _id: {$gte: -4, $lte: 5},
+ a: {$gte: -4, $lte: 5},
+ b: {$exists: false},
+ c: {$exists: false},
+ d: true
+ }
+ }
+ ],
+ expectedCount: 5
+ });
+})();
\ No newline at end of file
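The test above relies on the new top-level 'mergeOnMongoS' field that explain output now reports for split pipelines. As a minimal illustration of how that field behaves, a hypothetical shell session (assuming a sharded collection db.coll whose $match spans more than one shard; this sketch is not copied from the change itself) might look like:

    // Merging pre-sorted streams is mongoS-eligible, so explain() should report it.
    const presorted = [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}];
    assert.eq(db.coll.explain().aggregate(presorted).mergeOnMongoS, true);

    // A second $sort on an unsorted field must re-sort from scratch, so the merge
    // half of the pipeline is expected to stay on a shard.
    const resorted = presorted.concat([{$sort: {a: 1}}]);
    assert.eq(db.coll.explain().aggregate(resorted).mergeOnMongoS, false);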
diff --git a/jstests/aggregation/shard_targeting.js b/jstests/aggregation/shard_targeting.js
index 4edec931cde..7b33854f601 100644
--- a/jstests/aggregation/shard_targeting.js
+++ b/jstests/aggregation/shard_targeting.js
@@ -40,10 +40,6 @@
assert.commandWorked(mongosDB.dropDatabase());
- // Always merge on primary shard, so we know where to look for $mergeCursors stages.
- assert.commandWorked(
- mongosDB.adminCommand({setParameter: 1, internalQueryAlwaysMergeOnPrimaryShard: true}));
-
// Enable sharding on the test DB and ensure its primary is shard0000.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), "shard0000");
@@ -79,6 +75,9 @@
ErrorCodes.StaleEpoch
];
+ // Create an $_internalSplitPipeline stage that forces the merge to occur on the Primary shard.
+ const forcePrimaryMerge = [{$_internalSplitPipeline: {mergeType: "primaryShard"}}];
+
function runAggShardTargetTest({splitPoint}) {
// Ensure that both mongoS have up-to-date caches, and enable the profiler on both shards.
assert.commandWorked(mongosForAgg.getDB("admin").runCommand({flushRouterConfig: 1}));
@@ -166,10 +165,13 @@
// Run the same aggregation that targeted a single shard via the now-stale mongoS. It should
// attempt to send the aggregation to shard0000, hit a stale config exception, split the
- // pipeline and redispatch.
+ // pipeline and redispatch. We append an $_internalSplitPipeline stage in order to force a
+ // shard merge rather than a mongoS merge.
testName = "agg_shard_targeting_backout_passthrough_and_split_if_cache_is_stale";
assert.eq(mongosColl
- .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}].concat(splitPoint),
+ .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}]
+ .concat(splitPoint)
+ .concat(forcePrimaryMerge),
{comment: testName})
.itcount(),
2);
@@ -259,10 +261,14 @@
{moveChunk: mongosColl.getFullName(), find: {_id: -50}, to: "shard0000"}));
// Run the same aggregation via the now-stale mongoS. It should split the pipeline, hit a
- // stale config exception, and reset to the original single-shard pipeline upon refresh.
+ // stale config exception, and reset to the original single-shard pipeline upon refresh. We
+ // append an $_internalSplitPipeline stage in order to force a shard merge rather than a
+ // mongoS merge.
testName = "agg_shard_targeting_backout_split_pipeline_and_reassemble_if_cache_is_stale";
assert.eq(mongosColl
- .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}].concat(splitPoint),
+ .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}]
+ .concat(splitPoint)
+ .concat(forcePrimaryMerge),
{comment: testName})
.itcount(),
2);
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 4ceb1616a36..1d622a2c8cf 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -242,6 +242,7 @@ docSourceEnv.Library(
'document_source_group.cpp',
'document_source_index_stats.cpp',
'document_source_internal_inhibit_optimization.cpp',
+ 'document_source_internal_split_pipeline.cpp',
'document_source_limit.cpp',
'document_source_match.cpp',
'document_source_merge_cursors.cpp',
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index efb695c8f59..8f1c2fef4a0 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -128,9 +128,18 @@ public:
*/
enum class PositionRequirement { kNone, kFirst, kLast };
+ /**
+ * A HostTypeRequirement defines where this stage is permitted to be executed when the
+ * pipeline is run on a sharded cluster.
+ */
+ enum class HostTypeRequirement { kPrimaryShard, kAnyShard, kAnyShardOrMongoS };
+
// Set if this stage needs to be in a particular position of the pipeline.
PositionRequirement requiredPosition = PositionRequirement::kNone;
+ // Set if this stage can only be executed on specific components of a sharded cluster.
+ HostTypeRequirement hostRequirement = HostTypeRequirement::kAnyShard;
+
bool isAllowedInsideFacetStage = true;
// True if this stage does not generate results itself, and instead pulls inputs from an
@@ -147,12 +156,11 @@ public:
// must also override getModifiedPaths() to provide information about which particular
// $match predicates may be swapped before itself.
bool canSwapWithMatch = false;
-
- // True if this stage must run on the primary shard when the collection being aggregated is
- // sharded.
- bool mustRunOnPrimaryShardIfSharded = false;
};
+ using HostTypeRequirement = StageConstraints::HostTypeRequirement;
+ using PositionRequirement = StageConstraints::PositionRequirement;
+
/**
* This is what is returned from the main DocumentSource API: getNext(). It is essentially a
* (ReturnStatus, Document) pair, with the first entry being used to communicate information
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 40d9fe5555e..64f313649af 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -96,7 +96,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.isAllowedInsideFacetStage = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 6334bbe55c8..2903241d1a7 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -76,7 +76,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index cd86d027733..d5138e74d2a 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -80,7 +80,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
constraints.isIndependentOfAnyCollection = true;
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 57a41996af9..c15c0495ba7 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -53,7 +53,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index aba1f5f115c..e553ea2fa62 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -243,12 +243,12 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints() const {
for (auto&& facet : _facets) {
for (auto&& nestedStage : facet.pipeline->getSources()) {
- if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) {
+ if (nestedStage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard) {
// Currently we don't split $facet to have a merger part and a shards part (see
// SERVER-24154). This means that if any stage in any of the $facet pipelines
// requires the primary shard, then the entire $facet must happen on the merger, and
// the merger must be the primary shard.
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
}
}
}
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index 60aefdead64..d8798a5c70e 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -609,7 +609,7 @@ class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough {
public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
@@ -633,7 +633,8 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima
facets.emplace_back("needsPrimaryShard", std::move(secondPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded);
+ ASSERT(facetStage->constraints().hostRequirement ==
+ DocumentSource::StageConstraints::HostTypeRequirement::kPrimaryShard);
}
TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) {
@@ -652,7 +653,8 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr
facets.emplace_back("second", std::move(secondPipeline));
auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx);
- ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded);
+ ASSERT(facetStage->constraints().hostRequirement ==
+ DocumentSource::StageConstraints::HostTypeRequirement::kAnyShard);
}
} // namespace
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 5df25d6c1c6..315dcea6dd4 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -49,7 +49,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 489a80ea7b9..53394d3d0ab 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -56,7 +56,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
constraints.canSwapWithMatch = true;
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index dd62f5323bd..83546654361 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -71,7 +71,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index a750443a5b5..d1967b6625f 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -52,6 +52,12 @@ public:
return kStageName.rawData();
}
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
GetNextResult getNext() final;
private:
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
new file mode 100644
index 00000000000..a49514cf628
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp
@@ -0,0 +1,109 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_internal_split_pipeline.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(_internalSplitPipeline,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceInternalSplitPipeline::createFromBson);
+
+constexpr StringData DocumentSourceInternalSplitPipeline::kStageName;
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "$_internalSplitPipeline must take a nested object but found: "
+ << elem,
+ elem.type() == BSONType::Object);
+
+ auto specObj = elem.embeddedObject();
+
+ HostTypeRequirement mergeType = HostTypeRequirement::kAnyShard;
+
+ for (auto&& elt : specObj) {
+ if (elt.fieldNameStringData() == "mergeType"_sd) {
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "'mergeType' must be a string value but found: " << elt.type(),
+ elt.type() == BSONType::String);
+
+ auto mergeTypeString = elt.valueStringData();
+
+ if ("anyShard"_sd == mergeTypeString) {
+ mergeType = HostTypeRequirement::kAnyShard;
+ } else if ("primaryShard"_sd == mergeTypeString) {
+ mergeType = HostTypeRequirement::kPrimaryShard;
+ } else if ("mongos"_sd == mergeTypeString) {
+ mergeType = HostTypeRequirement::kAnyShardOrMongoS;
+ } else {
+ uasserted(ErrorCodes::BadValue,
+ str::stream() << "unrecognized value for 'mergeType': '"
+ << mergeTypeString
+ << "'");
+ }
+ } else {
+ uasserted(ErrorCodes::BadValue,
+ str::stream() << "unrecognized field while parsing $_internalSplitPipeline: '"
+ << elt.fieldNameStringData()
+ << "'");
+ }
+ }
+
+ return new DocumentSourceInternalSplitPipeline(expCtx, mergeType);
+}
+
+DocumentSource::GetNextResult DocumentSourceInternalSplitPipeline::getNext() {
+ pExpCtx->checkForInterrupt();
+ return pSource->getNext();
+}
+
+Value DocumentSourceInternalSplitPipeline::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ std::string mergeTypeString;
+
+ switch (_mergeType) {
+ case HostTypeRequirement::kAnyShardOrMongoS:
+ mergeTypeString = "mongos";
+ break;
+
+ case HostTypeRequirement::kPrimaryShard:
+ mergeTypeString = "primaryShard";
+ break;
+
+ default:
+ mergeTypeString = "anyShard";
+ break;
+ }
+
+ return Value(Document{{getSourceName(), Value{Document{{"mergeType", mergeTypeString}}}}});
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
new file mode 100644
index 00000000000..c18a6d301a6
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * An internal stage available for testing. Acts as a simple passthrough of intermediate results
+ * from the source stage, but forces the pipeline to split at the point where this stage appears
+ * (assuming that no earlier splitpoints exist). Takes a single parameter, 'mergeType', which can be
+ * one of 'anyShard', 'primaryShard' or 'mongos' to control where the merge may occur. Omitting this
+ * parameter or specifying 'anyShard' produces the default merging behaviour; the merge half of the
+ * pipeline will be sent to a random participating shard, subject to the requirements of any
+ * subsequent splittable stages in the pipeline.
+ */
+class DocumentSourceInternalSplitPipeline final : public DocumentSource,
+ public SplittableDocumentSource {
+public:
+ static constexpr StringData kStageName = "$_internalSplitPipeline"_sd;
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement, const boost::intrusive_ptr<ExpressionContext>&);
+
+ DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ HostTypeRequirement mergeType)
+ : DocumentSource(expCtx), _mergeType(mergeType) {}
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return this;
+ }
+
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ }
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = _mergeType;
+ return constraints;
+ }
+
+ GetNextResult getNext() final;
+
+private:
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard;
+};
+
+} // namespace mongo
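As the class comment above describes, $_internalSplitPipeline is a test-only passthrough that forces a split at its own position. A hypothetical shell invocation (db.coll is assumed to be a sharded collection; this sketch is not part of the change) would look like:

    // Split here and require the merge half of the pipeline to run on mongoS;
    // "primaryShard" or "anyShard" may be passed instead, and omitting
    // 'mergeType' is equivalent to "anyShard".
    db.coll.aggregate([
        {$match: {}},
        {$_internalSplitPipeline: {mergeType: "mongos"}}
    ]);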
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index c6660152d66..88acfa8b45a 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -42,6 +42,12 @@ public:
: SimpleBSONObjComparator::kInstance.makeBSONObjSet();
}
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
/**
* Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately.
*/
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index d5bab6fd9e7..a4cd8d35ed6 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -97,7 +97,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
constraints.canSwapWithMatch = true;
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 2be99dd7f12..135dbc39e17 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -49,6 +49,13 @@ public:
}
const char* getSourceName() const override;
+
+ StageConstraints constraints() const override {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
Value serialize(
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override;
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index 93cd6dd93e7..d53c2183443 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -176,6 +176,12 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() {
return std::move(next);
}
+bool DocumentSourceMergeCursors::remotesExhausted() const {
+ return std::all_of(_cursors.begin(), _cursors.end(), [](const auto& cursorAndConn) {
+ return cursorAndConn->cursor.isDead();
+ });
+}
+
void DocumentSourceMergeCursors::doDispose() {
for (auto&& cursorAndConn : _cursors) {
// Note it is an error to call done() on a connection before consuming the reply from a
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index 045ddad4836..e83316fa6ea 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -55,7 +55,8 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
@@ -68,6 +69,11 @@ public:
std::vector<CursorDescriptor> cursorDescriptors,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ /**
+ * Returns true if all remotes have reported that their cursors are closed.
+ */
+ bool remotesExhausted() const;
+
/** Returns non-owning pointers to cursors managed by this stage.
* Call this instead of getNext() if you want access to the raw streams.
* This method should only be called at most once.
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 513994cd599..22f7a1aa24d 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -51,7 +51,7 @@ public:
StageConstraints constraints() const override {
StageConstraints constraints;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 9366d01a670..6756cd4df2a 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -46,9 +46,9 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.mustRunOnPrimaryShardIfSharded = true;
+ constraints.hostRequirement = HostTypeRequirement::kPrimaryShard;
constraints.isAllowedInsideFacetStage = false;
- constraints.requiredPosition = StageConstraints::PositionRequirement::kLast;
+ constraints.requiredPosition = PositionRequirement::kLast;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index a76ca9c7940..bedc434d61e 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -40,6 +40,12 @@ public:
const char* getSourceName() const final;
boost::intrusive_ptr<DocumentSource> optimize() final;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
/**
* Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact
* stage.
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index 5117e5c516a..c174d95d935 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -123,6 +123,11 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::getShardSource() {
intrusive_ptr<DocumentSource> DocumentSourceSample::getMergeSource() {
// Just need to merge the pre-sorted documents by their random values.
- return DocumentSourceSort::create(pExpCtx, randSortSpec, _size);
+ BSONObjBuilder randMergeSortSpec;
+
+ randMergeSortSpec.appendElements(randSortSpec);
+ randMergeSortSpec.append("$mergePresorted", true);
+
+ return DocumentSourceSort::create(pExpCtx, randMergeSortSpec.obj(), _size);
}
} // mongo
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index ec88c0737a8..662c6a9a49d 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -39,6 +39,12 @@ public:
const char* getSourceName() const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
GetDepsReturn getDependencies(DepsTracker* deps) const final {
return SEE_NEXT;
}
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index b50065bc303..188e9864310 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -103,6 +103,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index 46cb6a6f3ab..a69f5e59eb5 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -37,6 +37,13 @@ public:
// virtuals from DocumentSource
GetNextResult getNext() final;
const char* getSourceName() const final;
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ return constraints;
+ }
+
/**
* Attempts to move a subsequent $limit before the skip, potentially allowing for further
* optimizations earlier in the pipeline.
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 866a3fb955c..5731051cbd3 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -57,6 +57,10 @@ public:
// Can't swap with a $match if a limit has been absorbed, since in general match can't swap
// with limit.
constraints.canSwapWithMatch = !limitSrc;
+
+ // Can run on mongoS only if this stage is merging presorted streams.
+ constraints.hostRequirement = (_mergingPresorted ? HostTypeRequirement::kAnyShardOrMongoS
+ : HostTypeRequirement::kAnyShard);
return constraints;
}
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index fb7bd68ba39..5bc9a91afdb 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -48,6 +48,7 @@ public:
StageConstraints constraints() const final {
StageConstraints constraints;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
constraints.canSwapWithMatch = true;
return constraints;
}
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index e5cd62a4670..8b84fe36bff 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -62,6 +62,9 @@ using std::vector;
namespace dps = ::mongo::dotted_path_support;
+using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement;
+using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement;
+
Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {}
Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx)
@@ -171,8 +174,7 @@ Status Pipeline::validateFacetPipeline() const {
// We expect a stage within a $facet stage to have these properties.
invariant(stageConstraints.requiresInputDocSource);
invariant(!stageConstraints.isIndependentOfAnyCollection);
- invariant(stageConstraints.requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kNone);
+ invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
}
// Facet pipelines cannot have any stages which are initial sources. We've already validated the
@@ -184,9 +186,7 @@ Status Pipeline::validateFacetPipeline() const {
Status Pipeline::ensureAllStagesAreInLegalPositions() const {
size_t i = 0;
for (auto&& stage : _sources) {
- if (stage->constraints().requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kFirst &&
- i != 0) {
+ if (stage->constraints().requiredPosition == PositionRequirement::kFirst && i != 0) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
<< " is only valid as the first stage in a pipeline.",
@@ -199,8 +199,7 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const {
17313};
}
- if (stage->constraints().requiredPosition ==
- DocumentSource::StageConstraints::PositionRequirement::kLast &&
+ if (stage->constraints().requiredPosition == PositionRequirement::kLast &&
i != _sources.size() - 1) {
return {ErrorCodes::BadValue,
str::stream() << stage->getSourceName()
@@ -312,6 +311,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() {
shardPipeline->_splitForSharded = true;
_splitForMerge = true;
+ stitch();
+
return shardPipeline;
}
@@ -428,7 +429,13 @@ BSONObj Pipeline::getInitialQuery() const {
bool Pipeline::needsPrimaryShardMerger() const {
return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) {
- return stage->constraints().mustRunOnPrimaryShardIfSharded;
+ return stage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard;
+ });
+}
+
+bool Pipeline::canRunOnMongos() const {
+ return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) {
+ return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS;
});
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 677cfe7b6d1..ed19d44ae2b 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -219,6 +219,11 @@ public:
bool needsPrimaryShardMerger() const;
/**
+ * Returns whether or not every DocumentSource in the pipeline can run on mongoS.
+ */
+ bool canRunOnMongos() const;
+
+ /**
* Modifies the pipeline, optimizing it by combining and swapping stages.
*/
void optimizePipeline();
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 0c841ed5e45..2653b607ab4 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -778,6 +778,8 @@ env.CppUnitTest(
LIBDEPS=[
'repl_coordinator_impl',
'replmocks',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
@@ -791,6 +793,8 @@ env.CppUnitTest(
'repl_coordinator_impl',
'replica_set_messages',
'replmocks',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
@@ -819,6 +823,8 @@ env.CppUnitTest(
'repl_coordinator_impl',
'replica_set_messages',
'replmocks',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
],
)
@@ -1396,6 +1402,7 @@ env.CppUnitTest(
'sync_source_selector_mock',
'task_executor_mock',
'$BUILD_DIR/mongo/db/query/command_request_response',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
'$BUILD_DIR/mongo/unittest/concurrency',
],
diff --git a/src/mongo/s/cluster_cursor_stats.cpp b/src/mongo/s/cluster_cursor_stats.cpp
index d0620705052..4aa5ca97891 100644
--- a/src/mongo/s/cluster_cursor_stats.cpp
+++ b/src/mongo/s/cluster_cursor_stats.cpp
@@ -50,11 +50,12 @@ public:
{
BSONObjBuilder openBob(cursorBob.subobjStart("open"));
auto stats = grid.getCursorManager()->stats();
- openBob.append("multiTarget", static_cast<long long>(stats.cursorsSharded));
- openBob.append("singleTarget", static_cast<long long>(stats.cursorsNotSharded));
+ openBob.append("multiTarget", static_cast<long long>(stats.cursorsMultiTarget));
+ openBob.append("singleTarget", static_cast<long long>(stats.cursorsSingleTarget));
openBob.append("pinned", static_cast<long long>(stats.cursorsPinned));
- openBob.append("total",
- static_cast<long long>(stats.cursorsSharded + stats.cursorsNotSharded));
+ openBob.append(
+ "total",
+ static_cast<long long>(stats.cursorsMultiTarget + stats.cursorsSingleTarget));
openBob.doneFast();
}
cursorBob.done();
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index cf6b5991642..3b3e87129ab 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -34,6 +34,7 @@
#include <boost/intrusive_ptr.hpp>
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
@@ -42,6 +43,8 @@
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/db/views/view.h"
#include "mongo/executor/task_executor_pool.h"
@@ -51,6 +54,9 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
+#include "mongo/s/query/cluster_client_cursor_impl.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/store_possible_cursor.h"
@@ -86,15 +92,15 @@ Status appendExplainResults(
const std::vector<AsyncRequestsSender::Response>& shardResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMergingShard,
+ const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging,
BSONObjBuilder* result) {
if (pipelineForTargetedShards->isSplitForSharded()) {
- *result << "needsPrimaryShardMerger" << pipelineForMergingShard->needsPrimaryShardMerger()
- << "splitPipeline"
- << Document{{"shardsPart",
- pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)},
- {"mergerPart",
- pipelineForMergingShard->writeExplainOps(*mergeCtx->explain)}};
+ *result << "needsPrimaryShardMerger" << pipelineForMerging->needsPrimaryShardMerger()
+ << "mergeOnMongoS" << pipelineForMerging->canRunOnMongos() << "splitPipeline"
+ << Document{
+ {"shardsPart",
+ pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)},
+ {"mergerPart", pipelineForMerging->writeExplainOps(*mergeCtx->explain)}};
} else {
*result << "splitPipeline" << BSONNULL;
}
@@ -207,10 +213,10 @@ BSONObj createCommandForMergingShard(
const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMergingShard) {
+ const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
- mergeCmd["pipeline"] = Value(pipelineForMergingShard->serialize());
+ mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
mergeCmd[AggregationRequest::kFromRouterName] = Value(true);
mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
@@ -284,7 +290,7 @@ StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardC
return swCursors;
}
-StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergeCursor(
+StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor(
OperationContext* opCtx,
const NamespaceString& nss,
const std::vector<ClusterClientCursorParams::RemoteCursor>& cursors,
@@ -307,6 +313,73 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergeCursor(
return {{std::move(mergingShardId), std::move(shardCmdResponse)}};
}
+BSONObj establishMergingMongosCursor(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const NamespaceString& requestedNss,
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging,
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
+
+ ClusterClientCursorParams params(
+ requestedNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ ReadPreferenceSetting::get(opCtx));
+
+ params.mergePipeline = std::move(pipelineForMerging);
+ params.remotes = std::move(cursors);
+ params.batchSize = request.getBatchSize();
+
+ auto ccc = ClusterClientCursorImpl::make(
+ opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
+
+ auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
+ BSONObjBuilder cursorResponse;
+
+ CursorResponseBuilder responseBuilder(true, &cursorResponse);
+
+ ccc->reattachToOperationContext(opCtx);
+
+ for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
+ auto next = uassertStatusOK(ccc->next());
+
+ // Check whether we have exhausted the pipeline's results.
+ if (next.isEOF()) {
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ break;
+ }
+
+ // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor
+ // to be returned on the next getMore.
+ auto nextObj = *next.getResult();
+
+ if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
+ ccc->queueResult(nextObj);
+ break;
+ }
+
+ responseBuilder.append(nextObj);
+ }
+
+ ccc->detachFromOperationContext();
+
+ CursorId clusterCursorId = 0;
+
+ if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
+ clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
+ opCtx,
+ ccc.releaseCursor(),
+ requestedNss,
+ ClusterCursorManager::CursorType::MultiTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
+ }
+
+ responseBuilder.done(clusterCursorId, requestedNss.ns());
+
+ Command::appendCommandStatus(cursorResponse, Status::OK());
+
+ return cursorResponse.obj();
+}
+
} // namespace
Status ClusterAggregate::runAggregate(OperationContext* opCtx,
@@ -394,7 +467,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const auto shardQuery = pipeline->getInitialQuery();
auto pipelineForTargetedShards = std::move(pipeline);
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard;
+ std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
int numAttempts = 0;
@@ -428,10 +501,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// If we have to run on multiple shards and the pipeline is not yet split, split it. If we
// can run on a single shard and the pipeline is already split, reassemble it.
if (needsSplit && !isSplit) {
- pipelineForMergingShard = std::move(pipelineForTargetedShards);
- pipelineForTargetedShards = pipelineForMergingShard->splitForSharded();
+ pipelineForMerging = std::move(pipelineForTargetedShards);
+ pipelineForTargetedShards = pipelineForMerging->splitForSharded();
} else if (!needsSplit && isSplit) {
- pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMergingShard));
+ pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
}
// Generate the command object for the targeted shards.
@@ -479,7 +552,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return appendExplainResults(std::move(shardResults),
mergeCtx,
pipelineForTargetedShards,
- pipelineForMergingShard,
+ pipelineForMerging,
result);
}
@@ -505,15 +578,31 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If we reach here, we have a merge pipeline to dispatch.
- invariant(pipelineForMergingShard);
+ invariant(pipelineForMerging);
- pipelineForMergingShard->addInitialSource(
+ // We need a DocumentSourceMergeCursors regardless of whether we merge on mongoS or on a shard.
+ pipelineForMerging->addInitialSource(
DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx));
- auto mergeCmdObj =
- createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMergingShard);
+ // First, check whether we can merge on the mongoS.
+ if (pipelineForMerging->canRunOnMongos() && !internalQueryProhibitMergingOnMongoS.load()) {
+ // Register the new mongoS cursor, and retrieve the initial batch of results.
+ auto cursorResponse = establishMergingMongosCursor(opCtx,
+ request,
+ namespaces.requestedNss,
+ std::move(pipelineForMerging),
+ std::move(cursors));
+
+ // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
+ // can never run on mongoS. Filter the command response and return immediately.
+ Command::filterCommandReplyForPassthrough(cursorResponse, result);
+ return getStatusFromCommandResult(result->asTempObj());
+ }
+
+ // If we cannot merge on mongoS, establish the merge cursor on a shard.
+ auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMerging);
- auto mergeResponse = uassertStatusOK(establishMergeCursor(
+ auto mergeResponse = uassertStatusOK(establishMergingShardCursor(
opCtx,
namespaces.executionNss,
cursors,
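
Read together, the hunks above reduce to the following merge-target decision. This is a condensed
sketch assembled only from names visible in this patch, not the full function; explain handling,
retries, and the remaining arguments of establishMergingShardCursor() are elided:

    // Always merge through a DocumentSourceMergeCursors over the shard cursors.
    pipelineForMerging->addInitialSource(
        DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx));

    if (pipelineForMerging->canRunOnMongos() && !internalQueryProhibitMergingOnMongoS.load()) {
        // Merge locally: register a mongoS cursor and return the first batch to the client.
        auto cursorResponse = establishMergingMongosCursor(opCtx,
                                                           request,
                                                           namespaces.requestedNss,
                                                           std::move(pipelineForMerging),
                                                           std::move(cursors));
        Command::filterCommandReplyForPassthrough(cursorResponse, result);
        return getStatusFromCommandResult(result->asTempObj());
    }

    // Otherwise fall back to merging on a shard, as before this patch.
    auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMerging);
    auto mergeResponse = uassertStatusOK(establishMergingShardCursor(
        opCtx, namespaces.executionNss, cursors /* ...remaining arguments elided... */));
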
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 5a8f3cb5a90..df9223536fa 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -1548,7 +1548,7 @@ public:
}
return Status(ErrorCodes::Unauthorized,
- str::stream() << "Not authorized to create users on db: " << dbname);
+ str::stream() << "Not authorized to list collections on db: " << dbname);
}
bool supportsWriteConcern(const BSONObj& cmd) const override {
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index f2a7b25cb28..2977050be19 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -32,6 +32,7 @@ env.Library(
env.Library(
target="router_exec_stage",
source=[
+ "router_stage_aggregation_merge.cpp",
"router_stage_limit.cpp",
"router_stage_merge.cpp",
"router_stage_mock.cpp",
@@ -69,6 +70,7 @@ env.Library(
"$BUILD_DIR/mongo/executor/task_executor_interface",
"$BUILD_DIR/mongo/s/async_requests_sender",
"$BUILD_DIR/mongo/s/client/sharding_client",
+ "$BUILD_DIR/mongo/db/pipeline/pipeline",
"$BUILD_DIR/mongo/s/coreshard",
],
)
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 5ac983f260b..aca0f54c704 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -66,7 +66,7 @@ public:
*
* A non-ok status is returned in case of any error.
*/
- virtual StatusWith<ClusterQueryResult> next(OperationContext* opCtx) = 0;
+ virtual StatusWith<ClusterQueryResult> next() = 0;
/**
* Must be called before destruction to abandon a not-yet-exhausted cursor. If next() has
@@ -77,6 +77,17 @@ public:
virtual void kill(OperationContext* opCtx) = 0;
/**
+ * Sets the operation context for the cursor. Must be called before the first call to next().
+ * The cursor attaches to a new OperationContext on each getMore.
+ */
+ virtual void reattachToOperationContext(OperationContext* opCtx) = 0;
+
+ /**
+ * Detaches the cursor from its current OperationContext.
+ */
+ virtual void detachFromOperationContext() = 0;
+
+ /**
* Returns whether or not this cursor is tailing a capped collection on a shard.
*/
virtual bool isTailable() const = 0;
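
The reattach/detach contract added here implies a call pattern like the one below for any caller
that drives a registered cursor across operations. This is a minimal sketch, assuming a
ClusterCursorManager* 'manager', a NamespaceString 'nss', a CursorId 'cursorId', and an
OperationContext* 'opCtx' are already in scope (those names are local to the sketch); the real
getMore path adds batch-size accounting and error handling:

    // Attach, iterate, detach, return. This only illustrates the ordering the new contract requires.
    auto pinned = uassertStatusOK(manager->checkOutCursor(nss, cursorId, opCtx));
    pinned.reattachToOperationContext(opCtx);  // must precede the first next()

    auto state = ClusterCursorManager::CursorState::NotExhausted;
    while (true) {
        auto next = uassertStatusOK(pinned.next());
        if (next.isEOF()) {
            state = ClusterCursorManager::CursorState::Exhausted;
            break;
        }
        // ... append *next.getResult() to the outgoing batch ...
    }

    pinned.detachFromOperationContext();  // the cursor may outlive this operation
    pinned.returnCursor(state);
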
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index a6a0c4ccdb3..0e6b5a197ce 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -32,6 +32,7 @@
#include "mongo/s/query/cluster_client_cursor_impl.h"
+#include "mongo/s/query/router_stage_aggregation_merge.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_merge.h"
#include "mongo/s/query/router_stage_mock.h"
@@ -77,7 +78,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock
boost::optional<LogicalSessionId> lsid)
: _params(std::move(params)), _root(std::move(root)), _lsid(lsid) {}
-StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next(OperationContext* opCtx) {
+StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
// First return stashed results, if there are any.
if (!_stash.empty()) {
auto front = std::move(_stash.front());
@@ -86,7 +87,7 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next(OperationContext* o
return {front};
}
- auto next = _root->next(opCtx);
+ auto next = _root->next();
if (next.isOK() && !next.getValue().isEOF()) {
++_numReturnedSoFar;
}
@@ -97,6 +98,14 @@ void ClusterClientCursorImpl::kill(OperationContext* opCtx) {
_root->kill(opCtx);
}
+void ClusterClientCursorImpl::reattachToOperationContext(OperationContext* opCtx) {
+ _root->reattachToOperationContext(opCtx);
+}
+
+void ClusterClientCursorImpl::detachFromOperationContext() {
+ _root->detachFromOperationContext();
+}
+
bool ClusterClientCursorImpl::isTailable() const {
return _params.isTailable;
}
@@ -136,7 +145,14 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
const auto limit = params->limit;
const bool hasSort = !params->sort.isEmpty();
- // The first stage is always the one which merges from the remotes.
+ // The first stage always merges from the remotes. If 'mergePipeline' has been specified in
+ // ClusterClientCursorParams, then RouterStageAggregationMerge is the root and only stage.
+ // Otherwise, construct a tree of RouterExecStages from the remotes, skip, limit, and sort
+ // fields in 'params'.
+ if (params->mergePipeline) {
+ return stdx::make_unique<RouterStageAggregationMerge>(std::move(params->mergePipeline));
+ }
+
std::unique_ptr<RouterExecStage> root = stdx::make_unique<RouterStageMerge>(executor, params);
if (skip) {
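
For context, the new branch in buildMergerPlan() is reached when the caller populates the new
'mergePipeline' field on ClusterClientCursorParams. A hypothetical call site is sketched below;
'pipelineForMerging' stands for the merging half of a split pipeline, and the empty second
constructor argument mirrors the test code later in this patch:

    // Hypothetical: selecting the aggregation-merge root via ClusterClientCursorParams.
    ClusterClientCursorParams params(NamespaceString("test.collection"), {});
    params.mergePipeline = std::move(pipelineForMerging);  // buildMergerPlan() now returns a
                                                           // single RouterStageAggregationMerge
    // The skip/limit/sort fields in 'params' are not consulted in this case; any $skip, $limit,
    // or $sort semantics belong to the merge pipeline itself.
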
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index a13c18bfcb3..07dbab2094d 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -89,10 +89,14 @@ public:
executor::TaskExecutor* executor,
ClusterClientCursorParams&& params);
- StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final;
+ StatusWith<ClusterQueryResult> next() final;
void kill(OperationContext* opCtx) final;
+ void reattachToOperationContext(OperationContext* opCtx) final;
+
+ void detachFromOperationContext() final;
+
bool isTailable() const final;
UserNameIterator getAuthenticatedUsers() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 549a692d720..8598fe192dc 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -40,9 +40,10 @@ namespace mongo {
namespace {
-// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*,
-// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use
-// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests.
+// Note: Though the next() method on RouterExecStage and its subclasses depends on an
+// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages
+// are mocked in this test using RouterStageMock. RouterStageMock does not actually use the
+// OperationContext, so we omit the call to reattachToOperationContext() in these tests.
TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
auto mockStage = stdx::make_unique<RouterStageMock>();
@@ -57,13 +58,13 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 0);
for (int i = 1; i < 10; ++i) {
- auto result = cursor.next(nullptr);
+ auto result = cursor.next();
ASSERT(result.isOK());
ASSERT_BSONOBJ_EQ(*result.getValue().getResult(), BSON("a" << i));
ASSERT_EQ(cursor.getNumReturnedSoFar(), i);
}
// Now check that if nothing is fetched the getNumReturnedSoFar stays the same.
- auto result = cursor.next(nullptr);
+ auto result = cursor.next();
ASSERT_OK(result.getStatus());
ASSERT_TRUE(result.getValue().isEOF());
ASSERT_EQ(cursor.getNumReturnedSoFar(), 9LL);
@@ -78,7 +79,7 @@ TEST(ClusterClientCursorImpl, QueueResult) {
ClusterClientCursorParams(NamespaceString("unused"), {}),
boost::none);
- auto firstResult = cursor.next(nullptr);
+ auto firstResult = cursor.next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
@@ -86,22 +87,22 @@ TEST(ClusterClientCursorImpl, QueueResult) {
cursor.queueResult(BSON("a" << 2));
cursor.queueResult(BSON("a" << 3));
- auto secondResult = cursor.next(nullptr);
+ auto secondResult = cursor.next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
- auto thirdResult = cursor.next(nullptr);
+ auto thirdResult = cursor.next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3));
- auto fourthResult = cursor.next(nullptr);
+ auto fourthResult = cursor.next();
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSON("a" << 4));
- auto fifthResult = cursor.next(nullptr);
+ auto fifthResult = cursor.next();
ASSERT_OK(fifthResult.getStatus());
ASSERT(fifthResult.getValue().isEOF());
@@ -119,19 +120,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) {
boost::none);
ASSERT_TRUE(cursor.remotesExhausted());
- auto firstResult = cursor.next(nullptr);
+ auto firstResult = cursor.next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
ASSERT_TRUE(cursor.remotesExhausted());
- auto secondResult = cursor.next(nullptr);
+ auto secondResult = cursor.next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(cursor.remotesExhausted());
- auto thirdResult = cursor.next(nullptr);
+ auto thirdResult = cursor.next();
ASSERT_OK(thirdResult.getStatus());
ASSERT_TRUE(thirdResult.getValue().isEOF());
ASSERT_TRUE(cursor.remotesExhausted());
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index b6c7d9d0a81..8cc1771ec1c 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -44,7 +44,7 @@ ClusterClientCursorMock::~ClusterClientCursorMock() {
invariant(_exhausted || _killed);
}
-StatusWith<ClusterQueryResult> ClusterClientCursorMock::next(OperationContext* opCtx) {
+StatusWith<ClusterQueryResult> ClusterClientCursorMock::next() {
invariant(!_killed);
if (_resultsQueue.empty()) {
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 81c97af72f1..004a635eaed 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -47,10 +47,14 @@ public:
~ClusterClientCursorMock();
- StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final;
+ StatusWith<ClusterQueryResult> next() final;
void kill(OperationContext* opCtx) final;
+ void reattachToOperationContext(OperationContext* opCtx) final {}
+
+ void detachFromOperationContext() final {}
+
bool isTailable() const final;
UserNameIterator getAuthenticatedUsers() const final;
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 7507ad82ae8..1b4d76124c3 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -37,6 +37,7 @@
#include "mongo/db/auth/user_name.h"
#include "mongo/db/cursor_id.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/s/client/shard.h"
#include "mongo/util/net/hostandport.h"
@@ -110,6 +111,9 @@ struct ClusterClientCursorParams {
// Should be forwarded to the remote hosts in 'cmdObj'.
boost::optional<long long> limit;
+ // If set, we use this pipeline to merge the output of aggregations on each remote.
+ std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline;
+
// Whether this cursor is tailing a capped collection.
bool isTailable = false;
diff --git a/src/mongo/s/query/cluster_cursor_cleanup_job.cpp b/src/mongo/s/query/cluster_cursor_cleanup_job.cpp
index 57685653cf2..dcf19acf775 100644
--- a/src/mongo/s/query/cluster_cursor_cleanup_job.cpp
+++ b/src/mongo/s/query/cluster_cursor_cleanup_job.cpp
@@ -73,11 +73,12 @@ void ClusterCursorCleanupJob::run() {
// Mirroring the behavior in CursorManager::timeoutCursors(), a negative value for
// cursorTimeoutMillis has the same effect as a 0 value: cursors are cleaned immediately.
auto cursorTimeoutValue = cursorTimeoutMillis.load();
+ const auto opCtx = client->makeOperationContext();
Date_t cutoff = (cursorTimeoutValue > 0)
? (Date_t::now() - Milliseconds(cursorTimeoutValue))
: Date_t::now();
manager->killMortalCursorsInactiveSince(cutoff);
- manager->incrementCursorsTimedOut(manager->reapZombieCursors());
+ manager->incrementCursorsTimedOut(manager->reapZombieCursors(opCtx.get()));
MONGO_IDLE_THREAD_BLOCK;
sleepsecs(clientCursorMonitorFrequencySecs.load());
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 5e6a5dbebf4..99e7b2afa82 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -110,9 +110,19 @@ ClusterCursorManager::PinnedCursor& ClusterCursorManager::PinnedCursor::operator
return *this;
}
-StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next(OperationContext* opCtx) {
+StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next() {
invariant(_cursor);
- return _cursor->next(opCtx);
+ return _cursor->next();
+}
+
+void ClusterCursorManager::PinnedCursor::reattachToOperationContext(OperationContext* opCtx) {
+ invariant(_cursor);
+ _cursor->reattachToOperationContext(opCtx);
+}
+
+void ClusterCursorManager::PinnedCursor::detachFromOperationContext() {
+ invariant(_cursor);
+ _cursor->detachFromOperationContext();
}
bool ClusterCursorManager::PinnedCursor::isTailable() const {
@@ -182,13 +192,13 @@ ClusterCursorManager::~ClusterCursorManager() {
invariant(_namespaceToContainerMap.empty());
}
-void ClusterCursorManager::shutdown() {
+void ClusterCursorManager::shutdown(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_inShutdown = true;
lk.unlock();
killAllCursors();
- reapZombieCursors();
+ reapZombieCursors(opCtx);
}
StatusWith<CursorId> ClusterCursorManager::registerCursor(
@@ -356,7 +366,7 @@ void ClusterCursorManager::killAllCursors() {
}
}
-std::size_t ClusterCursorManager::reapZombieCursors() {
+std::size_t ClusterCursorManager::reapZombieCursors(OperationContext* opCtx) {
struct CursorDescriptor {
CursorDescriptor(NamespaceString ns, CursorId cursorId, bool isInactive)
: ns(std::move(ns)), cursorId(cursorId), isInactive(isInactive) {}
@@ -395,11 +405,9 @@ std::size_t ClusterCursorManager::reapZombieCursors() {
}
lk.unlock();
- // Pass a null OperationContext, because this call should not actually schedule any remote
- // work: the cursor is already pending kill, meaning the killCursors commands are already
- // being scheduled to be sent to the remote shard hosts. This method will just wait for them
- // all to be scheduled.
- zombieCursor.getValue()->kill(nullptr);
+ // Pass opCtx to kill(), since a cursor which wraps an underlying aggregation pipeline is
+ // obliged to call Pipeline::dispose with a valid OperationContext prior to deletion.
+ zombieCursor.getValue()->kill(opCtx);
zombieCursor.getValue().reset();
lk.lock();
@@ -430,11 +438,11 @@ ClusterCursorManager::Stats ClusterCursorManager::stats() const {
}
switch (entry.getCursorType()) {
- case CursorType::NamespaceNotSharded:
- ++stats.cursorsNotSharded;
+ case CursorType::SingleTarget:
+ ++stats.cursorsSingleTarget;
break;
- case CursorType::NamespaceSharded:
- ++stats.cursorsSharded;
+ case CursorType::MultiTarget:
+ ++stats.cursorsMultiTarget;
break;
}
}
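
The comment above explains why an OperationContext is now threaded through reapZombieCursors()
into kill(): a cursor whose plan wraps a Pipeline must call Pipeline::dispose() with a valid
OperationContext before it is destroyed. A hypothetical sketch of such a kill() override, shown
only to make that dependency concrete ('_mergePipeline' is an illustrative member name; the real
router_stage_aggregation_merge.cpp in this patch may differ):

    // Hypothetical sketch only; not the implementation added by this patch.
    void RouterStageAggregationMerge::kill(OperationContext* opCtx) {
        // Pipeline::dispose() needs a valid OperationContext, which is why reapZombieCursors()
        // can no longer pass nullptr down to kill().
        _mergePipeline->dispose(opCtx);
    }
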
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index 86bfcb31bbe..f6da640bb1e 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -76,11 +76,11 @@ public:
//
enum class CursorType {
- // Represents a cursor operating on an unsharded namespace.
- NamespaceNotSharded,
+ // Represents a cursor retrieving data from a single remote source.
+ SingleTarget,
- // Represents a cursor operating on a sharded namespace.
- NamespaceSharded,
+ // Represents a cursor retrieving data from multiple remote sources.
+ MultiTarget,
};
enum class CursorLifetime {
@@ -100,11 +100,11 @@ public:
};
struct Stats {
- // Count of open cursors registered with CursorType::NamespaceSharded.
- size_t cursorsSharded = 0;
+ // Count of open cursors registered with CursorType::MultiTarget.
+ size_t cursorsMultiTarget = 0;
- // Count of open cursors registered with CursorType::NamespaceNotSharded.
- size_t cursorsNotSharded = 0;
+ // Count of open cursors registered with CursorType::SingleTarget.
+ size_t cursorsSingleTarget = 0;
// Count of pinned cursors.
size_t cursorsPinned = 0;
@@ -154,7 +154,18 @@ public:
*
* Can block.
*/
- StatusWith<ClusterQueryResult> next(OperationContext* opCtx);
+ StatusWith<ClusterQueryResult> next();
+
+ /**
+ * Sets the operation context for the cursor. Must be called before the first call to
+ * next().
+ */
+ void reattachToOperationContext(OperationContext* opCtx);
+
+ /**
+ * Detaches the cursor from its current OperationContext.
+ */
+ void detachFromOperationContext();
/**
* Returns whether or not the underlying cursor is tailing a capped collection. Cannot be
@@ -252,7 +263,7 @@ public:
* Kills and reaps all cursors currently owned by this cursor manager, and puts the manager
* into the shutting down state where it will not accept any new cursors for registration.
*/
- void shutdown();
+ void shutdown(OperationContext* opCtx);
/**
* Registers the given cursor with this manager, and returns the registered cursor's id, or
@@ -322,13 +333,13 @@ public:
* as 'kill pending'. Returns the number of cursors that were marked as inactive.
*
* If no other non-const methods are called simultaneously, it is guaranteed that this method
- * will delete all non-pinned cursors marked as 'kill pending'. Otherwise, no such guarantee is
+ * will delete all non-pinned cursors marked as 'kill pending'. Otherwise, no such guarantee is
* made (this is due to the fact that the blocking kill for each cursor is performed outside of
* the cursor manager lock).
*
* Can block.
*/
- std::size_t reapZombieCursors();
+ std::size_t reapZombieCursors(OperationContext* opCtx);
/**
* Returns the number of open cursors on a ClusterCursorManager, broken down by type.
@@ -496,7 +507,7 @@ private:
std::unique_ptr<ClusterClientCursor> _cursor;
bool _killPending = false;
bool _isInactive = false;
- CursorType _cursorType = CursorType::NamespaceNotSharded;
+ CursorType _cursorType = CursorType::SingleTarget;
CursorLifetime _cursorLifetime = CursorLifetime::Mortal;
Date_t _lastActive;
boost::optional<LogicalSessionId> _lsid;
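
Consumers of Stats pick up the CursorType/Stats rename mechanically; a tiny illustrative snippet,
assuming the usual mongo logging helper and a ClusterCursorManager* 'manager':

    // Illustrative only: reading the renamed Stats fields.
    ClusterCursorManager::Stats stats = manager->stats();
    const size_t totalOpen = stats.cursorsMultiTarget + stats.cursorsSingleTarget;
    log() << "open cluster cursors: " << totalOpen << " (" << stats.cursorsPinned << " pinned)";
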
diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp
index 46377d2f483..ab46f50e537 100644
--- a/src/mongo/s/query/cluster_cursor_manager_test.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp
@@ -91,7 +91,7 @@ protected:
private:
void tearDown() final {
_manager.killAllCursors();
- _manager.reapZombieCursors();
+ _manager.reapZombieCursors(nullptr);
}
// List of flags representing whether our allocated cursors have been killed yet. The value of
@@ -110,31 +110,31 @@ private:
TEST_F(ClusterCursorManagerTest, RegisterCursor) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << 1));
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- std::move(cursor),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ std::move(cursor),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- auto nextResult = pinnedCursor.getValue().next(nullptr);
+ auto nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
ASSERT(nextResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult());
- nextResult = pinnedCursor.getValue().next(nullptr);
+ nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
ASSERT_TRUE(nextResult.getValue().isEOF());
}
// Test that registering a cursor returns a non-zero cursor id.
TEST_F(ClusterCursorManagerTest, RegisterCursorReturnsNonZeroId) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_NE(0, cursorId);
}
@@ -142,20 +142,20 @@ TEST_F(ClusterCursorManagerTest, RegisterCursorReturnsNonZeroId) {
TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << 1));
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- std::move(cursor),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ std::move(cursor),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId());
- auto nextResult = checkedOutCursor.getValue().next(nullptr);
+ auto nextResult = checkedOutCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
ASSERT(nextResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult());
- nextResult = checkedOutCursor.getValue().next(nullptr);
+ nextResult = checkedOutCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
ASSERT_TRUE(nextResult.getValue().isEOF());
}
@@ -168,21 +168,21 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
for (int i = 0; i < numCursors; ++i) {
auto cursor = allocateMockCursor();
cursor->queueResult(BSON("a" << i));
- cursorIds[i] = assertGet(
- getManager()->registerCursor(nullptr,
- std::move(cursor),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ cursorIds[i] =
+ assertGet(getManager()->registerCursor(nullptr,
+ std::move(cursor),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
}
for (int i = 0; i < numCursors; ++i) {
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- auto nextResult = pinnedCursor.getValue().next(nullptr);
+ auto nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
ASSERT(nextResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(BSON("a" << i), *nextResult.getValue().getResult());
- nextResult = pinnedCursor.getValue().next(nullptr);
+ nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
ASSERT_TRUE(nextResult.getValue().isEOF());
}
@@ -190,12 +190,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
// Test that checking out a pinned cursor returns an error with code ErrorCodes::CursorInUse.
TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_EQ(ErrorCodes::CursorInUse,
@@ -204,12 +204,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) {
// Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound.
TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_OK(getManager()->killCursor(nss, cursorId));
ASSERT_EQ(ErrorCodes::CursorNotFound,
getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
@@ -226,12 +226,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUnknown) {
TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) {
const NamespaceString correctNamespace("test.correct");
const NamespaceString incorrectNamespace("test.incorrect");
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- correctNamespace,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ correctNamespace,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_EQ(ErrorCodes::CursorNotFound,
getManager()->checkOutCursor(incorrectNamespace, cursorId, nullptr).getStatus());
}
@@ -239,12 +239,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) {
// Test that checking out an unknown cursor returns an error with code ErrorCodes::CursorNotFound,
// even if there is an existing cursor with the same namespace but a different cursor id.
TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_EQ(ErrorCodes::CursorNotFound,
getManager()->checkOutCursor(nss, cursorId + 1, nullptr).getStatus());
}
@@ -252,12 +252,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
// Test that checking out a cursor updates the 'last active' time associated with the cursor to the
// current time.
TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
Date_t cursorRegistrationTime = getClockSource()->now();
getClockSource()->advance(Milliseconds(1));
auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
@@ -265,19 +265,19 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) {
checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
getManager()->killMortalCursorsInactiveSince(cursorRegistrationTime);
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
}
// Test that checking in a cursor updates the 'last active' time associated with the cursor to the
// current time.
TEST_F(ClusterCursorManagerTest, ReturnCursorUpdateActiveTime) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
Date_t cursorCheckOutTime = getClockSource()->now();
auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
@@ -285,23 +285,23 @@ TEST_F(ClusterCursorManagerTest, ReturnCursorUpdateActiveTime) {
checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
getManager()->killMortalCursorsInactiveSince(cursorCheckOutTime);
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
}
// Test that killing a pinned cursor by id successfully kills the cursor.
TEST_F(ClusterCursorManagerTest, KillCursorBasic) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(getManager()->killCursor(nss, pinnedCursor.getValue().getCursorId()));
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(0));
}
@@ -312,18 +312,18 @@ TEST_F(ClusterCursorManagerTest, KillCursorMultipleCursors) {
std::vector<CursorId> cursorIds(numCursors);
// Register cursors and populate 'cursorIds' with the returned cursor ids.
for (size_t i = 0; i < numCursors; ++i) {
- cursorIds[i] = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ cursorIds[i] =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
}
// Kill each cursor and verify that it was successfully killed.
for (size_t i = 0; i < numCursors; ++i) {
ASSERT_OK(getManager()->killCursor(nss, cursorIds[i]));
ASSERT(!isMockCursorKilled(i));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(i));
}
}
@@ -339,12 +339,12 @@ TEST_F(ClusterCursorManagerTest, KillCursorUnknown) {
TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) {
const NamespaceString correctNamespace("test.correct");
const NamespaceString incorrectNamespace("test.incorrect");
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- correctNamespace,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ correctNamespace,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
Status killResult = getManager()->killCursor(incorrectNamespace, cursorId);
ASSERT_EQ(ErrorCodes::CursorNotFound, killResult);
}
@@ -352,12 +352,12 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) {
// Test that killing an unknown cursor returns an error with code ErrorCodes::CursorNotFound,
// even if there is an existing cursor with the same namespace but a different cursor id.
TEST_F(ClusterCursorManagerTest, KillCursorWrongCursorId) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
Status killResult = getManager()->killCursor(nss, cursorId + 1);
ASSERT_EQ(ErrorCodes::CursorNotFound, killResult);
}
@@ -367,11 +367,11 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal));
getManager()->killMortalCursorsInactiveSince(getClockSource()->now());
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(0));
}
@@ -382,11 +382,11 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipUnexpired) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal));
getManager()->killMortalCursorsInactiveSince(timeBeforeCursorCreation);
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
}
@@ -395,32 +395,32 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipImmortal) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Immortal));
getManager()->killMortalCursorsInactiveSince(getClockSource()->now());
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
}
// Test that killing all mortal expired cursors does not kill a mortal expired cursor that is
// pinned.
TEST_F(ClusterCursorManagerTest, ShouldNotKillPinnedCursors) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pin = assertGet(getManager()->checkOutCursor(nss, cursorId, nullptr));
getManager()->killMortalCursorsInactiveSince(getClockSource()->now());
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
pin.returnCursor(ClusterCursorManager::CursorState::NotExhausted);
getManager()->killMortalCursorsInactiveSince(getClockSource()->now());
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(0));
}
@@ -434,19 +434,18 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors)
if (i < numKilledCursorsExpected) {
cutoff = getClockSource()->now();
}
- ASSERT_OK(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ ASSERT_OK(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
getClockSource()->advance(Milliseconds(1));
}
getManager()->killMortalCursorsInactiveSince(cutoff);
for (size_t i = 0; i < numCursors; ++i) {
ASSERT(!isMockCursorKilled(i));
}
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
for (size_t i = 0; i < numCursors; ++i) {
if (i < numKilledCursorsExpected) {
ASSERT(isMockCursorKilled(i));
@@ -460,18 +459,17 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors)
TEST_F(ClusterCursorManagerTest, KillAllCursors) {
const size_t numCursors = 10;
for (size_t i = 0; i < numCursors; ++i) {
- ASSERT_OK(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ ASSERT_OK(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
}
getManager()->killAllCursors();
for (size_t i = 0; i < numCursors; ++i) {
ASSERT(!isMockCursorKilled(i));
}
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
for (size_t i = 0; i < numCursors; ++i) {
ASSERT(isMockCursorKilled(i));
}
@@ -480,30 +478,30 @@ TEST_F(ClusterCursorManagerTest, KillAllCursors) {
// Test that reaping correctly calls kill() on the underlying ClusterClientCursor for a killed
// cursor.
TEST_F(ClusterCursorManagerTest, ReapZombieCursorsBasic) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_OK(getManager()->killCursor(nss, cursorId));
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(0));
}
// Test that reaping does not call kill() on the underlying ClusterClientCursor for a killed cursor
// that is still pinned.
TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
}
@@ -513,17 +511,17 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipNonZombies) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal));
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
}
// Test that a new ClusterCursorManager's stats() is initially zero for the cursor counts.
TEST_F(ClusterCursorManagerTest, StatsInitAsZero) {
- ASSERT_EQ(0U, getManager()->stats().cursorsSharded);
- ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded);
+ ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget);
+ ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget);
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
}
@@ -532,9 +530,9 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceSharded,
+ ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal));
- ASSERT_EQ(1U, getManager()->stats().cursorsSharded);
+ ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget);
}
// Test that registering a not-sharded cursor updates the corresponding counter in stats().
@@ -542,9 +540,9 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal));
- ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded);
+ ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget);
}
// Test that checking out a cursor updates the pinned counter in stats().
@@ -553,7 +551,7 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) {
assertGet(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceSharded,
+ ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
@@ -567,21 +565,20 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceSharded,
+ ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal));
- ASSERT_EQ(i + 1, getManager()->stats().cursorsSharded);
- ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded);
+ ASSERT_EQ(i + 1, getManager()->stats().cursorsMultiTarget);
+ ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget);
}
const size_t numNotShardedCursors = 10;
for (size_t i = 0; i < numNotShardedCursors; ++i) {
- ASSERT_OK(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
- ASSERT_EQ(numShardedCursors, getManager()->stats().cursorsSharded);
- ASSERT_EQ(i + 1, getManager()->stats().cursorsNotSharded);
+ ASSERT_OK(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
+ ASSERT_EQ(numShardedCursors, getManager()->stats().cursorsMultiTarget);
+ ASSERT_EQ(i + 1, getManager()->stats().cursorsSingleTarget);
}
}
@@ -591,24 +588,24 @@ TEST_F(ClusterCursorManagerTest, StatsKillShardedCursor) {
assertGet(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceSharded,
+ ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal));
- ASSERT_EQ(1U, getManager()->stats().cursorsSharded);
+ ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget);
ASSERT_OK(getManager()->killCursor(nss, cursorId));
- ASSERT_EQ(0U, getManager()->stats().cursorsSharded);
+ ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget);
}
// Test that killing a not-sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsKillNotShardedCursor) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
- ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded);
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
+ ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget);
ASSERT_OK(getManager()->killCursor(nss, cursorId));
- ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded);
+ ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget);
}
// Test that killing a pinned cursor decrements the corresponding counter in stats().
@@ -617,7 +614,7 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) {
assertGet(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceSharded,
+ ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
@@ -631,44 +628,44 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) {
assertGet(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceSharded,
+ ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus());
- ASSERT_EQ(1U, getManager()->stats().cursorsSharded);
+ ASSERT_OK(pinnedCursor.getValue().next().getStatus());
+ ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget);
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
- ASSERT_EQ(0U, getManager()->stats().cursorsSharded);
+ ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget);
}
// Test that exhausting a not-sharded cursor decrements the corresponding counter in stats().
TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus());
- ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded);
+ ASSERT_OK(pinnedCursor.getValue().next().getStatus());
+ ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget);
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
- ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded);
+ ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget);
}
// Test that checking a pinned cursor in as exhausted decrements the corresponding counter in
// stats().
TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus());
+ ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
@@ -677,15 +674,15 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) {
// Test that checking a pinned cursor in as *not* exhausted decrements the corresponding counter in
// stats().
TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus());
+ ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
@@ -693,12 +690,12 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) {
// Test that getting the namespace for a cursor returns the correct namespace.
TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdBasic) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
boost::optional<NamespaceString> cursorNamespace =
getManager()->getNamespaceForCursorId(cursorId);
ASSERT(cursorNamespace);
@@ -711,12 +708,12 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsSameNames
const size_t numCursors = 10;
std::vector<CursorId> cursorIds(numCursors);
for (size_t i = 0; i < numCursors; ++i) {
- cursorIds[i] = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ cursorIds[i] =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
}
for (size_t i = 0; i < numCursors; ++i) {
boost::optional<NamespaceString> cursorNamespace =
@@ -733,12 +730,12 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsDifferent
std::vector<std::pair<NamespaceString, CursorId>> cursors(numCursors);
for (size_t i = 0; i < numCursors; ++i) {
NamespaceString cursorNamespace(std::string(str::stream() << "test.collection" << i));
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- cursorNamespace,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ cursorNamespace,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
cursors[i] = {cursorNamespace, cursorId};
}
for (size_t i = 0; i < numCursors; ++i) {
@@ -764,12 +761,12 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDefaultConstructor) {
// Test that returning a pinned cursor correctly unpins the cursor, and leaves the pin owning no
// cursor.
TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
@@ -783,17 +780,17 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) {
// Test that returning a pinned cursor with 'Exhausted' correctly de-registers and destroys the
// cursor, and leaves the pin owning no cursor.
TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
- ASSERT_OK(registeredCursor.getValue().next(nullptr).getStatus());
+ ASSERT_OK(registeredCursor.getValue().next().getStatus());
registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
@@ -802,7 +799,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
// reapZombieCursors() and check that the cursor still has not been killed.
ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
}
@@ -815,56 +812,56 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
// The mock should indicate that it has open remote cursors.
mockCursor->markRemotesNotExhausted();
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- std::move(mockCursor),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ std::move(mockCursor),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
- ASSERT_OK(registeredCursor.getValue().next(nullptr).getStatus());
+ ASSERT_OK(registeredCursor.getValue().next().getStatus());
registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted);
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
// Cursor should be kill pending, so it will be killed during reaping.
ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(0));
}
// Test that the PinnedCursor move assignment operator correctly kills the cursor if it has not yet
// been returned.
TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
pinnedCursor = ClusterCursorManager::PinnedCursor();
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(0));
}
// Test that the PinnedCursor destructor correctly kills the cursor if it has not yet been returned.
TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) {
{
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
}
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(0));
}
@@ -874,12 +871,12 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) {
mockCursor->markRemotesNotExhausted();
ASSERT_FALSE(mockCursor->remotesExhausted());
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- std::move(mockCursor),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ std::move(mockCursor),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_FALSE(pinnedCursor.getValue().remotesExhausted());
@@ -887,25 +884,25 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) {
// Test that killed cursors which are still pinned are not reaped.
TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(getManager()->killCursor(nss, cursorId));
ASSERT(!isMockCursorKilled(0));
// Pinned cursor should remain alive after reaping.
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(!isMockCursorKilled(0));
// The cursor can be reaped once it is returned to the manager.
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
ASSERT(!isMockCursorKilled(0));
- getManager()->reapZombieCursors();
+ getManager()->reapZombieCursors(nullptr);
ASSERT(isMockCursorKilled(0));
}
@@ -913,33 +910,32 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal));
ASSERT(!isMockCursorKilled(0));
- getManager()->shutdown();
+ getManager()->shutdown(nullptr);
ASSERT(isMockCursorKilled(0));
- ASSERT_EQUALS(
- ErrorCodes::ShutdownInProgress,
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress,
+ getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
}
TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) {
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
ASSERT(!isMockCursorKilled(0));
- getManager()->shutdown();
+ getManager()->shutdown(nullptr);
ASSERT(isMockCursorKilled(0));
@@ -955,7 +951,7 @@ TEST_F(ClusterCursorManagerTest, CursorsWithoutSessions) {
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal));
// Manager should have no active sessions.
@@ -970,12 +966,12 @@ TEST_F(ClusterCursorManagerTest, CursorsWithoutSessions) {
TEST_F(ClusterCursorManagerTest, OneCursorWithASession) {
// Add a cursor with a session to the cursor manager.
auto lsid = makeLogicalSessionIdForTest();
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(lsid),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(lsid),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
// Retrieve all sessions active in manager - set should contain just lsid.
LogicalSessionIdSet lsids;
@@ -1004,12 +1000,12 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithASession) {
TEST_F(ClusterCursorManagerTest, GetSessionIdsWhileCheckedOut) {
// Add a cursor with a session to the cursor manager.
auto lsid = makeLogicalSessionIdForTest();
- auto cursorId = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(lsid),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(lsid),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
// Check the cursor out, then try to append cursors, see that we get one.
auto res = getManager()->checkOutCursor(nss, cursorId, nullptr);
@@ -1025,18 +1021,18 @@ TEST_F(ClusterCursorManagerTest, GetSessionIdsWhileCheckedOut) {
TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameSession) {
// Add two cursors on the same session to the cursor manager.
auto lsid = makeLogicalSessionIdForTest();
- auto cursorId1 = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(lsid),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
- auto cursorId2 = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(lsid),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId1 =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(lsid),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
+ auto cursorId2 =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(lsid),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
// Retrieve all sessions - set should contain just lsid.
stdx::unordered_set<LogicalSessionId, LogicalSessionIdHash> lsids;
@@ -1073,24 +1069,24 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleSessions) {
auto lsid2 = makeLogicalSessionIdForTest();
// Register two cursors with different lsids, and one without.
- CursorId cursor1 = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(lsid1),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
-
- CursorId cursor2 = assertGet(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(lsid2),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ CursorId cursor1 =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(lsid1),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
+
+ CursorId cursor2 =
+ assertGet(getManager()->registerCursor(nullptr,
+ allocateMockCursor(lsid2),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_OK(getManager()->registerCursor(nullptr,
allocateMockCursor(),
nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal));
// Retrieve all sessions - should be both lsids.
@@ -1117,12 +1113,11 @@ TEST_F(ClusterCursorManagerTest, ManyCursorsManySessions) {
const int count = 10000;
for (int i = 0; i < count; i++) {
auto lsid = makeLogicalSessionIdForTest();
- ASSERT_OK(
- getManager()->registerCursor(nullptr,
- allocateMockCursor(lsid),
- nss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
- ClusterCursorManager::CursorLifetime::Mortal));
+ ASSERT_OK(getManager()->registerCursor(nullptr,
+ allocateMockCursor(lsid),
+ nss,
+ ClusterCursorManager::CursorType::SingleTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
}
// Retrieve all sessions.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 0f7630d5dc8..b7a5ecc6b34 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -257,6 +257,12 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
return swCursors.getStatus();
}
+ // Determine whether the cursor we may eventually register will be single- or multi-target.
+
+ const auto cursorType = swCursors.getValue().size() > 1
+ ? ClusterCursorManager::CursorType::MultiTarget
+ : ClusterCursorManager::CursorType::SingleTarget;
+
// Transfer the established cursors to a ClusterClientCursor.
params.remotes = std::move(swCursors.getValue());
@@ -267,8 +273,11 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
int bytesBuffered = 0;
+
+ ccc->reattachToOperationContext(opCtx);
+
while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) {
- auto next = ccc->next(opCtx);
+ auto next = ccc->next();
if (!next.isOK()) {
return next.getStatus();
@@ -300,6 +309,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
results->push_back(std::move(nextObj));
}
+ ccc->detachFromOperationContext();
+
if (!query.getQueryRequest().wantMore() && !ccc->isTailable()) {
cursorState = ClusterCursorManager::CursorState::Exhausted;
}
@@ -313,8 +324,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
// Register the cursor with the cursor manager for subsequent getMore's.
auto cursorManager = Grid::get(opCtx)->getCursorManager();
- const auto cursorType = chunkManager ? ClusterCursorManager::CursorType::NamespaceSharded
- : ClusterCursorManager::CursorType::NamespaceNotSharded;
const auto cursorLifetime = query.getQueryRequest().isNoCursorTimeout()
? ClusterCursorManager::CursorLifetime::Immortal
: ClusterCursorManager::CursorLifetime::Mortal;
@@ -427,8 +436,11 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
long long batchSize = request.batchSize.value_or(0);
long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar();
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
+
+ pinnedCursor.getValue().reattachToOperationContext(opCtx);
+
while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
- auto next = pinnedCursor.getValue().next(opCtx);
+ auto next = pinnedCursor.getValue().next();
if (!next.isOK()) {
return next.getStatus();
}
@@ -454,6 +466,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
batch.push_back(std::move(*next.getValue().getResult()));
}
+ pinnedCursor.getValue().detachFromOperationContext();
+
// Transfer ownership of the cursor back to the cursor manager.
pinnedCursor.getValue().returnCursor(cursorState);
diff --git a/src/mongo/s/query/cluster_query_knobs.cpp b/src/mongo/s/query/cluster_query_knobs.cpp
index 76e82b4f914..79ef4737760 100644
--- a/src/mongo/s/query/cluster_query_knobs.cpp
+++ b/src/mongo/s/query/cluster_query_knobs.cpp
@@ -35,5 +35,6 @@
namespace mongo {
MONGO_EXPORT_SERVER_PARAMETER(internalQueryAlwaysMergeOnPrimaryShard, bool, false);
+MONGO_EXPORT_SERVER_PARAMETER(internalQueryProhibitMergingOnMongoS, bool, false);
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_query_knobs.h b/src/mongo/s/query/cluster_query_knobs.h
index 6eaf31dd102..d75670822a7 100644
--- a/src/mongo/s/query/cluster_query_knobs.h
+++ b/src/mongo/s/query/cluster_query_knobs.h
@@ -34,7 +34,15 @@ namespace mongo {
// If set to true on mongos, all aggregations delivered to the mongos which require a merging shard
// will select the primary shard as the merger. False by default, which means that the merging shard
-// will be selected randomly amongst the shards participating in the query.
+// will be selected randomly amongst the shards participating in the query. Pipelines capable of
+// merging on mongoS are unaffected by this setting, unless internalQueryProhibitMergingOnMongoS is
+// true.
extern AtomicBool internalQueryAlwaysMergeOnPrimaryShard;
+// If set to true on mongos, all aggregations which could otherwise merge on the mongos will be
+// obliged to merge on a shard instead. Pipelines which are redirected to the shards will obey the
+// value of internalQueryAlwaysMergeOnPrimaryShard. False by default, meaning that pipelines capable
+// of merging on mongoS will always do so.
+extern AtomicBool internalQueryProhibitMergingOnMongoS;
+
} // namespace mongo
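Both knobs above are ordinary server parameters (MONGO_EXPORT_SERVER_PARAMETER exposes runtime-settable AtomicBool values). As a rough sketch of how the two settings might be consulted together when picking a merge location — the MergeLocation enum, chooseMergeLocation() helper, and pipelineCanMergeOnMongoS flag are hypothetical names for illustration, not part of this patch:

// Sketch only: combines the two knobs according to the comments in cluster_query_knobs.h.
enum class MergeLocation { kMongoS, kAnyShard, kPrimaryShard };

MergeLocation chooseMergeLocation(bool pipelineCanMergeOnMongoS) {
    if (pipelineCanMergeOnMongoS && !internalQueryProhibitMergingOnMongoS.load()) {
        return MergeLocation::kMongoS;  // mongoS-eligible pipelines ignore the primary-shard knob.
    }
    return internalQueryAlwaysMergeOnPrimaryShard.load() ? MergeLocation::kPrimaryShard
                                                         : MergeLocation::kAnyShard;
}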
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index f6128a53e43..1f2fd2a9e7f 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -66,7 +66,7 @@ public:
* holding on to a subset of the returned results and need to minimize memory usage, call copy()
* on the BSONObjs.
*/
- virtual StatusWith<ClusterQueryResult> next(OperationContext* opCtx) = 0;
+ virtual StatusWith<ClusterQueryResult> next() = 0;
/**
* Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for
@@ -88,16 +88,64 @@ public:
*/
virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+ /**
+ * Sets the current operation context to be used by the router stage.
+ */
+ void reattachToOperationContext(OperationContext* opCtx) {
+ invariant(!_opCtx);
+ _opCtx = opCtx;
+
+ if (_child) {
+ _child->reattachToOperationContext(opCtx);
+ }
+
+ doReattachToOperationContext();
+ }
+
+ /**
+ * Discards the stage's current OperationContext, setting it to 'nullptr'.
+ */
+ void detachFromOperationContext() {
+ invariant(_opCtx);
+ _opCtx = nullptr;
+
+ if (_child) {
+ _child->detachFromOperationContext();
+ }
+
+ doDetachFromOperationContext();
+ }
+
protected:
/**
+ * Performs any stage-specific reattach actions. Called after the OperationContext has been set
+ * and is available via getOpCtx().
+ */
+ virtual void doReattachToOperationContext() {}
+
+ /**
+ * Performs any stage-specific detach actions. Called after the OperationContext has been set to
+ * nullptr.
+ */
+ virtual void doDetachFromOperationContext() {}
+
+ /**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
*/
RouterExecStage* getChildStage() {
return _child.get();
}
+ /**
+ * Returns a pointer to the current OperationContext, or nullptr if there is no context.
+ */
+ OperationContext* getOpCtx() {
+ return _opCtx;
+ }
+
private:
std::unique_ptr<RouterExecStage> _child;
+ OperationContext* _opCtx = nullptr;
};
} // namespace mongo
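Taken together, reattachToOperationContext() and detachFromOperationContext() replace the old per-call OperationContext* argument to next(); the cluster_find.cpp hunks above show the real call sites. A minimal sketch of the protocol a caller now follows (drainOneBatch() is a hypothetical helper, not part of this patch):

// Sketch only: attach once, pull results with the argument-less next(), detach before
// handing the stage or cursor back to its manager.
Status drainOneBatch(OperationContext* opCtx, RouterExecStage* stage, std::vector<BSONObj>* out) {
    stage->reattachToOperationContext(opCtx);  // Recurses into children, then the doReattach hook.
    while (true) {
        auto next = stage->next();             // Uses getOpCtx() internally where needed.
        if (!next.isOK()) {
            stage->detachFromOperationContext();
            return next.getStatus();
        }
        if (next.getValue().isEOF()) {
            break;
        }
        out->push_back(next.getValue().getResult()->getOwned());
    }
    stage->detachFromOperationContext();       // Attach/detach calls must balance.
    return Status::OK();
}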
diff --git a/src/mongo/s/query/router_stage_aggregation_merge.cpp b/src/mongo/s/query/router_stage_aggregation_merge.cpp
new file mode 100644
index 00000000000..6fdfdc4fea2
--- /dev/null
+++ b/src/mongo/s/query/router_stage_aggregation_merge.cpp
@@ -0,0 +1,78 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/router_stage_aggregation_merge.h"
+
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
+#include "mongo/db/pipeline/expression_context.h"
+
+namespace mongo {
+
+RouterStageAggregationMerge::RouterStageAggregationMerge(
+ std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline)
+ : _mergePipeline(std::move(mergePipeline)) {}
+
+StatusWith<ClusterQueryResult> RouterStageAggregationMerge::next() {
+ // Pipeline::getNext returns a boost::optional<Document>, which is boost::none once EOF is hit.
+ if (auto result = _mergePipeline->getNext()) {
+ return {result->toBson()};
+ }
+
+ // If we reach this point, we have hit EOF.
+ _mergePipeline.get_deleter().dismissDisposal();
+ _mergePipeline->dispose(getOpCtx());
+
+ return {ClusterQueryResult()};
+}
+
+void RouterStageAggregationMerge::doReattachToOperationContext() {
+ _mergePipeline->reattachToOperationContext(getOpCtx());
+}
+
+void RouterStageAggregationMerge::doDetachFromOperationContext() {
+ _mergePipeline->detachFromOperationContext();
+}
+
+void RouterStageAggregationMerge::kill(OperationContext* opCtx) {
+ _mergePipeline.get_deleter().dismissDisposal();
+ _mergePipeline->dispose(opCtx);
+}
+
+bool RouterStageAggregationMerge::remotesExhausted() {
+ const auto mergeSource =
+ static_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get());
+ return mergeSource->remotesExhausted();
+}
+
+Status RouterStageAggregationMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"};
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_aggregation_merge.h b/src/mongo/s/query/router_stage_aggregation_merge.h
new file mode 100644
index 00000000000..363b46e73d9
--- /dev/null
+++ b/src/mongo/s/query/router_stage_aggregation_merge.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/query/router_exec_stage.h"
+
+#include "mongo/db/pipeline/pipeline.h"
+
+namespace mongo {
+
+/**
+ * Draws results from a Pipeline with a DocumentSourceMergeCursors at its head, which is the
+ * underlying source of the stream of merged documents manipulated by the RouterStage pipeline.
+ */
+class RouterStageAggregationMerge final : public RouterExecStage {
+public:
+ RouterStageAggregationMerge(std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline);
+
+ StatusWith<ClusterQueryResult> next() final;
+
+ void kill(OperationContext* opCtx) final;
+
+ bool remotesExhausted() final;
+
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
+protected:
+ void doReattachToOperationContext() final;
+
+ void doDetachFromOperationContext() final;
+
+private:
+ std::unique_ptr<Pipeline, Pipeline::Deleter> _mergePipeline;
+};
+
+} // namespace mongo
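A short usage sketch for the new stage follows; buildMergePipeline() and consume() are hypothetical placeholders, and in this patch the merging pipeline is actually assembled in cluster_aggregate.cpp before the stage is wrapped in a ClusterClientCursor.

// Sketch only: the stage owns the merging pipeline and follows the usual
// attach/next/detach protocol inherited from RouterExecStage.
std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline = buildMergePipeline(expCtx);
auto mergeStage = stdx::make_unique<RouterStageAggregationMerge>(std::move(mergePipeline));

mergeStage->reattachToOperationContext(opCtx);
while (true) {
    auto result = mergeStage->next();          // Hitting EOF also disposes the underlying pipeline.
    if (!result.isOK() || result.getValue().isEOF()) {
        break;
    }
    consume(*result.getValue().getResult());   // Placeholder for batching/serialization.
}
mergeStage->detachFromOperationContext();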
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index ea90251eef6..03711279e07 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -39,12 +39,12 @@ RouterStageLimit::RouterStageLimit(std::unique_ptr<RouterExecStage> child, long
invariant(limit > 0);
}
-StatusWith<ClusterQueryResult> RouterStageLimit::next(OperationContext* opCtx) {
+StatusWith<ClusterQueryResult> RouterStageLimit::next() {
if (_returnedSoFar >= _limit) {
return {ClusterQueryResult()};
}
- auto childResult = getChildStage()->next(opCtx);
+ auto childResult = getChildStage()->next();
if (!childResult.isOK()) {
return childResult;
}
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 42223902cc1..cb2fd708835 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -39,7 +39,7 @@ class RouterStageLimit final : public RouterExecStage {
public:
RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit);
- StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final;
+ StatusWith<ClusterQueryResult> next() final;
void kill(OperationContext* opCtx) final;
diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp
index f866249cdd1..61689e4cd6a 100644
--- a/src/mongo/s/query/router_stage_limit_test.cpp
+++ b/src/mongo/s/query/router_stage_limit_test.cpp
@@ -40,9 +40,10 @@ namespace mongo {
namespace {
-// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*,
-// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use
-// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests.
+// Note: Though the next() method on RouterExecStage and its subclasses depends on an
+// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages
+// are mocked in this test using RouterStageMock. RouterStageMock does not actually use the
+// OperationContext, so we omit the call to reattachToOperationContext() in these tests.
TEST(RouterStageLimitTest, LimitIsOne) {
auto mockStage = stdx::make_unique<RouterStageMock>();
@@ -52,17 +53,17 @@ TEST(RouterStageLimitTest, LimitIsOne) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 1);
- auto firstResult = limitStage->next(nullptr);
+ auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
- auto secondResult = limitStage->next(nullptr);
+ auto secondResult = limitStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(!secondResult.getValue().getResult());
// Once end-of-stream is reached, the limit stage should keep returning no results.
- auto thirdResult = limitStage->next(nullptr);
+ auto thirdResult = limitStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(!thirdResult.getValue().getResult());
}
@@ -75,17 +76,17 @@ TEST(RouterStageLimitTest, LimitIsTwo) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2);
- auto firstResult = limitStage->next(nullptr);
+ auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
- auto secondResult = limitStage->next(nullptr);
+ auto secondResult = limitStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
- auto thirdResult = limitStage->next(nullptr);
+ auto thirdResult = limitStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(!thirdResult.getValue().getResult());
}
@@ -99,12 +100,12 @@ TEST(RouterStageLimitTest, LimitStagePropagatesError) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 3);
- auto firstResult = limitStage->next(nullptr);
+ auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
- auto secondResult = limitStage->next(nullptr);
+ auto secondResult = limitStage->next();
ASSERT_NOT_OK(secondResult.getStatus());
ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue);
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
@@ -122,21 +123,21 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2);
- auto firstResult = limitStage->next(nullptr);
+ auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
- auto secondResult = limitStage->next(nullptr);
+ auto secondResult = limitStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().isEOF());
- auto thirdResult = limitStage->next(nullptr);
+ auto thirdResult = limitStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2));
- auto fourthResult = limitStage->next(nullptr);
+ auto fourthResult = limitStage->next();
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().isEOF());
}
@@ -150,19 +151,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100);
ASSERT_TRUE(limitStage->remotesExhausted());
- auto firstResult = limitStage->next(nullptr);
+ auto firstResult = limitStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
ASSERT_TRUE(limitStage->remotesExhausted());
- auto secondResult = limitStage->next(nullptr);
+ auto secondResult = limitStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(limitStage->remotesExhausted());
- auto thirdResult = limitStage->next(nullptr);
+ auto thirdResult = limitStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(limitStage->remotesExhausted());
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 90a80e7161b..72fe7a06624 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -40,9 +40,9 @@ RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor,
ClusterClientCursorParams* params)
: _executor(executor), _arm(executor, params) {}
-StatusWith<ClusterQueryResult> RouterStageMerge::next(OperationContext* opCtx) {
+StatusWith<ClusterQueryResult> RouterStageMerge::next() {
while (!_arm.ready()) {
- auto nextEventStatus = _arm.nextEvent(opCtx);
+ auto nextEventStatus = _arm.nextEvent(getOpCtx());
if (!nextEventStatus.isOK()) {
return nextEventStatus.getStatus();
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 428a405b401..caae43877c6 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -45,7 +45,7 @@ class RouterStageMerge final : public RouterExecStage {
public:
RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams* params);
- StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final;
+ StatusWith<ClusterQueryResult> next() final;
void kill(OperationContext* opCtx) final;
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index e134340713a..edeb1f9945c 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -50,7 +50,7 @@ void RouterStageMock::markRemotesExhausted() {
_remotesExhausted = true;
}
-StatusWith<ClusterQueryResult> RouterStageMock::next(OperationContext* opCtx) {
+StatusWith<ClusterQueryResult> RouterStageMock::next() {
if (_resultsQueue.empty()) {
return {ClusterQueryResult()};
}
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index 7cba32a81f6..18baaeacd74 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -44,7 +44,7 @@ class RouterStageMock final : public RouterExecStage {
public:
~RouterStageMock() final {}
- StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final;
+ StatusWith<ClusterQueryResult> next() final;
void kill(OperationContext* opCtx) final;
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index 9cb1e4d26c9..fecf5440898 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -41,8 +41,8 @@ namespace mongo {
RouterStageRemoveSortKey::RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child)
: RouterExecStage(std::move(child)) {}
-StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next(OperationContext* opCtx) {
- auto childResult = getChildStage()->next(opCtx);
+StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
+ auto childResult = getChildStage()->next();
if (!childResult.isOK() || !childResult.getValue().getResult()) {
return childResult;
}
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index e3599a3e9b0..c2329bbc93d 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -41,7 +41,7 @@ class RouterStageRemoveSortKey final : public RouterExecStage {
public:
RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child);
- StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final;
+ StatusWith<ClusterQueryResult> next() final;
void kill(OperationContext* opCtx) final;
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
index e9f338b9e5f..5db61b7b0a9 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
@@ -40,9 +40,10 @@ namespace mongo {
namespace {
-// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*,
-// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use
-// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests.
+// Note: Though the next() method on RouterExecStage and its subclasses depends on an
+// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages
+// are mocked in this test using RouterStageMock. RouterStageMock does not actually use the
+// OperationContext, so we omit the call to reattachToOperationContext() in these tests.
TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
auto mockStage = stdx::make_unique<RouterStageMock>();
@@ -54,29 +55,29 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
- auto firstResult = sortKeyStage->next(nullptr);
+ auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4 << "b" << 3));
- auto secondResult = sortKeyStage->next(nullptr);
+ auto secondResult = sortKeyStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(),
BSON("c" << BSON("d"
<< "foo")));
- auto thirdResult = sortKeyStage->next(nullptr);
+ auto thirdResult = sortKeyStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3));
- auto fourthResult = sortKeyStage->next(nullptr);
+ auto fourthResult = sortKeyStage->next();
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSONObj());
- auto fifthResult = sortKeyStage->next(nullptr);
+ auto fifthResult = sortKeyStage->next();
ASSERT_OK(fifthResult.getStatus());
ASSERT(fifthResult.getValue().isEOF());
}
@@ -88,12 +89,12 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
- auto firstResult = sortKeyStage->next(nullptr);
+ auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSONObj());
- auto secondResult = sortKeyStage->next(nullptr);
+ auto secondResult = sortKeyStage->next();
ASSERT_NOT_OK(secondResult.getStatus());
ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue);
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
@@ -107,21 +108,21 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
- auto firstResult = sortKeyStage->next(nullptr);
+ auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
- auto secondResult = sortKeyStage->next(nullptr);
+ auto secondResult = sortKeyStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().isEOF());
- auto thirdResult = sortKeyStage->next(nullptr);
+ auto thirdResult = sortKeyStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
- auto fourthResult = sortKeyStage->next(nullptr);
+ auto fourthResult = sortKeyStage->next();
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().isEOF());
}
@@ -135,19 +136,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
- auto firstResult = sortKeyStage->next(nullptr);
+ auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
- auto secondResult = sortKeyStage->next(nullptr);
+ auto secondResult = sortKeyStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2 << "b" << 2));
ASSERT_TRUE(sortKeyStage->remotesExhausted());
- auto thirdResult = sortKeyStage->next(nullptr);
+ auto thirdResult = sortKeyStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(sortKeyStage->remotesExhausted());
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 6763ca5808b..7510e3aefd6 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -39,9 +39,9 @@ RouterStageSkip::RouterStageSkip(std::unique_ptr<RouterExecStage> child, long lo
invariant(skip > 0);
}
-StatusWith<ClusterQueryResult> RouterStageSkip::next(OperationContext* opCtx) {
+StatusWith<ClusterQueryResult> RouterStageSkip::next() {
while (_skippedSoFar < _skip) {
- auto next = getChildStage()->next(opCtx);
+ auto next = getChildStage()->next();
if (!next.isOK()) {
return next;
}
@@ -53,7 +53,7 @@ StatusWith<ClusterQueryResult> RouterStageSkip::next(OperationContext* opCtx) {
++_skippedSoFar;
}
- return getChildStage()->next(opCtx);
+ return getChildStage()->next();
}
void RouterStageSkip::kill(OperationContext* opCtx) {
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index 773220d4fe6..c6dc1adda39 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -39,7 +39,7 @@ class RouterStageSkip final : public RouterExecStage {
public:
RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip);
- StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final;
+ StatusWith<ClusterQueryResult> next() final;
void kill(OperationContext* opCtx) final;
diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp
index 79099661a52..f1a58371b5c 100644
--- a/src/mongo/s/query/router_stage_skip_test.cpp
+++ b/src/mongo/s/query/router_stage_skip_test.cpp
@@ -40,9 +40,10 @@ namespace mongo {
namespace {
-// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*,
-// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use
-// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests.
+// Note: Though the next() method on RouterExecStage and its subclasses depends on an
+// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages
+// are mocked in this test using RouterStageMock. RouterStageMock does not actually use the
+// OperationContext, so we omit the call to reattachToOperationContext() in these tests.
TEST(RouterStageSkipTest, SkipIsOne) {
auto mockStage = stdx::make_unique<RouterStageMock>();
@@ -52,22 +53,22 @@ TEST(RouterStageSkipTest, SkipIsOne) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1);
- auto firstResult = skipStage->next(nullptr);
+ auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 2));
- auto secondResult = skipStage->next(nullptr);
+ auto secondResult = skipStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
// Once end-of-stream is reached, the skip stage should keep returning boost::none.
- auto thirdResult = skipStage->next(nullptr);
+ auto thirdResult = skipStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
- auto fourthResult = skipStage->next(nullptr);
+ auto fourthResult = skipStage->next();
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().isEOF());
}
@@ -81,12 +82,12 @@ TEST(RouterStageSkipTest, SkipIsThree) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
- auto firstResult = skipStage->next(nullptr);
+ auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4));
- auto secondResult = skipStage->next(nullptr);
+ auto secondResult = skipStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().isEOF());
}
@@ -99,7 +100,7 @@ TEST(RouterStageSkipTest, SkipEqualToResultSetSize) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
- auto firstResult = skipStage->next(nullptr);
+ auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().isEOF());
}
@@ -112,7 +113,7 @@ TEST(RouterStageSkipTest, SkipExceedsResultSetSize) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 100);
- auto firstResult = skipStage->next(nullptr);
+ auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().isEOF());
}
@@ -126,7 +127,7 @@ TEST(RouterStageSkipTest, ErrorWhileSkippingResults) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
- auto firstResult = skipStage->next(nullptr);
+ auto firstResult = skipStage->next();
ASSERT_NOT_OK(firstResult.getStatus());
ASSERT_EQ(firstResult.getStatus(), ErrorCodes::BadValue);
ASSERT_EQ(firstResult.getStatus().reason(), "bad thing happened");
@@ -141,12 +142,12 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
- auto firstResult = skipStage->next(nullptr);
+ auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 3));
- auto secondResult = skipStage->next(nullptr);
+ auto secondResult = skipStage->next();
ASSERT_NOT_OK(secondResult.getStatus());
ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue);
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
@@ -162,16 +163,16 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2);
- auto firstResult = skipStage->next(nullptr);
+ auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().isEOF());
- auto secondResult = skipStage->next(nullptr);
+ auto secondResult = skipStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
- auto thirdResult = skipStage->next(nullptr);
+ auto thirdResult = skipStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
}
@@ -186,19 +187,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1);
ASSERT_TRUE(skipStage->remotesExhausted());
- auto firstResult = skipStage->next(nullptr);
+ auto firstResult = skipStage->next();
ASSERT_OK(firstResult.getStatus());
ASSERT(firstResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 2));
ASSERT_TRUE(skipStage->remotesExhausted());
- auto secondResult = skipStage->next(nullptr);
+ auto secondResult = skipStage->next();
ASSERT_OK(secondResult.getStatus());
ASSERT(secondResult.getValue().getResult());
ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3));
ASSERT_TRUE(skipStage->remotesExhausted());
- auto thirdResult = skipStage->next(nullptr);
+ auto thirdResult = skipStage->next();
ASSERT_OK(thirdResult.getStatus());
ASSERT(thirdResult.getValue().isEOF());
ASSERT_TRUE(skipStage->remotesExhausted());
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 1fd5b146741..a2f912971d4 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -74,7 +74,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
cursorManager->registerCursor(opCtx,
ccc.releaseCursor(),
requestedNss,
- ClusterCursorManager::CursorType::NamespaceNotSharded,
+ ClusterCursorManager::CursorType::SingleTarget,
ClusterCursorManager::CursorLifetime::Mortal);
if (!clusterCursorId.isOK()) {
return clusterCursorId.getStatus();
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 760e313c5d0..521702e56b8 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -206,7 +206,7 @@ static void cleanupTask() {
}
if (auto cursorManager = Grid::get(opCtx)->getCursorManager()) {
- cursorManager->shutdown();
+ cursorManager->shutdown(opCtx);
}
if (auto pool = Grid::get(opCtx)->getExecutorPool()) {