diff options
71 files changed, 1371 insertions, 514 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml index 5b214e065c1..78f46d7e866 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml @@ -8,6 +8,8 @@ selector: - jstests/aggregation/sources/*/*.js exclude_files: - jstests/aggregation/bugs/server18198.js # Uses a mocked mongo client to test read preference. + - jstests/aggregation/mongos_merge.js # Cannot specify write concern when + # secondaryThrottle is not set. - jstests/aggregation/mongos_slaveok.js # Majority read on secondary requires afterOpTime. - jstests/aggregation/shard_targeting.js # Cannot specify write concern when # secondaryThrottle is not set. diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml index d1127fefcc5..16fcbfe29ad 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml @@ -11,6 +11,7 @@ selector: - jstests/aggregation/bugs/server6118.js - jstests/aggregation/bugs/server6179.js - jstests/aggregation/bugs/server7781.js + - jstests/aggregation/mongos_merge.js - jstests/aggregation/mongos_slaveok.js - jstests/aggregation/shard_targeting.js - jstests/aggregation/sources/addFields/use_cases.js diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js new file mode 100644 index 00000000000..9d7becfb87f --- /dev/null +++ b/jstests/aggregation/mongos_merge.js @@ -0,0 +1,206 @@ +/** + * Tests that split aggregations whose merge pipelines are eligible to run on mongoS do so, and + * produce the expected results. + * + * Splittable stages whose merge components are eligible to run on mongoS include: + * - $sort (iff merging pre-sorted streams) + * - $skip + * - $limit + * - $sample + * + * Non-splittable stages such as those listed below are eligible to run in a mongoS merge pipeline: + * - $match + * - $project + * - $addFields + * - $unwind + * - $redact + * + * Because wrapping these aggregations in a $facet stage will affect how the pipeline can be merged, + * and will therefore invalidate the results of the test cases below, we tag this test to prevent it + * running under the 'aggregation_facet_unwind' passthrough. + * + * @tags: [do_not_wrap_aggregations_in_facets] + */ + +(function() { + load("jstests/libs/profiler.js"); // For profilerHas*OrThrow helper functions. + + const st = new ShardingTest({shards: 2, mongos: 1, config: 1}); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + + const shard0DB = primaryShardDB = st.shard0.getDB(jsTestName()); + const shard1DB = st.shard1.getDB(jsTestName()); + + assert.commandWorked(mongosDB.dropDatabase()); + + // Enable profiling on each shard to verify that no $mergeCursors occur. + assert.commandWorked(shard0DB.setProfilingLevel(2)); + assert.commandWorked(shard1DB.setProfilingLevel(2)); + + // Always merge pipelines which cannot merge on mongoS on the primary shard instead, so we know + // where to check for $mergeCursors. + assert.commandWorked( + mongosDB.adminCommand({setParameter: 1, internalQueryAlwaysMergeOnPrimaryShard: true})); + + // Enable sharding on the test DB and ensure its primary is shard0000. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), "shard0000"); + + // Shard the test collection on _id. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 4 chunks: [MinKey, -100), [-100, 0), [0, 100), [100, MaxKey). + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: -100}})); + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 100}})); + + // Move the [0, 100) and [100, MaxKey) chunks to shard0001. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 50}, to: "shard0001"})); + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 150}, to: "shard0001"})); + + // Write 400 documents across the 4 chunks. + for (let i = -200; i < 200; i++) { + assert.writeOK(mongosColl.insert({_id: i, a: [i], b: {redactThisDoc: true}, c: true})); + } + + /** + * Runs the aggregation specified by 'pipeline', verifying that: + * - The number of documents returned by the aggregation matches 'expectedCount'. + * - The merge was performed on a mongoS if 'mergeOnMongoS' is true, and on a shard otherwise. + */ + function assertMergeBehaviour({testName, pipeline, mergeOnMongoS, expectedCount}) { + // Verify that the 'mergeOnMongoS' explain() output for this pipeline matches our + // expectation. + assert.eq( + assert.commandWorked(mongosColl.explain().aggregate(pipeline, {comment: testName})) + .mergeOnMongoS, + mergeOnMongoS); + + assert.eq(mongosColl.aggregate(pipeline, {comment: testName}).itcount(), expectedCount); + + // Verify that a $mergeCursors aggregation ran on the primary shard if 'mergeOnMongoS' is + // false, and that no such aggregation ran if 'mergeOnMongoS' is true. + profilerHasNumMatchingEntriesOrThrow({ + profileDB: primaryShardDB, + numExpectedMatches: (mergeOnMongoS ? 0 : 1), + filter: { + "command.aggregate": mongosColl.getName(), + "command.comment": testName, + "command.pipeline.$mergeCursors": {$exists: 1} + } + }); + } + + /** + * Throws an assertion if the aggregation specified by 'pipeline' does not produce + * 'expectedCount' results, or if the merge phase is not performed on the mongoS. + */ + function assertMergeOnMongoS({testName, pipeline, expectedCount}) { + assertMergeBehaviour({ + testName: testName, + pipeline: pipeline, + mergeOnMongoS: true, + expectedCount: expectedCount + }); + } + + /** + * Throws an assertion if the aggregation specified by 'pipeline' does not produce + * 'expectedCount' results, or if the merge phase was not performed on a shard. + */ + function assertMergeOnMongoD({testName, pipeline, expectedCount}) { + assertMergeBehaviour({ + testName: testName, + pipeline: pipeline, + mergeOnMongoS: false, + expectedCount: expectedCount + }); + } + + // + // Test cases. + // + + let testName; + + // Test that a $match pipeline with an empty merge stage is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_match_only", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}], + expectedCount: 400 + }); + + // Test that a $sort stage which merges pre-sorted streams is run on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_sort_presorted", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}], + expectedCount: 400 + }); + + // Test that a $sort stage which must sort the dataset from scratch is NOT run on mongoS. + assertMergeOnMongoD({ + testName: "agg_mongos_merge_sort_in_mem", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$sort: {a: 1}}], + expectedCount: 400 + }); + + // Test that $skip is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_skip", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sort: {_id: -1}}, {$skip: 100}], + expectedCount: 300 + }); + + // Test that $limit is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_limit", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$limit: 50}], + expectedCount: 50 + }); + + // Test that $sample is merged on mongoS. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_sample", + pipeline: [{$match: {_id: {$gte: -200, $lte: 200}}}, {$sample: {size: 50}}], + expectedCount: 50 + }); + + // Test that merge pipelines containing all mongos-runnable stages produce the expected output. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_all_mongos_runnable_stages", + pipeline: [ + {$match: {_id: {$gte: -5, $lte: 100}}}, + {$sort: {_id: -1}}, + {$skip: 95}, + {$limit: 10}, + {$addFields: {d: true}}, + {$unwind: "$a"}, + {$sample: {size: 5}}, + {$project: {c: 0}}, + { + $redact: { + $cond: + {if: {$eq: ["$redactThisDoc", true]}, then: "$$PRUNE", else: "$$DESCEND"} + } + }, + { + $match: { + _id: {$gte: -4, $lte: 5}, + a: {$gte: -4, $lte: 5}, + b: {$exists: false}, + c: {$exists: false}, + d: true + } + } + ], + expectedCount: 5 + }); +})();
\ No newline at end of file diff --git a/jstests/aggregation/shard_targeting.js b/jstests/aggregation/shard_targeting.js index 4edec931cde..7b33854f601 100644 --- a/jstests/aggregation/shard_targeting.js +++ b/jstests/aggregation/shard_targeting.js @@ -40,10 +40,6 @@ assert.commandWorked(mongosDB.dropDatabase()); - // Always merge on primary shard, so we know where to look for $mergeCursors stages. - assert.commandWorked( - mongosDB.adminCommand({setParameter: 1, internalQueryAlwaysMergeOnPrimaryShard: true})); - // Enable sharding on the test DB and ensure its primary is shard0000. assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); st.ensurePrimaryShard(mongosDB.getName(), "shard0000"); @@ -79,6 +75,9 @@ ErrorCodes.StaleEpoch ]; + // Create an $_internalSplitPipeline stage that forces the merge to occur on the Primary shard. + const forcePrimaryMerge = [{$_internalSplitPipeline: {mergeType: "primaryShard"}}]; + function runAggShardTargetTest({splitPoint}) { // Ensure that both mongoS have up-to-date caches, and enable the profiler on both shards. assert.commandWorked(mongosForAgg.getDB("admin").runCommand({flushRouterConfig: 1})); @@ -166,10 +165,13 @@ // Run the same aggregation that targeted a single shard via the now-stale mongoS. It should // attempt to send the aggregation to shard0000, hit a stale config exception, split the - // pipeline and redispatch. + // pipeline and redispatch. We append an $_internalSplitPipeline stage in order to force a + // shard merge rather than a mongoS merge. testName = "agg_shard_targeting_backout_passthrough_and_split_if_cache_is_stale"; assert.eq(mongosColl - .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}].concat(splitPoint), + .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}] + .concat(splitPoint) + .concat(forcePrimaryMerge), {comment: testName}) .itcount(), 2); @@ -259,10 +261,14 @@ {moveChunk: mongosColl.getFullName(), find: {_id: -50}, to: "shard0000"})); // Run the same aggregation via the now-stale mongoS. It should split the pipeline, hit a - // stale config exception, and reset to the original single-shard pipeline upon refresh. + // stale config exception, and reset to the original single-shard pipeline upon refresh. We + // append an $_internalSplitPipeline stage in order to force a shard merge rather than a + // mongoS merge. testName = "agg_shard_targeting_backout_split_pipeline_and_reassemble_if_cache_is_stale"; assert.eq(mongosColl - .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}].concat(splitPoint), + .aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}] + .concat(splitPoint) + .concat(forcePrimaryMerge), {comment: testName}) .itcount(), 2); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 4ceb1616a36..1d622a2c8cf 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -242,6 +242,7 @@ docSourceEnv.Library( 'document_source_group.cpp', 'document_source_index_stats.cpp', 'document_source_internal_inhibit_optimization.cpp', + 'document_source_internal_split_pipeline.cpp', 'document_source_limit.cpp', 'document_source_match.cpp', 'document_source_merge_cursors.cpp', diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index efb695c8f59..8f1c2fef4a0 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -128,9 +128,18 @@ public: */ enum class PositionRequirement { kNone, kFirst, kLast }; + /** + * A HostTypeRequirement defines where this stage is permitted to be executed when the + * pipeline is run on a sharded cluster. + */ + enum class HostTypeRequirement { kPrimaryShard, kAnyShard, kAnyShardOrMongoS }; + // Set if this stage needs to be in a particular position of the pipeline. PositionRequirement requiredPosition = PositionRequirement::kNone; + // Set if this stage can only be executed on specific components of a sharded cluster. + HostTypeRequirement hostRequirement = HostTypeRequirement::kAnyShard; + bool isAllowedInsideFacetStage = true; // True if this stage does not generate results itself, and instead pulls inputs from an @@ -147,12 +156,11 @@ public: // must also override getModifiedPaths() to provide information about which particular // $match predicates be swapped before itself. bool canSwapWithMatch = false; - - // True if this stage must run on the primary shard when the collection being aggregated is - // sharded. - bool mustRunOnPrimaryShardIfSharded = false; }; + using HostTypeRequirement = StageConstraints::HostTypeRequirement; + using PositionRequirement = StageConstraints::PositionRequirement; + /** * This is what is returned from the main DocumentSource API: getNext(). It is essentially a * (ReturnStatus, Document) pair, with the first entry being used to communicate information diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 40d9fe5555e..64f313649af 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -96,7 +96,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.isAllowedInsideFacetStage = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h index 6334bbe55c8..2903241d1a7 100644 --- a/src/mongo/db/pipeline/document_source_coll_stats.h +++ b/src/mongo/db/pipeline/document_source_coll_stats.h @@ -76,7 +76,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h index cd86d027733..d5138e74d2a 100644 --- a/src/mongo/db/pipeline/document_source_current_op.h +++ b/src/mongo/db/pipeline/document_source_current_op.h @@ -80,7 +80,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; constraints.isIndependentOfAnyCollection = true; diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 57a41996af9..c15c0495ba7 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -53,7 +53,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp index aba1f5f115c..e553ea2fa62 100644 --- a/src/mongo/db/pipeline/document_source_facet.cpp +++ b/src/mongo/db/pipeline/document_source_facet.cpp @@ -243,12 +243,12 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints() const { for (auto&& facet : _facets) { for (auto&& nestedStage : facet.pipeline->getSources()) { - if (nestedStage->constraints().mustRunOnPrimaryShardIfSharded) { + if (nestedStage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard) { // Currently we don't split $facet to have a merger part and a shards part (see // SERVER-24154). This means that if any stage in any of the $facet pipelines // requires the primary shard, then the entire $facet must happen on the merger, and // the merger must be the primary shard. - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; } } } diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp index 60aefdead64..d8798a5c70e 100644 --- a/src/mongo/db/pipeline/document_source_facet_test.cpp +++ b/src/mongo/db/pipeline/document_source_facet_test.cpp @@ -609,7 +609,7 @@ class DocumentSourceNeedsPrimaryShard final : public DocumentSourcePassthrough { public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } @@ -633,7 +633,8 @@ TEST_F(DocumentSourceFacetTest, ShouldRequirePrimaryShardIfAnyStageRequiresPrima facets.emplace_back("needsPrimaryShard", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_TRUE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); + ASSERT(facetStage->constraints().hostRequirement == + DocumentSource::StageConstraints::HostTypeRequirement::kPrimaryShard); } TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPrimaryShard) { @@ -652,7 +653,8 @@ TEST_F(DocumentSourceFacetTest, ShouldNotRequirePrimaryShardIfNoStagesRequiresPr facets.emplace_back("second", std::move(secondPipeline)); auto facetStage = DocumentSourceFacet::create(std::move(facets), ctx); - ASSERT_FALSE(facetStage->constraints().mustRunOnPrimaryShardIfSharded); + ASSERT(facetStage->constraints().hostRequirement == + DocumentSource::StageConstraints::HostTypeRequirement::kAnyShard); } } // namespace diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h index 5df25d6c1c6..315dcea6dd4 100644 --- a/src/mongo/db/pipeline/document_source_geo_near.h +++ b/src/mongo/db/pipeline/document_source_geo_near.h @@ -49,7 +49,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h index 489a80ea7b9..53394d3d0ab 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.h +++ b/src/mongo/db/pipeline/document_source_graph_lookup.h @@ -56,7 +56,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; constraints.canSwapWithMatch = true; - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h index dd62f5323bd..83546654361 100644 --- a/src/mongo/db/pipeline/document_source_index_stats.h +++ b/src/mongo/db/pipeline/document_source_index_stats.h @@ -71,7 +71,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h index a750443a5b5..d1967b6625f 100644 --- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h +++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h @@ -52,6 +52,12 @@ public: return kStageName.rawData(); } + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + GetNextResult getNext() final; private: diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp new file mode 100644 index 00000000000..a49514cf628 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.cpp @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_internal_split_pipeline.h" + +namespace mongo { + +REGISTER_DOCUMENT_SOURCE(_internalSplitPipeline, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceInternalSplitPipeline::createFromBson); + +constexpr StringData DocumentSourceInternalSplitPipeline::kStageName; + +boost::intrusive_ptr<DocumentSource> DocumentSourceInternalSplitPipeline::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << "$_internalSplitPipeline must take a nested object but found: " + << elem, + elem.type() == BSONType::Object); + + auto specObj = elem.embeddedObject(); + + HostTypeRequirement mergeType = HostTypeRequirement::kAnyShard; + + for (auto&& elt : specObj) { + if (elt.fieldNameStringData() == "mergeType"_sd) { + uassert(ErrorCodes::BadValue, + str::stream() << "'mergeType' must be a string value but found: " << elt.type(), + elt.type() == BSONType::String); + + auto mergeTypeString = elt.valueStringData(); + + if ("anyShard"_sd == mergeTypeString) { + mergeType = HostTypeRequirement::kAnyShard; + } else if ("primaryShard"_sd == mergeTypeString) { + mergeType = HostTypeRequirement::kPrimaryShard; + } else if ("mongos"_sd == mergeTypeString) { + mergeType = HostTypeRequirement::kAnyShardOrMongoS; + } else { + uasserted(ErrorCodes::BadValue, + str::stream() << "unrecognized field while parsing mergeType: '" + << elt.fieldNameStringData() + << "'"); + } + } else { + uasserted(ErrorCodes::BadValue, + str::stream() << "unrecognized field while parsing $_internalSplitPipeline: '" + << elt.fieldNameStringData() + << "'"); + } + } + + return new DocumentSourceInternalSplitPipeline(expCtx, mergeType); +} + +DocumentSource::GetNextResult DocumentSourceInternalSplitPipeline::getNext() { + pExpCtx->checkForInterrupt(); + return pSource->getNext(); +} + +Value DocumentSourceInternalSplitPipeline::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + std::string mergeTypeString; + + switch (_mergeType) { + case HostTypeRequirement::kAnyShardOrMongoS: + mergeTypeString = "mongos"; + break; + + case HostTypeRequirement::kPrimaryShard: + mergeTypeString = "primaryShard"; + break; + + default: + mergeTypeString = "anyShard"; + break; + } + + return Value(Document{{getSourceName(), Value{Document{{"mergeType", mergeTypeString}}}}}); +} + +} // namesace mongo diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h new file mode 100644 index 00000000000..c18a6d301a6 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * An internal stage available for testing. Acts as a simple passthrough of intermediate results + * from the source stage, but forces the pipeline to split at the point where this stage appears + * (assuming that no earlier splitpoints exist). Takes a single parameter, 'mergeType', which can be + * one of 'anyShard', 'primaryShard' or 'mongos' to control where the merge may occur. Omitting this + * parameter or specifying 'anyShard' produces the default merging behaviour; the merge half of the + * pipeline will be sent to a random participating shard, subject to the requirements of any + * subsequent splittable stages in the pipeline. + */ +class DocumentSourceInternalSplitPipeline final : public DocumentSource, + public SplittableDocumentSource { +public: + static constexpr StringData kStageName = "$_internalSplitPipeline"_sd; + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement, const boost::intrusive_ptr<ExpressionContext>&); + + DocumentSourceInternalSplitPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + HostTypeRequirement mergeType) + : DocumentSource(expCtx), _mergeType(mergeType) {} + + const char* getSourceName() const final { + return kStageName.rawData(); + } + + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return this; + } + + boost::intrusive_ptr<DocumentSource> getMergeSource() final { + return this; + } + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = _mergeType; + return constraints; + } + + GetNextResult getNext() final; + +private: + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + HostTypeRequirement _mergeType = HostTypeRequirement::kAnyShard; +}; + +} // namesace mongo diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h index c6660152d66..88acfa8b45a 100644 --- a/src/mongo/db/pipeline/document_source_limit.h +++ b/src/mongo/db/pipeline/document_source_limit.h @@ -42,6 +42,12 @@ public: : SimpleBSONObjComparator::kInstance.makeBSONObjSet(); } + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + /** * Attempts to combine with a subsequent $limit stage, setting 'limit' appropriately. */ diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index d5bab6fd9e7..a4cd8d35ed6 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -97,7 +97,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; constraints.canSwapWithMatch = true; - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index 2be99dd7f12..135dbc39e17 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -49,6 +49,13 @@ public: } const char* getSourceName() const override; + + StageConstraints constraints() const override { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + Value serialize( boost::optional<ExplainOptions::Verbosity> explain = boost::none) const override; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index 93cd6dd93e7..d53c2183443 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -176,6 +176,12 @@ DocumentSource::GetNextResult DocumentSourceMergeCursors::getNext() { return std::move(next); } +bool DocumentSourceMergeCursors::remotesExhausted() const { + return std::all_of(_cursors.begin(), _cursors.end(), [](const auto& cursorAndConn) { + return cursorAndConn->cursor.isDead(); + }); +} + void DocumentSourceMergeCursors::doDispose() { for (auto&& cursorAndConn : _cursors) { // Note it is an error to call done() on a connection before consuming the reply from a diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h index 045ddad4836..e83316fa6ea 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.h +++ b/src/mongo/db/pipeline/document_source_merge_cursors.h @@ -55,7 +55,8 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; @@ -68,6 +69,11 @@ public: std::vector<CursorDescriptor> cursorDescriptors, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + /** + * Returns true if all remotes have reported that their cursors are closed. + */ + bool remotesExhausted() const; + /** Returns non-owning pointers to cursors managed by this stage. * Call this instead of getNext() if you want access to the raw streams. * This method should only be called at most once. diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h index 513994cd599..22f7a1aa24d 100644 --- a/src/mongo/db/pipeline/document_source_mock.h +++ b/src/mongo/db/pipeline/document_source_mock.h @@ -51,7 +51,7 @@ public: StageConstraints constraints() const override { StageConstraints constraints; - constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.requiredPosition = PositionRequirement::kFirst; constraints.requiresInputDocSource = false; constraints.isAllowedInsideFacetStage = false; return constraints; diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 9366d01a670..6756cd4df2a 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -46,9 +46,9 @@ public: StageConstraints constraints() const final { StageConstraints constraints; - constraints.mustRunOnPrimaryShardIfSharded = true; + constraints.hostRequirement = HostTypeRequirement::kPrimaryShard; constraints.isAllowedInsideFacetStage = false; - constraints.requiredPosition = StageConstraints::PositionRequirement::kLast; + constraints.requiredPosition = PositionRequirement::kLast; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h index a76ca9c7940..bedc434d61e 100644 --- a/src/mongo/db/pipeline/document_source_redact.h +++ b/src/mongo/db/pipeline/document_source_redact.h @@ -40,6 +40,12 @@ public: const char* getSourceName() const final; boost::intrusive_ptr<DocumentSource> optimize() final; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + /** * Attempts to duplicate the redact-safe portion of a subsequent $match before the $redact * stage. diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp index 5117e5c516a..c174d95d935 100644 --- a/src/mongo/db/pipeline/document_source_sample.cpp +++ b/src/mongo/db/pipeline/document_source_sample.cpp @@ -123,6 +123,11 @@ intrusive_ptr<DocumentSource> DocumentSourceSample::getShardSource() { intrusive_ptr<DocumentSource> DocumentSourceSample::getMergeSource() { // Just need to merge the pre-sorted documents by their random values. - return DocumentSourceSort::create(pExpCtx, randSortSpec, _size); + BSONObjBuilder randMergeSortSpec; + + randMergeSortSpec.appendElements(randSortSpec); + randMergeSortSpec.append("$mergePresorted", true); + + return DocumentSourceSort::create(pExpCtx, randMergeSortSpec.obj(), _size); } } // mongo diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h index ec88c0737a8..662c6a9a49d 100644 --- a/src/mongo/db/pipeline/document_source_sample.h +++ b/src/mongo/db/pipeline/document_source_sample.h @@ -39,6 +39,12 @@ public: const char* getSourceName() const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + GetDepsReturn getDependencies(DepsTracker* deps) const final { return SEE_NEXT; } diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index b50065bc303..188e9864310 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -103,6 +103,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h index 46cb6a6f3ab..a69f5e59eb5 100644 --- a/src/mongo/db/pipeline/document_source_skip.h +++ b/src/mongo/db/pipeline/document_source_skip.h @@ -37,6 +37,13 @@ public: // virtuals from DocumentSource GetNextResult getNext() final; const char* getSourceName() const final; + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; + return constraints; + } + /** * Attempts to move a subsequent $limit before the skip, potentially allowing for forther * optimizations earlier in the pipeline. diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h index 866a3fb955c..5731051cbd3 100644 --- a/src/mongo/db/pipeline/document_source_sort.h +++ b/src/mongo/db/pipeline/document_source_sort.h @@ -57,6 +57,10 @@ public: // Can't swap with a $match if a limit has been absorbed, since in general match can't swap // with limit. constraints.canSwapWithMatch = !limitSrc; + + // Can run on mongoS only if this stage is merging presorted streams. + constraints.hostRequirement = (_mergingPresorted ? HostTypeRequirement::kAnyShardOrMongoS + : HostTypeRequirement::kAnyShard); return constraints; } diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h index fb7bd68ba39..5bc9a91afdb 100644 --- a/src/mongo/db/pipeline/document_source_unwind.h +++ b/src/mongo/db/pipeline/document_source_unwind.h @@ -48,6 +48,7 @@ public: StageConstraints constraints() const final { StageConstraints constraints; + constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS; constraints.canSwapWithMatch = true; return constraints; } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index e5cd62a4670..8b84fe36bff 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -62,6 +62,9 @@ using std::vector; namespace dps = ::mongo::dotted_path_support; +using HostTypeRequirement = DocumentSource::StageConstraints::HostTypeRequirement; +using PositionRequirement = DocumentSource::StageConstraints::PositionRequirement; + Pipeline::Pipeline(const intrusive_ptr<ExpressionContext>& pTheCtx) : pCtx(pTheCtx) {} Pipeline::Pipeline(SourceContainer stages, const intrusive_ptr<ExpressionContext>& expCtx) @@ -171,8 +174,7 @@ Status Pipeline::validateFacetPipeline() const { // We expect a stage within a $facet stage to have these properties. invariant(stageConstraints.requiresInputDocSource); invariant(!stageConstraints.isIndependentOfAnyCollection); - invariant(stageConstraints.requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kNone); + invariant(stageConstraints.requiredPosition == PositionRequirement::kNone); } // Facet pipelines cannot have any stages which are initial sources. We've already validated the @@ -184,9 +186,7 @@ Status Pipeline::validateFacetPipeline() const { Status Pipeline::ensureAllStagesAreInLegalPositions() const { size_t i = 0; for (auto&& stage : _sources) { - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kFirst && - i != 0) { + if (stage->constraints().requiredPosition == PositionRequirement::kFirst && i != 0) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() << " is only valid as the first stage in a pipeline.", @@ -199,8 +199,7 @@ Status Pipeline::ensureAllStagesAreInLegalPositions() const { 17313}; } - if (stage->constraints().requiredPosition == - DocumentSource::StageConstraints::PositionRequirement::kLast && + if (stage->constraints().requiredPosition == PositionRequirement::kLast && i != _sources.size() - 1) { return {ErrorCodes::BadValue, str::stream() << stage->getSourceName() @@ -312,6 +311,8 @@ std::unique_ptr<Pipeline, Pipeline::Deleter> Pipeline::splitForSharded() { shardPipeline->_splitForSharded = true; _splitForMerge = true; + stitch(); + return shardPipeline; } @@ -428,7 +429,13 @@ BSONObj Pipeline::getInitialQuery() const { bool Pipeline::needsPrimaryShardMerger() const { return std::any_of(_sources.begin(), _sources.end(), [](const auto& stage) { - return stage->constraints().mustRunOnPrimaryShardIfSharded; + return stage->constraints().hostRequirement == HostTypeRequirement::kPrimaryShard; + }); +} + +bool Pipeline::canRunOnMongos() const { + return std::all_of(_sources.begin(), _sources.end(), [](const auto& stage) { + return stage->constraints().hostRequirement == HostTypeRequirement::kAnyShardOrMongoS; }); } diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 677cfe7b6d1..ed19d44ae2b 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -219,6 +219,11 @@ public: bool needsPrimaryShardMerger() const; /** + * Returns whether or not every DocumentSource in the pipeline can run on mongoS. + */ + bool canRunOnMongos() const; + + /** * Modifies the pipeline, optimizing it by combining and swapping stages. */ void optimizePipeline(); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 0c841ed5e45..2653b607ab4 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -778,6 +778,8 @@ env.CppUnitTest( LIBDEPS=[ 'repl_coordinator_impl', 'replmocks', + '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) @@ -791,6 +793,8 @@ env.CppUnitTest( 'repl_coordinator_impl', 'replica_set_messages', 'replmocks', + '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) @@ -819,6 +823,8 @@ env.CppUnitTest( 'repl_coordinator_impl', 'replica_set_messages', 'replmocks', + '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', ], ) @@ -1396,6 +1402,7 @@ env.CppUnitTest( 'sync_source_selector_mock', 'task_executor_mock', '$BUILD_DIR/mongo/db/query/command_request_response', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', '$BUILD_DIR/mongo/unittest/concurrency', ], diff --git a/src/mongo/s/cluster_cursor_stats.cpp b/src/mongo/s/cluster_cursor_stats.cpp index d0620705052..4aa5ca97891 100644 --- a/src/mongo/s/cluster_cursor_stats.cpp +++ b/src/mongo/s/cluster_cursor_stats.cpp @@ -50,11 +50,12 @@ public: { BSONObjBuilder openBob(cursorBob.subobjStart("open")); auto stats = grid.getCursorManager()->stats(); - openBob.append("multiTarget", static_cast<long long>(stats.cursorsSharded)); - openBob.append("singleTarget", static_cast<long long>(stats.cursorsNotSharded)); + openBob.append("multiTarget", static_cast<long long>(stats.cursorsMultiTarget)); + openBob.append("singleTarget", static_cast<long long>(stats.cursorsSingleTarget)); openBob.append("pinned", static_cast<long long>(stats.cursorsPinned)); - openBob.append("total", - static_cast<long long>(stats.cursorsSharded + stats.cursorsNotSharded)); + openBob.append( + "total", + static_cast<long long>(stats.cursorsMultiTarget + stats.cursorsSingleTarget)); openBob.doneFast(); } cursorBob.done(); diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index cf6b5991642..3b3e87129ab 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -34,6 +34,7 @@ #include <boost/intrusive_ptr.hpp> +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/operation_context.h" @@ -42,6 +43,8 @@ #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/views/resolved_view.h" #include "mongo/db/views/view.h" #include "mongo/executor/task_executor_pool.h" @@ -51,6 +54,9 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/grid.h" +#include "mongo/s/query/cluster_client_cursor_impl.h" +#include "mongo/s/query/cluster_client_cursor_params.h" +#include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/store_possible_cursor.h" @@ -86,15 +92,15 @@ Status appendExplainResults( const std::vector<AsyncRequestsSender::Response>& shardResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMergingShard, + const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging, BSONObjBuilder* result) { if (pipelineForTargetedShards->isSplitForSharded()) { - *result << "needsPrimaryShardMerger" << pipelineForMergingShard->needsPrimaryShardMerger() - << "splitPipeline" - << Document{{"shardsPart", - pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)}, - {"mergerPart", - pipelineForMergingShard->writeExplainOps(*mergeCtx->explain)}}; + *result << "needsPrimaryShardMerger" << pipelineForMerging->needsPrimaryShardMerger() + << "mergeOnMongoS" << pipelineForMerging->canRunOnMongos() << "splitPipeline" + << Document{ + {"shardsPart", + pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)}, + {"mergerPart", pipelineForMerging->writeExplainOps(*mergeCtx->explain)}}; } else { *result << "splitPipeline" << BSONNULL; } @@ -207,10 +213,10 @@ BSONObj createCommandForMergingShard( const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMergingShard) { + const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging) { MutableDocument mergeCmd(request.serializeToCommandObj()); - mergeCmd["pipeline"] = Value(pipelineForMergingShard->serialize()); + mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); mergeCmd[AggregationRequest::kFromRouterName] = Value(true); mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); @@ -284,7 +290,7 @@ StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardC return swCursors; } -StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergeCursor( +StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor( OperationContext* opCtx, const NamespaceString& nss, const std::vector<ClusterClientCursorParams::RemoteCursor>& cursors, @@ -307,6 +313,73 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergeCursor( return {{std::move(mergingShardId), std::move(shardCmdResponse)}}; } +BSONObj establishMergingMongosCursor( + OperationContext* opCtx, + const AggregationRequest& request, + const NamespaceString& requestedNss, + std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging, + std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { + + ClusterClientCursorParams params( + requestedNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + ReadPreferenceSetting::get(opCtx)); + + params.mergePipeline = std::move(pipelineForMerging); + params.remotes = std::move(cursors); + params.batchSize = request.getBatchSize(); + + auto ccc = ClusterClientCursorImpl::make( + opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); + + auto cursorState = ClusterCursorManager::CursorState::NotExhausted; + BSONObjBuilder cursorResponse; + + CursorResponseBuilder responseBuilder(true, &cursorResponse); + + ccc->reattachToOperationContext(opCtx); + + for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { + auto next = uassertStatusOK(ccc->next()); + + // Check whether we have exhausted the pipeline's results. + if (next.isEOF()) { + cursorState = ClusterCursorManager::CursorState::Exhausted; + break; + } + + // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor + // to be returned on the next getMore. + auto nextObj = *next.getResult(); + + if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { + ccc->queueResult(nextObj); + break; + } + + responseBuilder.append(nextObj); + } + + ccc->detachFromOperationContext(); + + CursorId clusterCursorId = 0; + + if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { + clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor( + opCtx, + ccc.releaseCursor(), + requestedNss, + ClusterCursorManager::CursorType::MultiTarget, + ClusterCursorManager::CursorLifetime::Mortal)); + } + + responseBuilder.done(clusterCursorId, requestedNss.ns()); + + Command::appendCommandStatus(cursorResponse, Status::OK()); + + return cursorResponse.obj(); +} + } // namespace Status ClusterAggregate::runAggregate(OperationContext* opCtx, @@ -394,7 +467,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, const auto shardQuery = pipeline->getInitialQuery(); auto pipelineForTargetedShards = std::move(pipeline); - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMergingShard; + std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging; int numAttempts = 0; @@ -428,10 +501,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // If we have to run on multiple shards and the pipeline is not yet split, split it. If we // can run on a single shard and the pipeline is already split, reassemble it. if (needsSplit && !isSplit) { - pipelineForMergingShard = std::move(pipelineForTargetedShards); - pipelineForTargetedShards = pipelineForMergingShard->splitForSharded(); + pipelineForMerging = std::move(pipelineForTargetedShards); + pipelineForTargetedShards = pipelineForMerging->splitForSharded(); } else if (!needsSplit && isSplit) { - pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMergingShard)); + pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging)); } // Generate the command object for the targeted shards. @@ -479,7 +552,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return appendExplainResults(std::move(shardResults), mergeCtx, pipelineForTargetedShards, - pipelineForMergingShard, + pipelineForMerging, result); } @@ -505,15 +578,31 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If we reach here, we have a merge pipeline to dispatch. - invariant(pipelineForMergingShard); + invariant(pipelineForMerging); - pipelineForMergingShard->addInitialSource( + // We need a DocumentSourceMergeCursors regardless of whether we merge on mongoS or on a shard. + pipelineForMerging->addInitialSource( DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx)); - auto mergeCmdObj = - createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMergingShard); + // First, check whether we can merge on the mongoS. + if (pipelineForMerging->canRunOnMongos() && !internalQueryProhibitMergingOnMongoS.load()) { + // Register the new mongoS cursor, and retrieve the initial batch of results. + auto cursorResponse = establishMergingMongosCursor(opCtx, + request, + namespaces.requestedNss, + std::move(pipelineForMerging), + std::move(cursors)); + + // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline + // can never run on mongoS. Filter the command response and return immediately. + Command::filterCommandReplyForPassthrough(cursorResponse, result); + return getStatusFromCommandResult(result->asTempObj()); + } + + // If we cannot merge on mongoS, establish the merge cursor on a shard. + auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMerging); - auto mergeResponse = uassertStatusOK(establishMergeCursor( + auto mergeResponse = uassertStatusOK(establishMergingShardCursor( opCtx, namespaces.executionNss, cursors, diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 5a8f3cb5a90..df9223536fa 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -1548,7 +1548,7 @@ public: } return Status(ErrorCodes::Unauthorized, - str::stream() << "Not authorized to create users on db: " << dbname); + str::stream() << "Not authorized to list collections on db: " << dbname); } bool supportsWriteConcern(const BSONObj& cmd) const override { diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index f2a7b25cb28..2977050be19 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -32,6 +32,7 @@ env.Library( env.Library( target="router_exec_stage", source=[ + "router_stage_aggregation_merge.cpp", "router_stage_limit.cpp", "router_stage_merge.cpp", "router_stage_mock.cpp", @@ -69,6 +70,7 @@ env.Library( "$BUILD_DIR/mongo/executor/task_executor_interface", "$BUILD_DIR/mongo/s/async_requests_sender", "$BUILD_DIR/mongo/s/client/sharding_client", + '$BUILD_DIR/mongo/db/pipeline/pipeline', "$BUILD_DIR/mongo/s/coreshard", ], ) diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 5ac983f260b..aca0f54c704 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -66,7 +66,7 @@ public: * * A non-ok status is returned in case of any error. */ - virtual StatusWith<ClusterQueryResult> next(OperationContext* opCtx) = 0; + virtual StatusWith<ClusterQueryResult> next() = 0; /** * Must be called before destruction to abandon a not-yet-exhausted cursor. If next() has @@ -77,6 +77,17 @@ public: virtual void kill(OperationContext* opCtx) = 0; /** + * Sets the operation context for the cursor. Must be called before the first call to next(). + * The cursor attaches to a new OperationContext on each getMore. + */ + virtual void reattachToOperationContext(OperationContext* opCtx) = 0; + + /** + * Detaches the cursor from its current OperationContext. + */ + virtual void detachFromOperationContext() = 0; + + /** * Returns whether or not this cursor is tailing a capped collection on a shard. */ virtual bool isTailable() const = 0; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index a6a0c4ccdb3..0e6b5a197ce 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -32,6 +32,7 @@ #include "mongo/s/query/cluster_client_cursor_impl.h" +#include "mongo/s/query/router_stage_aggregation_merge.h" #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_merge.h" #include "mongo/s/query/router_stage_mock.h" @@ -77,7 +78,7 @@ ClusterClientCursorImpl::ClusterClientCursorImpl(std::unique_ptr<RouterStageMock boost::optional<LogicalSessionId> lsid) : _params(std::move(params)), _root(std::move(root)), _lsid(lsid) {} -StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next(OperationContext* opCtx) { +StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() { // First return stashed results, if there are any. if (!_stash.empty()) { auto front = std::move(_stash.front()); @@ -86,7 +87,7 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next(OperationContext* o return {front}; } - auto next = _root->next(opCtx); + auto next = _root->next(); if (next.isOK() && !next.getValue().isEOF()) { ++_numReturnedSoFar; } @@ -97,6 +98,14 @@ void ClusterClientCursorImpl::kill(OperationContext* opCtx) { _root->kill(opCtx); } +void ClusterClientCursorImpl::reattachToOperationContext(OperationContext* opCtx) { + _root->reattachToOperationContext(opCtx); +} + +void ClusterClientCursorImpl::detachFromOperationContext() { + _root->detachFromOperationContext(); +} + bool ClusterClientCursorImpl::isTailable() const { return _params.isTailable; } @@ -136,7 +145,14 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( const auto limit = params->limit; const bool hasSort = !params->sort.isEmpty(); - // The first stage is always the one which merges from the remotes. + // The first stage always merges from the remotes. If 'mergePipeline' has been specified in + // ClusterClientCursorParams, then RouterStageAggregationMerge should be the root and only node. + // Otherwise, construct a RouterStage pipeline from the remotes, skip, limit, and sort fields in + // 'params'. + if (params->mergePipeline) { + return stdx::make_unique<RouterStageAggregationMerge>(std::move(params->mergePipeline)); + } + std::unique_ptr<RouterExecStage> root = stdx::make_unique<RouterStageMerge>(executor, params); if (skip) { diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index a13c18bfcb3..07dbab2094d 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -89,10 +89,14 @@ public: executor::TaskExecutor* executor, ClusterClientCursorParams&& params); - StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next() final; void kill(OperationContext* opCtx) final; + void reattachToOperationContext(OperationContext* opCtx) final; + + void detachFromOperationContext() final; + bool isTailable() const final; UserNameIterator getAuthenticatedUsers() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp index 549a692d720..8598fe192dc 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp @@ -40,9 +40,10 @@ namespace mongo { namespace { -// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*, -// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use -// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests. +// Note: Though the next() method on RouterExecStage and its subclasses depend on an +// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are +// mocked in this test using RouterStageMock. RouterStageMock does not actually use the +// OperationContext, so we omit the call to rettachToOperationContext in these tests. TEST(ClusterClientCursorImpl, NumReturnedSoFar) { auto mockStage = stdx::make_unique<RouterStageMock>(); @@ -57,13 +58,13 @@ TEST(ClusterClientCursorImpl, NumReturnedSoFar) { ASSERT_EQ(cursor.getNumReturnedSoFar(), 0); for (int i = 1; i < 10; ++i) { - auto result = cursor.next(nullptr); + auto result = cursor.next(); ASSERT(result.isOK()); ASSERT_BSONOBJ_EQ(*result.getValue().getResult(), BSON("a" << i)); ASSERT_EQ(cursor.getNumReturnedSoFar(), i); } // Now check that if nothing is fetched the getNumReturnedSoFar stays the same. - auto result = cursor.next(nullptr); + auto result = cursor.next(); ASSERT_OK(result.getStatus()); ASSERT_TRUE(result.getValue().isEOF()); ASSERT_EQ(cursor.getNumReturnedSoFar(), 9LL); @@ -78,7 +79,7 @@ TEST(ClusterClientCursorImpl, QueueResult) { ClusterClientCursorParams(NamespaceString("unused"), {}), boost::none); - auto firstResult = cursor.next(nullptr); + auto firstResult = cursor.next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); @@ -86,22 +87,22 @@ TEST(ClusterClientCursorImpl, QueueResult) { cursor.queueResult(BSON("a" << 2)); cursor.queueResult(BSON("a" << 3)); - auto secondResult = cursor.next(nullptr); + auto secondResult = cursor.next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); - auto thirdResult = cursor.next(nullptr); + auto thirdResult = cursor.next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3)); - auto fourthResult = cursor.next(nullptr); + auto fourthResult = cursor.next(); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSON("a" << 4)); - auto fifthResult = cursor.next(nullptr); + auto fifthResult = cursor.next(); ASSERT_OK(fifthResult.getStatus()); ASSERT(fifthResult.getValue().isEOF()); @@ -119,19 +120,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) { boost::none); ASSERT_TRUE(cursor.remotesExhausted()); - auto firstResult = cursor.next(nullptr); + auto firstResult = cursor.next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); ASSERT_TRUE(cursor.remotesExhausted()); - auto secondResult = cursor.next(nullptr); + auto secondResult = cursor.next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(cursor.remotesExhausted()); - auto thirdResult = cursor.next(nullptr); + auto thirdResult = cursor.next(); ASSERT_OK(thirdResult.getStatus()); ASSERT_TRUE(thirdResult.getValue().isEOF()); ASSERT_TRUE(cursor.remotesExhausted()); diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index b6c7d9d0a81..8cc1771ec1c 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -44,7 +44,7 @@ ClusterClientCursorMock::~ClusterClientCursorMock() { invariant(_exhausted || _killed); } -StatusWith<ClusterQueryResult> ClusterClientCursorMock::next(OperationContext* opCtx) { +StatusWith<ClusterQueryResult> ClusterClientCursorMock::next() { invariant(!_killed); if (_resultsQueue.empty()) { diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 81c97af72f1..004a635eaed 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -47,10 +47,14 @@ public: ~ClusterClientCursorMock(); - StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next() final; void kill(OperationContext* opCtx) final; + void reattachToOperationContext(OperationContext* opCtx) final {} + + void detachFromOperationContext() final {} + bool isTailable() const final; UserNameIterator getAuthenticatedUsers() const final; diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 7507ad82ae8..1b4d76124c3 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -37,6 +37,7 @@ #include "mongo/db/auth/user_name.h" #include "mongo/db/cursor_id.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/cursor_response.h" #include "mongo/s/client/shard.h" #include "mongo/util/net/hostandport.h" @@ -110,6 +111,9 @@ struct ClusterClientCursorParams { // Should be forwarded to the remote hosts in 'cmdObj'. boost::optional<long long> limit; + // If set, we use this pipeline to merge the output of aggregations on each remote. + std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline; + // Whether this cursor is tailing a capped collection. bool isTailable = false; diff --git a/src/mongo/s/query/cluster_cursor_cleanup_job.cpp b/src/mongo/s/query/cluster_cursor_cleanup_job.cpp index 57685653cf2..dcf19acf775 100644 --- a/src/mongo/s/query/cluster_cursor_cleanup_job.cpp +++ b/src/mongo/s/query/cluster_cursor_cleanup_job.cpp @@ -73,11 +73,12 @@ void ClusterCursorCleanupJob::run() { // Mirroring the behavior in CursorManager::timeoutCursors(), a negative value for // cursorTimeoutMillis has the same effect as a 0 value: cursors are cleaned immediately. auto cursorTimeoutValue = cursorTimeoutMillis.load(); + const auto opCtx = client->makeOperationContext(); Date_t cutoff = (cursorTimeoutValue > 0) ? (Date_t::now() - Milliseconds(cursorTimeoutValue)) : Date_t::now(); manager->killMortalCursorsInactiveSince(cutoff); - manager->incrementCursorsTimedOut(manager->reapZombieCursors()); + manager->incrementCursorsTimedOut(manager->reapZombieCursors(opCtx.get())); MONGO_IDLE_THREAD_BLOCK; sleepsecs(clientCursorMonitorFrequencySecs.load()); diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 5e6a5dbebf4..99e7b2afa82 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -110,9 +110,19 @@ ClusterCursorManager::PinnedCursor& ClusterCursorManager::PinnedCursor::operator return *this; } -StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next(OperationContext* opCtx) { +StatusWith<ClusterQueryResult> ClusterCursorManager::PinnedCursor::next() { invariant(_cursor); - return _cursor->next(opCtx); + return _cursor->next(); +} + +void ClusterCursorManager::PinnedCursor::reattachToOperationContext(OperationContext* opCtx) { + invariant(_cursor); + _cursor->reattachToOperationContext(opCtx); +} + +void ClusterCursorManager::PinnedCursor::detachFromOperationContext() { + invariant(_cursor); + _cursor->detachFromOperationContext(); } bool ClusterCursorManager::PinnedCursor::isTailable() const { @@ -182,13 +192,13 @@ ClusterCursorManager::~ClusterCursorManager() { invariant(_namespaceToContainerMap.empty()); } -void ClusterCursorManager::shutdown() { +void ClusterCursorManager::shutdown(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lk(_mutex); _inShutdown = true; lk.unlock(); killAllCursors(); - reapZombieCursors(); + reapZombieCursors(opCtx); } StatusWith<CursorId> ClusterCursorManager::registerCursor( @@ -356,7 +366,7 @@ void ClusterCursorManager::killAllCursors() { } } -std::size_t ClusterCursorManager::reapZombieCursors() { +std::size_t ClusterCursorManager::reapZombieCursors(OperationContext* opCtx) { struct CursorDescriptor { CursorDescriptor(NamespaceString ns, CursorId cursorId, bool isInactive) : ns(std::move(ns)), cursorId(cursorId), isInactive(isInactive) {} @@ -395,11 +405,9 @@ std::size_t ClusterCursorManager::reapZombieCursors() { } lk.unlock(); - // Pass a null OperationContext, because this call should not actually schedule any remote - // work: the cursor is already pending kill, meaning the killCursors commands are already - // being scheduled to be sent to the remote shard hosts. This method will just wait for them - // all to be scheduled. - zombieCursor.getValue()->kill(nullptr); + // Pass opCtx to kill(), since a cursor which wraps an underlying aggregation pipeline is + // obliged to call Pipeline::dispose with a valid OperationContext prior to deletion. + zombieCursor.getValue()->kill(opCtx); zombieCursor.getValue().reset(); lk.lock(); @@ -430,11 +438,11 @@ ClusterCursorManager::Stats ClusterCursorManager::stats() const { } switch (entry.getCursorType()) { - case CursorType::NamespaceNotSharded: - ++stats.cursorsNotSharded; + case CursorType::SingleTarget: + ++stats.cursorsSingleTarget; break; - case CursorType::NamespaceSharded: - ++stats.cursorsSharded; + case CursorType::MultiTarget: + ++stats.cursorsMultiTarget; break; } } diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index 86bfcb31bbe..f6da640bb1e 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -76,11 +76,11 @@ public: // enum class CursorType { - // Represents a cursor operating on an unsharded namespace. - NamespaceNotSharded, + // Represents a cursor retrieving data from a single remote source. + SingleTarget, - // Represents a cursor operating on a sharded namespace. - NamespaceSharded, + // Represents a cursor retrieving data from multiple remote sources. + MultiTarget, }; enum class CursorLifetime { @@ -100,11 +100,11 @@ public: }; struct Stats { - // Count of open cursors registered with CursorType::NamespaceSharded. - size_t cursorsSharded = 0; + // Count of open cursors registered with CursorType::MultiTarget. + size_t cursorsMultiTarget = 0; - // Count of open cursors registered with CursorType::NamespaceNotSharded. - size_t cursorsNotSharded = 0; + // Count of open cursors registered with CursorType::SingleTarget. + size_t cursorsSingleTarget = 0; // Count of pinned cursors. size_t cursorsPinned = 0; @@ -154,7 +154,18 @@ public: * * Can block. */ - StatusWith<ClusterQueryResult> next(OperationContext* opCtx); + StatusWith<ClusterQueryResult> next(); + + /** + * Sets the operation context for the cursor. Must be called before the first call to + * next(). + */ + void reattachToOperationContext(OperationContext* opCtx); + + /** + * Detaches the cursor from its current OperationContext. + */ + void detachFromOperationContext(); /** * Returns whether or not the underlying cursor is tailing a capped collection. Cannot be @@ -252,7 +263,7 @@ public: * Kills and reaps all cursors currently owned by this cursor manager, and puts the manager * into the shutting down state where it will not accept any new cursors for registration. */ - void shutdown(); + void shutdown(OperationContext* opCtx); /** * Registers the given cursor with this manager, and returns the registered cursor's id, or @@ -322,13 +333,13 @@ public: * as 'kill pending'. Returns the number of cursors that were marked as inactive. * * If no other non-const methods are called simultaneously, it is guaranteed that this method - * will delete all non-pinned cursors marked as 'kill pending'. Otherwise, no such guarantee is + * will delete all non-pinned cursors marked as 'kill pending'. Otherwise, no such guarantee is * made (this is due to the fact that the blocking kill for each cursor is performed outside of * the cursor manager lock). * * Can block. */ - std::size_t reapZombieCursors(); + std::size_t reapZombieCursors(OperationContext* opCtx); /** * Returns the number of open cursors on a ClusterCursorManager, broken down by type. @@ -496,7 +507,7 @@ private: std::unique_ptr<ClusterClientCursor> _cursor; bool _killPending = false; bool _isInactive = false; - CursorType _cursorType = CursorType::NamespaceNotSharded; + CursorType _cursorType = CursorType::SingleTarget; CursorLifetime _cursorLifetime = CursorLifetime::Mortal; Date_t _lastActive; boost::optional<LogicalSessionId> _lsid; diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp index 46377d2f483..ab46f50e537 100644 --- a/src/mongo/s/query/cluster_cursor_manager_test.cpp +++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp @@ -91,7 +91,7 @@ protected: private: void tearDown() final { _manager.killAllCursors(); - _manager.reapZombieCursors(); + _manager.reapZombieCursors(nullptr); } // List of flags representing whether our allocated cursors have been killed yet. The value of @@ -110,31 +110,31 @@ private: TEST_F(ClusterCursorManagerTest, RegisterCursor) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << 1)); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - std::move(cursor), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + std::move(cursor), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - auto nextResult = pinnedCursor.getValue().next(nullptr); + auto nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult()); - nextResult = pinnedCursor.getValue().next(nullptr); + nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } // Test that registering a cursor returns a non-zero cursor id. TEST_F(ClusterCursorManagerTest, RegisterCursorReturnsNonZeroId) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_NE(0, cursorId); } @@ -142,20 +142,20 @@ TEST_F(ClusterCursorManagerTest, RegisterCursorReturnsNonZeroId) { TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << 1)); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - std::move(cursor), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + std::move(cursor), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId()); - auto nextResult = checkedOutCursor.getValue().next(nullptr); + auto nextResult = checkedOutCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << 1), *nextResult.getValue().getResult()); - nextResult = checkedOutCursor.getValue().next(nullptr); + nextResult = checkedOutCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } @@ -168,21 +168,21 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { for (int i = 0; i < numCursors; ++i) { auto cursor = allocateMockCursor(); cursor->queueResult(BSON("a" << i)); - cursorIds[i] = assertGet( - getManager()->registerCursor(nullptr, - std::move(cursor), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + cursorIds[i] = + assertGet(getManager()->registerCursor(nullptr, + std::move(cursor), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); } for (int i = 0; i < numCursors; ++i) { auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], nullptr); ASSERT_OK(pinnedCursor.getStatus()); - auto nextResult = pinnedCursor.getValue().next(nullptr); + auto nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); ASSERT(nextResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(BSON("a" << i), *nextResult.getValue().getResult()); - nextResult = pinnedCursor.getValue().next(nullptr); + nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); ASSERT_TRUE(nextResult.getValue().isEOF()); } @@ -190,12 +190,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { // Test that checking out a pinned cursor returns an error with code ErrorCodes::CursorInUse. TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_EQ(ErrorCodes::CursorInUse, @@ -204,12 +204,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) { // Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound. TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_OK(getManager()->killCursor(nss, cursorId)); ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); @@ -226,12 +226,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUnknown) { TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) { const NamespaceString correctNamespace("test.correct"); const NamespaceString incorrectNamespace("test.incorrect"); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - correctNamespace, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + correctNamespace, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(incorrectNamespace, cursorId, nullptr).getStatus()); } @@ -239,12 +239,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) { // Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound, // even if there is an existing cursor with the same namespace but a different cursor id. TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, cursorId + 1, nullptr).getStatus()); } @@ -252,12 +252,12 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { // Test that checking out a cursor updates the 'last active' time associated with the cursor to the // current time. TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); Date_t cursorRegistrationTime = getClockSource()->now(); getClockSource()->advance(Milliseconds(1)); auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); @@ -265,19 +265,19 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) { checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); getManager()->killMortalCursorsInactiveSince(cursorRegistrationTime); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); } // Test that checking in a cursor updates the 'last active' time associated with the cursor to the // current time. TEST_F(ClusterCursorManagerTest, ReturnCursorUpdateActiveTime) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); Date_t cursorCheckOutTime = getClockSource()->now(); auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); @@ -285,23 +285,23 @@ TEST_F(ClusterCursorManagerTest, ReturnCursorUpdateActiveTime) { checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); getManager()->killMortalCursorsInactiveSince(cursorCheckOutTime); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); } // Test that killing a pinned cursor by id successfully kills the cursor. TEST_F(ClusterCursorManagerTest, KillCursorBasic) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(getManager()->killCursor(nss, pinnedCursor.getValue().getCursorId())); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(0)); } @@ -312,18 +312,18 @@ TEST_F(ClusterCursorManagerTest, KillCursorMultipleCursors) { std::vector<CursorId> cursorIds(numCursors); // Register cursors and populate 'cursorIds' with the returned cursor ids. for (size_t i = 0; i < numCursors; ++i) { - cursorIds[i] = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + cursorIds[i] = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); } // Kill each cursor and verify that it was successfully killed. for (size_t i = 0; i < numCursors; ++i) { ASSERT_OK(getManager()->killCursor(nss, cursorIds[i])); ASSERT(!isMockCursorKilled(i)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(i)); } } @@ -339,12 +339,12 @@ TEST_F(ClusterCursorManagerTest, KillCursorUnknown) { TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) { const NamespaceString correctNamespace("test.correct"); const NamespaceString incorrectNamespace("test.incorrect"); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - correctNamespace, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + correctNamespace, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); Status killResult = getManager()->killCursor(incorrectNamespace, cursorId); ASSERT_EQ(ErrorCodes::CursorNotFound, killResult); } @@ -352,12 +352,12 @@ TEST_F(ClusterCursorManagerTest, KillCursorWrongNamespace) { // Test that killing an unknown cursor returns an error with code ErrorCodes::CursorNotFound, // even if there is an existing cursor with the same namespace but a different cursor id. TEST_F(ClusterCursorManagerTest, KillCursorWrongCursorId) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); Status killResult = getManager()->killCursor(nss, cursorId + 1); ASSERT_EQ(ErrorCodes::CursorNotFound, killResult); } @@ -367,11 +367,11 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceBasic) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal)); getManager()->killMortalCursorsInactiveSince(getClockSource()->now()); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(0)); } @@ -382,11 +382,11 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipUnexpired) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal)); getManager()->killMortalCursorsInactiveSince(timeBeforeCursorCreation); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); } @@ -395,32 +395,32 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceSkipImmortal) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Immortal)); getManager()->killMortalCursorsInactiveSince(getClockSource()->now()); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); } // Test that killing all mortal expired cursors does not kill a mortal expired cursor that is // pinned. TEST_F(ClusterCursorManagerTest, ShouldNotKillPinnedCursors) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pin = assertGet(getManager()->checkOutCursor(nss, cursorId, nullptr)); getManager()->killMortalCursorsInactiveSince(getClockSource()->now()); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); pin.returnCursor(ClusterCursorManager::CursorState::NotExhausted); getManager()->killMortalCursorsInactiveSince(getClockSource()->now()); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(0)); } @@ -434,19 +434,18 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors) if (i < numKilledCursorsExpected) { cutoff = getClockSource()->now(); } - ASSERT_OK( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + ASSERT_OK(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); getClockSource()->advance(Milliseconds(1)); } getManager()->killMortalCursorsInactiveSince(cutoff); for (size_t i = 0; i < numCursors; ++i) { ASSERT(!isMockCursorKilled(i)); } - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); for (size_t i = 0; i < numCursors; ++i) { if (i < numKilledCursorsExpected) { ASSERT(isMockCursorKilled(i)); @@ -460,18 +459,17 @@ TEST_F(ClusterCursorManagerTest, KillMortalCursorsInactiveSinceMultipleCursors) TEST_F(ClusterCursorManagerTest, KillAllCursors) { const size_t numCursors = 10; for (size_t i = 0; i < numCursors; ++i) { - ASSERT_OK( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + ASSERT_OK(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); } getManager()->killAllCursors(); for (size_t i = 0; i < numCursors; ++i) { ASSERT(!isMockCursorKilled(i)); } - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); for (size_t i = 0; i < numCursors; ++i) { ASSERT(isMockCursorKilled(i)); } @@ -480,30 +478,30 @@ TEST_F(ClusterCursorManagerTest, KillAllCursors) { // Test that reaping correctly calls kill() on the underlying ClusterClientCursor for a killed // cursor. TEST_F(ClusterCursorManagerTest, ReapZombieCursorsBasic) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_OK(getManager()->killCursor(nss, cursorId)); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(0)); } // Test that reaping does not call kill() on the underlying ClusterClientCursor for a killed cursor // that is still pinned. TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); } @@ -513,17 +511,17 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipNonZombies) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal)); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); } // Test that a new ClusterCursorManager's stats() is initially zero for the cursor counts. TEST_F(ClusterCursorManagerTest, StatsInitAsZero) { - ASSERT_EQ(0U, getManager()->stats().cursorsSharded); - ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded); + ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget); + ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); } @@ -532,9 +530,9 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterShardedCursor) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceSharded, + ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal)); - ASSERT_EQ(1U, getManager()->stats().cursorsSharded); + ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget); } // Test that registering a not-sharded cursor updates the corresponding counter in stats(). @@ -542,9 +540,9 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterNotShardedCursor) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal)); - ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded); + ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget); } // Test that checking out a cursor updates the pinned counter in stats(). @@ -553,7 +551,7 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) { assertGet(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceSharded, + ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); @@ -567,21 +565,20 @@ TEST_F(ClusterCursorManagerTest, StatsRegisterMultipleCursors) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceSharded, + ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal)); - ASSERT_EQ(i + 1, getManager()->stats().cursorsSharded); - ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded); + ASSERT_EQ(i + 1, getManager()->stats().cursorsMultiTarget); + ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget); } const size_t numNotShardedCursors = 10; for (size_t i = 0; i < numNotShardedCursors; ++i) { - ASSERT_OK( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); - ASSERT_EQ(numShardedCursors, getManager()->stats().cursorsSharded); - ASSERT_EQ(i + 1, getManager()->stats().cursorsNotSharded); + ASSERT_OK(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); + ASSERT_EQ(numShardedCursors, getManager()->stats().cursorsMultiTarget); + ASSERT_EQ(i + 1, getManager()->stats().cursorsSingleTarget); } } @@ -591,24 +588,24 @@ TEST_F(ClusterCursorManagerTest, StatsKillShardedCursor) { assertGet(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceSharded, + ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal)); - ASSERT_EQ(1U, getManager()->stats().cursorsSharded); + ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget); ASSERT_OK(getManager()->killCursor(nss, cursorId)); - ASSERT_EQ(0U, getManager()->stats().cursorsSharded); + ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget); } // Test that killing a not-sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsKillNotShardedCursor) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); - ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); + ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget); ASSERT_OK(getManager()->killCursor(nss, cursorId)); - ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded); + ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget); } // Test that killing a pinned cursor decrements the corresponding counter in stats(). @@ -617,7 +614,7 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) { assertGet(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceSharded, + ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); @@ -631,44 +628,44 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) { assertGet(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceSharded, + ClusterCursorManager::CursorType::MultiTarget, ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus()); - ASSERT_EQ(1U, getManager()->stats().cursorsSharded); + ASSERT_OK(pinnedCursor.getValue().next().getStatus()); + ASSERT_EQ(1U, getManager()->stats().cursorsMultiTarget); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); - ASSERT_EQ(0U, getManager()->stats().cursorsSharded); + ASSERT_EQ(0U, getManager()->stats().cursorsMultiTarget); } // Test that exhausting a not-sharded cursor decrements the corresponding counter in stats(). TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus()); - ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded); + ASSERT_OK(pinnedCursor.getValue().next().getStatus()); + ASSERT_EQ(1U, getManager()->stats().cursorsSingleTarget); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); - ASSERT_EQ(0U, getManager()->stats().cursorsNotSharded); + ASSERT_EQ(0U, getManager()->stats().cursorsSingleTarget); } // Test that checking a pinned cursor in as exhausted decrements the corresponding counter in // stats(). TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus()); + ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); @@ -677,15 +674,15 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { // Test that checking a pinned cursor in as *not* exhausted decrements the corresponding counter in // stats(). TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_OK(pinnedCursor.getValue().next(nullptr).getStatus()); + ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); @@ -693,12 +690,12 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { // Test that getting the namespace for a cursor returns the correct namespace. TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdBasic) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); boost::optional<NamespaceString> cursorNamespace = getManager()->getNamespaceForCursorId(cursorId); ASSERT(cursorNamespace); @@ -711,12 +708,12 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsSameNames const size_t numCursors = 10; std::vector<CursorId> cursorIds(numCursors); for (size_t i = 0; i < numCursors; ++i) { - cursorIds[i] = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + cursorIds[i] = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); } for (size_t i = 0; i < numCursors; ++i) { boost::optional<NamespaceString> cursorNamespace = @@ -733,12 +730,12 @@ TEST_F(ClusterCursorManagerTest, GetNamespaceForCursorIdMultipleCursorsDifferent std::vector<std::pair<NamespaceString, CursorId>> cursors(numCursors); for (size_t i = 0; i < numCursors; ++i) { NamespaceString cursorNamespace(std::string(str::stream() << "test.collection" << i)); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - cursorNamespace, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + cursorNamespace, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); cursors[i] = {cursorNamespace, cursorId}; } for (size_t i = 0; i < numCursors; ++i) { @@ -764,12 +761,12 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDefaultConstructor) { // Test that returning a pinned cursor correctly unpins the cursor, and leaves the pin owning no // cursor. TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); @@ -783,17 +780,17 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) { // Test that returning a pinned cursor with 'Exhausted' correctly de-registers and destroys the // cursor, and leaves the pin owning no cursor. TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); - ASSERT_OK(registeredCursor.getValue().next(nullptr).getStatus()); + ASSERT_OK(registeredCursor.getValue().next().getStatus()); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); @@ -802,7 +799,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { // reapZombieCursors() and check that the cursor still has not been killed. ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); } @@ -815,56 +812,56 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust // The mock should indicate that is has open remote cursors. mockCursor->markRemotesNotExhausted(); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - std::move(mockCursor), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + std::move(mockCursor), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); - ASSERT_OK(registeredCursor.getValue().next(nullptr).getStatus()); + ASSERT_OK(registeredCursor.getValue().next().getStatus()); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::Exhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); // Cursor should be kill pending, so it will be killed during reaping. ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(0)); } // Test that the PinnedCursor move assignment operator correctly kills the cursor if it has not yet // been returned. TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); pinnedCursor = ClusterCursorManager::PinnedCursor(); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(0)); } // Test that the PinnedCursor destructor correctly kills the cursor if it has not yet been returned. TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) { { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); } ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(0)); } @@ -874,12 +871,12 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { mockCursor->markRemotesNotExhausted(); ASSERT_FALSE(mockCursor->remotesExhausted()); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - std::move(mockCursor), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + std::move(mockCursor), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_FALSE(pinnedCursor.getValue().remotesExhausted()); @@ -887,25 +884,25 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { // Test that killed cursors which are still pinned are not reaped. TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(getManager()->killCursor(nss, cursorId)); ASSERT(!isMockCursorKilled(0)); // Pinned cursor should remain alive after reaping. - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(!isMockCursorKilled(0)); // The cursor can be reaped once it is returned to the manager. pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT(!isMockCursorKilled(0)); - getManager()->reapZombieCursors(); + getManager()->reapZombieCursors(nullptr); ASSERT(isMockCursorKilled(0)); } @@ -913,33 +910,32 @@ TEST_F(ClusterCursorManagerTest, CannotRegisterCursorDuringShutdown) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal)); ASSERT(!isMockCursorKilled(0)); - getManager()->shutdown(); + getManager()->shutdown(nullptr); ASSERT(isMockCursorKilled(0)); - ASSERT_EQUALS( - ErrorCodes::ShutdownInProgress, - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, + getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); } TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) { - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); ASSERT(!isMockCursorKilled(0)); - getManager()->shutdown(); + getManager()->shutdown(nullptr); ASSERT(isMockCursorKilled(0)); @@ -955,7 +951,7 @@ TEST_F(ClusterCursorManagerTest, CursorsWithoutSessions) { ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal)); // Manager should have no active sessions. @@ -970,12 +966,12 @@ TEST_F(ClusterCursorManagerTest, CursorsWithoutSessions) { TEST_F(ClusterCursorManagerTest, OneCursorWithASession) { // Add a cursor with a session to the cursor manager. auto lsid = makeLogicalSessionIdForTest(); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(lsid), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(lsid), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); // Retrieve all sessions active in manager - set should contain just lsid. LogicalSessionIdSet lsids; @@ -1004,12 +1000,12 @@ TEST_F(ClusterCursorManagerTest, OneCursorWithASession) { TEST_F(ClusterCursorManagerTest, GetSessionIdsWhileCheckedOut) { // Add a cursor with a session to the cursor manager. auto lsid = makeLogicalSessionIdForTest(); - auto cursorId = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(lsid), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(lsid), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); // Check the cursor out, then try to append cursors, see that we get one. auto res = getManager()->checkOutCursor(nss, cursorId, nullptr); @@ -1025,18 +1021,18 @@ TEST_F(ClusterCursorManagerTest, GetSessionIdsWhileCheckedOut) { TEST_F(ClusterCursorManagerTest, MultipleCursorsWithSameSession) { // Add two cursors on the same session to the cursor manager. auto lsid = makeLogicalSessionIdForTest(); - auto cursorId1 = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(lsid), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); - auto cursorId2 = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(lsid), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId1 = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(lsid), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); + auto cursorId2 = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(lsid), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); // Retrieve all sessions - set should contain just lsid. stdx::unordered_set<LogicalSessionId, LogicalSessionIdHash> lsids; @@ -1073,24 +1069,24 @@ TEST_F(ClusterCursorManagerTest, MultipleCursorsMultipleSessions) { auto lsid2 = makeLogicalSessionIdForTest(); // Register two cursors with different lsids, and one without. - CursorId cursor1 = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(lsid1), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); - - CursorId cursor2 = assertGet( - getManager()->registerCursor(nullptr, - allocateMockCursor(lsid2), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + CursorId cursor1 = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(lsid1), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); + + CursorId cursor2 = + assertGet(getManager()->registerCursor(nullptr, + allocateMockCursor(lsid2), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_OK(getManager()->registerCursor(nullptr, allocateMockCursor(), nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal)); // Retrieve all sessions - should be both lsids. @@ -1117,12 +1113,11 @@ TEST_F(ClusterCursorManagerTest, ManyCursorsManySessions) { const int count = 10000; for (int i = 0; i < count; i++) { auto lsid = makeLogicalSessionIdForTest(); - ASSERT_OK( - getManager()->registerCursor(nullptr, - allocateMockCursor(lsid), - nss, - ClusterCursorManager::CursorType::NamespaceNotSharded, - ClusterCursorManager::CursorLifetime::Mortal)); + ASSERT_OK(getManager()->registerCursor(nullptr, + allocateMockCursor(lsid), + nss, + ClusterCursorManager::CursorType::SingleTarget, + ClusterCursorManager::CursorLifetime::Mortal)); } // Retrieve all sessions. diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 0f7630d5dc8..b7a5ecc6b34 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -257,6 +257,12 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, return swCursors.getStatus(); } + // Determine whether the cursor we may eventually register will be single- or multi-target. + + const auto cursorType = swCursors.getValue().size() > 1 + ? ClusterCursorManager::CursorType::MultiTarget + : ClusterCursorManager::CursorType::SingleTarget; + // Transfer the established cursors to a ClusterClientCursor. params.remotes = std::move(swCursors.getValue()); @@ -267,8 +273,11 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, auto cursorState = ClusterCursorManager::CursorState::NotExhausted; int bytesBuffered = 0; + + ccc->reattachToOperationContext(opCtx); + while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) { - auto next = ccc->next(opCtx); + auto next = ccc->next(); if (!next.isOK()) { return next.getStatus(); @@ -300,6 +309,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, results->push_back(std::move(nextObj)); } + ccc->detachFromOperationContext(); + if (!query.getQueryRequest().wantMore() && !ccc->isTailable()) { cursorState = ClusterCursorManager::CursorState::Exhausted; } @@ -313,8 +324,6 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, // Register the cursor with the cursor manager for subsequent getMore's. auto cursorManager = Grid::get(opCtx)->getCursorManager(); - const auto cursorType = chunkManager ? ClusterCursorManager::CursorType::NamespaceSharded - : ClusterCursorManager::CursorType::NamespaceNotSharded; const auto cursorLifetime = query.getQueryRequest().isNoCursorTimeout() ? ClusterCursorManager::CursorLifetime::Immortal : ClusterCursorManager::CursorLifetime::Mortal; @@ -427,8 +436,11 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, long long batchSize = request.batchSize.value_or(0); long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar(); auto cursorState = ClusterCursorManager::CursorState::NotExhausted; + + pinnedCursor.getValue().reattachToOperationContext(opCtx); + while (!FindCommon::enoughForGetMore(batchSize, batch.size())) { - auto next = pinnedCursor.getValue().next(opCtx); + auto next = pinnedCursor.getValue().next(); if (!next.isOK()) { return next.getStatus(); } @@ -454,6 +466,8 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, batch.push_back(std::move(*next.getValue().getResult())); } + pinnedCursor.getValue().detachFromOperationContext(); + // Transfer ownership of the cursor back to the cursor manager. pinnedCursor.getValue().returnCursor(cursorState); diff --git a/src/mongo/s/query/cluster_query_knobs.cpp b/src/mongo/s/query/cluster_query_knobs.cpp index 76e82b4f914..79ef4737760 100644 --- a/src/mongo/s/query/cluster_query_knobs.cpp +++ b/src/mongo/s/query/cluster_query_knobs.cpp @@ -35,5 +35,6 @@ namespace mongo { MONGO_EXPORT_SERVER_PARAMETER(internalQueryAlwaysMergeOnPrimaryShard, bool, false); +MONGO_EXPORT_SERVER_PARAMETER(internalQueryProhibitMergingOnMongoS, bool, false); } // namespace mongo diff --git a/src/mongo/s/query/cluster_query_knobs.h b/src/mongo/s/query/cluster_query_knobs.h index 6eaf31dd102..d75670822a7 100644 --- a/src/mongo/s/query/cluster_query_knobs.h +++ b/src/mongo/s/query/cluster_query_knobs.h @@ -34,7 +34,15 @@ namespace mongo { // If set to true on mongos, all aggregations delivered to the mongos which require a merging shard // will select the primary shard as the merger. False by default, which means that the merging shard -// will be selected randomly amongst the shards participating in the query. +// will be selected randomly amongst the shards participating in the query. Pipelines capable of +// merging on mongoS are unaffected by this setting, unless internalQueryProhibitMergingOnMongoS is +// true. extern AtomicBool internalQueryAlwaysMergeOnPrimaryShard; +// If set to true on mongos, all aggregations which could otherwise merge on the mongos will be +// obliged to merge on a shard instead. Pipelines which are redirected to the shards will obey the +// value of internalQueryAlwaysMergeOnPrimaryShard. False by default, meaning that pipelines capable +// of merging on mongoS will always do so. +extern AtomicBool internalQueryProhibitMergingOnMongoS; + } // namespace mongo diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index f6128a53e43..1f2fd2a9e7f 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -66,7 +66,7 @@ public: * holding on to a subset of the returned results and need to minimize memory usage, call copy() * on the BSONObjs. */ - virtual StatusWith<ClusterQueryResult> next(OperationContext* opCtx) = 0; + virtual StatusWith<ClusterQueryResult> next() = 0; /** * Must be called before destruction to abandon a not-yet-exhausted plan. May block waiting for @@ -88,16 +88,64 @@ public: */ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; + /** + * Sets the current operation context to be used by the router stage. + */ + void reattachToOperationContext(OperationContext* opCtx) { + invariant(!_opCtx); + _opCtx = opCtx; + + if (_child) { + _child->reattachToOperationContext(opCtx); + } + + doReattachToOperationContext(); + } + + /** + * Discards the stage's current OperationContext, setting it to 'nullptr'. + */ + void detachFromOperationContext() { + invariant(_opCtx); + _opCtx = nullptr; + + if (_child) { + _child->detachFromOperationContext(); + } + + doDetachFromOperationContext(); + } + protected: /** + * Performs any stage-specific reattach actions. Called after the OperationContext has been set + * and is available via getOpCtx(). + */ + virtual void doReattachToOperationContext() {} + + /** + * Performs any stage-specific detach actions. Called after the OperationContext has been set to + * nullptr. + */ + virtual void doDetachFromOperationContext() {} + + /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. */ RouterExecStage* getChildStage() { return _child.get(); } + /** + * Returns a pointer to the current OperationContext, or nullptr if there is no context. + */ + OperationContext* getOpCtx() { + return _opCtx; + } + private: std::unique_ptr<RouterExecStage> _child; + OperationContext* _opCtx = nullptr; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_aggregation_merge.cpp b/src/mongo/s/query/router_stage_aggregation_merge.cpp new file mode 100644 index 00000000000..6fdfdc4fea2 --- /dev/null +++ b/src/mongo/s/query/router_stage_aggregation_merge.cpp @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/router_stage_aggregation_merge.h" + +#include "mongo/db/pipeline/document_source_merge_cursors.h" +#include "mongo/db/pipeline/expression_context.h" + +namespace mongo { + +RouterStageAggregationMerge::RouterStageAggregationMerge( + std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline) + : _mergePipeline(std::move(mergePipeline)) {} + +StatusWith<ClusterQueryResult> RouterStageAggregationMerge::next() { + // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF. + if (auto result = _mergePipeline->getNext()) { + return {result->toBson()}; + } + + // If we reach this point, we have hit EOF. + _mergePipeline.get_deleter().dismissDisposal(); + _mergePipeline->dispose(getOpCtx()); + + return {ClusterQueryResult()}; +} + +void RouterStageAggregationMerge::doReattachToOperationContext() { + _mergePipeline->reattachToOperationContext(getOpCtx()); +} + +void RouterStageAggregationMerge::doDetachFromOperationContext() { + _mergePipeline->detachFromOperationContext(); +} + +void RouterStageAggregationMerge::kill(OperationContext* opCtx) { + _mergePipeline.get_deleter().dismissDisposal(); + _mergePipeline->dispose(opCtx); +} + +bool RouterStageAggregationMerge::remotesExhausted() { + const auto mergeSource = + static_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get()); + return mergeSource->remotesExhausted(); +} + +Status RouterStageAggregationMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"}; +} + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_aggregation_merge.h b/src/mongo/s/query/router_stage_aggregation_merge.h new file mode 100644 index 00000000000..363b46e73d9 --- /dev/null +++ b/src/mongo/s/query/router_stage_aggregation_merge.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/s/query/router_exec_stage.h" + +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { + +/** + * Draws results from a Pipeline with a DocumentSourceMergeCursors at its head, which is the + * underlying source of the stream of merged documents manipulated by the RouterStage pipeline. + */ +class RouterStageAggregationMerge final : public RouterExecStage { +public: + RouterStageAggregationMerge(std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline); + + StatusWith<ClusterQueryResult> next() final; + + void kill(OperationContext* opCtx) final; + + bool remotesExhausted() final; + + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + +protected: + void doReattachToOperationContext() final; + + void doDetachFromOperationContext() final; + +private: + std::unique_ptr<Pipeline, Pipeline::Deleter> _mergePipeline; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index ea90251eef6..03711279e07 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -39,12 +39,12 @@ RouterStageLimit::RouterStageLimit(std::unique_ptr<RouterExecStage> child, long invariant(limit > 0); } -StatusWith<ClusterQueryResult> RouterStageLimit::next(OperationContext* opCtx) { +StatusWith<ClusterQueryResult> RouterStageLimit::next() { if (_returnedSoFar >= _limit) { return {ClusterQueryResult()}; } - auto childResult = getChildStage()->next(opCtx); + auto childResult = getChildStage()->next(); if (!childResult.isOK()) { return childResult; } diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 42223902cc1..cb2fd708835 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -39,7 +39,7 @@ class RouterStageLimit final : public RouterExecStage { public: RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit); - StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next() final; void kill(OperationContext* opCtx) final; diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp index f866249cdd1..61689e4cd6a 100644 --- a/src/mongo/s/query/router_stage_limit_test.cpp +++ b/src/mongo/s/query/router_stage_limit_test.cpp @@ -40,9 +40,10 @@ namespace mongo { namespace { -// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*, -// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use -// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests. +// Note: Though the next() method on RouterExecStage and its subclasses depend on an +// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are +// mocked in this test using RouterStageMock. RouterStageMock does not actually use the +// OperationContext, so we omit the call to rettachToOperationContext in these tests. TEST(RouterStageLimitTest, LimitIsOne) { auto mockStage = stdx::make_unique<RouterStageMock>(); @@ -52,17 +53,17 @@ TEST(RouterStageLimitTest, LimitIsOne) { auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 1); - auto firstResult = limitStage->next(nullptr); + auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); - auto secondResult = limitStage->next(nullptr); + auto secondResult = limitStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(!secondResult.getValue().getResult()); // Once end-of-stream is reached, the limit stage should keep returning no results. - auto thirdResult = limitStage->next(nullptr); + auto thirdResult = limitStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(!thirdResult.getValue().getResult()); } @@ -75,17 +76,17 @@ TEST(RouterStageLimitTest, LimitIsTwo) { auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2); - auto firstResult = limitStage->next(nullptr); + auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); - auto secondResult = limitStage->next(nullptr); + auto secondResult = limitStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); - auto thirdResult = limitStage->next(nullptr); + auto thirdResult = limitStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(!thirdResult.getValue().getResult()); } @@ -99,12 +100,12 @@ TEST(RouterStageLimitTest, LimitStagePropagatesError) { auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 3); - auto firstResult = limitStage->next(nullptr); + auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); - auto secondResult = limitStage->next(nullptr); + auto secondResult = limitStage->next(); ASSERT_NOT_OK(secondResult.getStatus()); ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue); ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); @@ -122,21 +123,21 @@ TEST(RouterStageLimitTest, LimitStageToleratesMidStreamEOF) { auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 2); - auto firstResult = limitStage->next(nullptr); + auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); - auto secondResult = limitStage->next(nullptr); + auto secondResult = limitStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().isEOF()); - auto thirdResult = limitStage->next(nullptr); + auto thirdResult = limitStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2)); - auto fourthResult = limitStage->next(nullptr); + auto fourthResult = limitStage->next(); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().isEOF()); } @@ -150,19 +151,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) { auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100); ASSERT_TRUE(limitStage->remotesExhausted()); - auto firstResult = limitStage->next(nullptr); + auto firstResult = limitStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1)); ASSERT_TRUE(limitStage->remotesExhausted()); - auto secondResult = limitStage->next(nullptr); + auto secondResult = limitStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(limitStage->remotesExhausted()); - auto thirdResult = limitStage->next(nullptr); + auto thirdResult = limitStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(limitStage->remotesExhausted()); diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 90a80e7161b..72fe7a06624 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -40,9 +40,9 @@ RouterStageMerge::RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams* params) : _executor(executor), _arm(executor, params) {} -StatusWith<ClusterQueryResult> RouterStageMerge::next(OperationContext* opCtx) { +StatusWith<ClusterQueryResult> RouterStageMerge::next() { while (!_arm.ready()) { - auto nextEventStatus = _arm.nextEvent(opCtx); + auto nextEventStatus = _arm.nextEvent(getOpCtx()); if (!nextEventStatus.isOK()) { return nextEventStatus.getStatus(); } diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 428a405b401..caae43877c6 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -45,7 +45,7 @@ class RouterStageMerge final : public RouterExecStage { public: RouterStageMerge(executor::TaskExecutor* executor, ClusterClientCursorParams* params); - StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next() final; void kill(OperationContext* opCtx) final; diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index e134340713a..edeb1f9945c 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -50,7 +50,7 @@ void RouterStageMock::markRemotesExhausted() { _remotesExhausted = true; } -StatusWith<ClusterQueryResult> RouterStageMock::next(OperationContext* opCtx) { +StatusWith<ClusterQueryResult> RouterStageMock::next() { if (_resultsQueue.empty()) { return {ClusterQueryResult()}; } diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index 7cba32a81f6..18baaeacd74 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -44,7 +44,7 @@ class RouterStageMock final : public RouterExecStage { public: ~RouterStageMock() final {} - StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next() final; void kill(OperationContext* opCtx) final; diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp index 9cb1e4d26c9..fecf5440898 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp @@ -41,8 +41,8 @@ namespace mongo { RouterStageRemoveSortKey::RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child) : RouterExecStage(std::move(child)) {} -StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next(OperationContext* opCtx) { - auto childResult = getChildStage()->next(opCtx); +StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() { + auto childResult = getChildStage()->next(); if (!childResult.isOK() || !childResult.getValue().getResult()) { return childResult; } diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h index e3599a3e9b0..c2329bbc93d 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ b/src/mongo/s/query/router_stage_remove_sortkey.h @@ -41,7 +41,7 @@ class RouterStageRemoveSortKey final : public RouterExecStage { public: RouterStageRemoveSortKey(std::unique_ptr<RouterExecStage> child); - StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next() final; void kill(OperationContext* opCtx) final; diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp index e9f338b9e5f..5db61b7b0a9 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp @@ -40,9 +40,10 @@ namespace mongo { namespace { -// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*, -// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use -// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests. +// Note: Though the next() method on RouterExecStage and its subclasses depend on an +// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are +// mocked in this test using RouterStageMock. RouterStageMock does not actually use the +// OperationContext, so we omit the call to rettachToOperationContext in these tests. TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { auto mockStage = stdx::make_unique<RouterStageMock>(); @@ -54,29 +55,29 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) { auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); - auto firstResult = sortKeyStage->next(nullptr); + auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4 << "b" << 3)); - auto secondResult = sortKeyStage->next(nullptr); + auto secondResult = sortKeyStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("c" << BSON("d" << "foo"))); - auto thirdResult = sortKeyStage->next(nullptr); + auto thirdResult = sortKeyStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 3)); - auto fourthResult = sortKeyStage->next(nullptr); + auto fourthResult = sortKeyStage->next(); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSONObj()); - auto fifthResult = sortKeyStage->next(nullptr); + auto fifthResult = sortKeyStage->next(); ASSERT_OK(fifthResult.getStatus()); ASSERT(fifthResult.getValue().isEOF()); } @@ -88,12 +89,12 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) { auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); - auto firstResult = sortKeyStage->next(nullptr); + auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSONObj()); - auto secondResult = sortKeyStage->next(nullptr); + auto secondResult = sortKeyStage->next(); ASSERT_NOT_OK(secondResult.getStatus()); ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue); ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); @@ -107,21 +108,21 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) { auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); - auto firstResult = sortKeyStage->next(nullptr); + auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1)); - auto secondResult = sortKeyStage->next(nullptr); + auto secondResult = sortKeyStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().isEOF()); - auto thirdResult = sortKeyStage->next(nullptr); + auto thirdResult = sortKeyStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*thirdResult.getValue().getResult(), BSON("a" << 2 << "b" << 2)); - auto fourthResult = sortKeyStage->next(nullptr); + auto fourthResult = sortKeyStage->next(); ASSERT_OK(fourthResult.getStatus()); ASSERT(fourthResult.getValue().isEOF()); } @@ -135,19 +136,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); - auto firstResult = sortKeyStage->next(nullptr); + auto firstResult = sortKeyStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1 << "b" << 1)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); - auto secondResult = sortKeyStage->next(nullptr); + auto secondResult = sortKeyStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 2 << "b" << 2)); ASSERT_TRUE(sortKeyStage->remotesExhausted()); - auto thirdResult = sortKeyStage->next(nullptr); + auto thirdResult = sortKeyStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(sortKeyStage->remotesExhausted()); diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index 6763ca5808b..7510e3aefd6 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -39,9 +39,9 @@ RouterStageSkip::RouterStageSkip(std::unique_ptr<RouterExecStage> child, long lo invariant(skip > 0); } -StatusWith<ClusterQueryResult> RouterStageSkip::next(OperationContext* opCtx) { +StatusWith<ClusterQueryResult> RouterStageSkip::next() { while (_skippedSoFar < _skip) { - auto next = getChildStage()->next(opCtx); + auto next = getChildStage()->next(); if (!next.isOK()) { return next; } @@ -53,7 +53,7 @@ StatusWith<ClusterQueryResult> RouterStageSkip::next(OperationContext* opCtx) { ++_skippedSoFar; } - return getChildStage()->next(opCtx); + return getChildStage()->next(); } void RouterStageSkip::kill(OperationContext* opCtx) { diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index 773220d4fe6..c6dc1adda39 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -39,7 +39,7 @@ class RouterStageSkip final : public RouterExecStage { public: RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip); - StatusWith<ClusterQueryResult> next(OperationContext* opCtx) final; + StatusWith<ClusterQueryResult> next() final; void kill(OperationContext* opCtx) final; diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp index 79099661a52..f1a58371b5c 100644 --- a/src/mongo/s/query/router_stage_skip_test.cpp +++ b/src/mongo/s/query/router_stage_skip_test.cpp @@ -40,9 +40,10 @@ namespace mongo { namespace { -// Note: Though the next() method on RouterExecStage and its subclasses takes an OperationContext*, -// these stages are mocked in this test using RouterStageMock. RouterStageMock does not actually use -// the OperationContext, so we pass a nullptr OperationContext* to next() in these tests. +// Note: Though the next() method on RouterExecStage and its subclasses depend on an +// OperationContext* provided via a preceding call to reattachToOperationContext(), these stages are +// mocked in this test using RouterStageMock. RouterStageMock does not actually use the +// OperationContext, so we omit the call to rettachToOperationContext in these tests. TEST(RouterStageSkipTest, SkipIsOne) { auto mockStage = stdx::make_unique<RouterStageMock>(); @@ -52,22 +53,22 @@ TEST(RouterStageSkipTest, SkipIsOne) { auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1); - auto firstResult = skipStage->next(nullptr); + auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 2)); - auto secondResult = skipStage->next(nullptr); + auto secondResult = skipStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); // Once end-of-stream is reached, the skip stage should keep returning boost::none. - auto thirdResult = skipStage->next(nullptr); + auto thirdResult = skipStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); - auto fourthResult = skipStage->next(nullptr); + auto fourthResult = skipStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); } @@ -81,12 +82,12 @@ TEST(RouterStageSkipTest, SkipIsThree) { auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); - auto firstResult = skipStage->next(nullptr); + auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 4)); - auto secondResult = skipStage->next(nullptr); + auto secondResult = skipStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().isEOF()); } @@ -99,7 +100,7 @@ TEST(RouterStageSkipTest, SkipEqualToResultSetSize) { auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); - auto firstResult = skipStage->next(nullptr); + auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().isEOF()); } @@ -112,7 +113,7 @@ TEST(RouterStageSkipTest, SkipExceedsResultSetSize) { auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 100); - auto firstResult = skipStage->next(nullptr); + auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().isEOF()); } @@ -126,7 +127,7 @@ TEST(RouterStageSkipTest, ErrorWhileSkippingResults) { auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2); - auto firstResult = skipStage->next(nullptr); + auto firstResult = skipStage->next(); ASSERT_NOT_OK(firstResult.getStatus()); ASSERT_EQ(firstResult.getStatus(), ErrorCodes::BadValue); ASSERT_EQ(firstResult.getStatus().reason(), "bad thing happened"); @@ -141,12 +142,12 @@ TEST(RouterStageSkipTest, ErrorAfterSkippingResults) { auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2); - auto firstResult = skipStage->next(nullptr); + auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 3)); - auto secondResult = skipStage->next(nullptr); + auto secondResult = skipStage->next(); ASSERT_NOT_OK(secondResult.getStatus()); ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue); ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); @@ -162,16 +163,16 @@ TEST(RouterStageSkipTest, SkipStageToleratesMidStreamEOF) { auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2); - auto firstResult = skipStage->next(nullptr); + auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().isEOF()); - auto secondResult = skipStage->next(nullptr); + auto secondResult = skipStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); - auto thirdResult = skipStage->next(nullptr); + auto thirdResult = skipStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); } @@ -186,19 +187,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) { auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1); ASSERT_TRUE(skipStage->remotesExhausted()); - auto firstResult = skipStage->next(nullptr); + auto firstResult = skipStage->next(); ASSERT_OK(firstResult.getStatus()); ASSERT(firstResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 2)); ASSERT_TRUE(skipStage->remotesExhausted()); - auto secondResult = skipStage->next(nullptr); + auto secondResult = skipStage->next(); ASSERT_OK(secondResult.getStatus()); ASSERT(secondResult.getValue().getResult()); ASSERT_BSONOBJ_EQ(*secondResult.getValue().getResult(), BSON("a" << 3)); ASSERT_TRUE(skipStage->remotesExhausted()); - auto thirdResult = skipStage->next(nullptr); + auto thirdResult = skipStage->next(); ASSERT_OK(thirdResult.getStatus()); ASSERT(thirdResult.getValue().isEOF()); ASSERT_TRUE(skipStage->remotesExhausted()); diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 1fd5b146741..a2f912971d4 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -74,7 +74,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, cursorManager->registerCursor(opCtx, ccc.releaseCursor(), requestedNss, - ClusterCursorManager::CursorType::NamespaceNotSharded, + ClusterCursorManager::CursorType::SingleTarget, ClusterCursorManager::CursorLifetime::Mortal); if (!clusterCursorId.isOK()) { return clusterCursorId.getStatus(); diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 760e313c5d0..521702e56b8 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -206,7 +206,7 @@ static void cleanupTask() { } if (auto cursorManager = Grid::get(opCtx)->getCursorManager()) { - cursorManager->shutdown(); + cursorManager->shutdown(opCtx); } if (auto pool = Grid::get(opCtx)->getExecutorPool()) { |