diff options
author | galon1 <gil.alon@mongodb.com> | 2022-10-14 16:20:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-14 17:13:05 +0000 |
commit | 199173d18b9af33156290967dc50d1af3e0c34b3 (patch) | |
tree | 0e0b932e0ad0d56c2df1598e6fbbf1b11e1f5f27 | |
parent | cf440fb8650821337a7e3228e8ba4ee7b537e0d3 (diff) | |
download | mongo-199173d18b9af33156290967dc50d1af3e0c34b3.tar.gz |
SERVER-63811 Add check so documents stage runs when db does not exist
-rw-r--r-- | jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js | 2 | ||||
-rw-r--r-- | jstests/sharding/documents_db_not_exist.js | 48 | ||||
-rw-r--r-- | jstests/sharding/documents_sharded.js | 160 | ||||
-rw-r--r-- | src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_documents.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_documents.h | 24 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_document_source.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_pipeline.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.h | 3 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_map_reduce_agg.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.h | 4 |
14 files changed, 299 insertions, 29 deletions
diff --git a/jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js b/jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js index 8301f78bbfc..3ec0290054f 100644 --- a/jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js +++ b/jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js @@ -9,8 +9,6 @@ (function() { "use strict"; -// TODO SERVER-63811 Ensure the database exists so we get back non-empty results even in a sharded -// cluster. assert.commandWorked(db[jsTestName()].insert({dummy: 1})); let windowResults = db.aggregate([ diff --git a/jstests/sharding/documents_db_not_exist.js b/jstests/sharding/documents_db_not_exist.js new file mode 100644 index 00000000000..1742d149516 --- /dev/null +++ b/jstests/sharding/documents_db_not_exist.js @@ -0,0 +1,48 @@ +/** + * Tests that $documents stage continues even when the database does not exist + * @tags: [requires_fcv_62, multiversion_incompatible] + * + */ + +(function() { +"use strict"; + +let st = new ShardingTest({shards: 3}); + +function listDatabases(options) { + return assert.commandWorked(st.s.adminCommand(Object.assign({listDatabases: 1}, options))) + .databases; +} + +function createAndDropDatabase(dbName) { + // Create the database. + let db = st.s.getDB(dbName); + assert.commandWorked(db.foo.insert({})); + // Confirms the database exists. + assert.eq(1, listDatabases({nameOnly: true, filter: {name: dbName}}).length); + // Drop the database + assert.commandWorked(db.dropDatabase()); + // Confirm the database is dropped. + assert.eq(0, listDatabases({nameOnly: true, filter: {name: dbName}}).length); + return db; +} + +// $documents stage evaluates to an array of objects. +let db = createAndDropDatabase("test"); +let documents = []; +for (let i = 0; i < 50; i++) { + documents.push({_id: i}); +} +let result = db.aggregate([{$documents: documents}]); +assert(result.toArray().length == 50); + +//$documents stage evaluates to an array of objects in a pipeline +db = createAndDropDatabase("test2"); +result = db.aggregate([ + {$documents: [{_id: 1, size: "medium"}, {_id: 2, size: "large"}]}, + {$match: {size: "medium"}} +]); +assert(result.toArray().length == 1); + +st.stop(); +})(); diff --git a/jstests/sharding/documents_sharded.js b/jstests/sharding/documents_sharded.js new file mode 100644 index 00000000000..85b20cd29c3 --- /dev/null +++ b/jstests/sharding/documents_sharded.js @@ -0,0 +1,160 @@ +/** + * This is the test for $documents stage in aggregation pipeline on a sharded collection. + * @tags: [ do_not_wrap_aggregations_in_facets, requires_fcv_51 ] + * + */ + +(function() { +"use strict"; + +load("jstests/aggregation/extras/utils.js"); // For resultsEq. + +let st = new ShardingTest({shards: 2}); +const db = st.s.getDB(jsTestName()); +const dbName = db.getName(); +assert.commandWorked(db.adminCommand({enableSharding: dbName})); + +// Create sharded collections. +const coll = db['shardedColl']; +st.shardColl(coll, {x: 1}, {x: 1}, {x: 1}, dbName); + +const lookup_coll = db['lookupColl']; +st.shardColl(lookup_coll, {id_name: 1}, {id_name: 1}, {id_name: 1}, dbName); +for (let i = 0; i < 10; i++) { + assert.commandWorked(lookup_coll.insert({id_name: i, name: "name_" + i})); +} + +// $documents given an array of objects. +const docs = db.aggregate([{$documents: [{a1: 1}, {a1: 2}]}]).toArray(); + +assert.eq(2, docs.length); +assert.eq(docs[0], {a1: 1}); +assert.eq(docs[1], {a1: 2}); + +// $documents evaluates to an array of objects. +const docs1 = + db.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]).toArray(); + +assert.eq(100, docs1.length); +for (let i = 0; i < 100; i++) { + assert.eq(docs1[i], {x: i}); +} + +// $documents evaluates to an array of objects. +const docsUnionWith = + coll.aggregate([ + { + $unionWith: { + pipeline: [{$documents: {$map: {input: {$range: [0, 5]}, in : {x: "$$this"}}}}] + } + }, + {$group: {_id: "$x", x: {$first: "$x"}}}, + {$project: {_id: 0}}, + ]) + .toArray(); +assert(resultsEq([{x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWith)); + +{ // $documents with const objects inside $unionWith. + const res = coll.aggregate([ + {$unionWith: {pipeline: [{$documents: [{x: 1}, {x: 2}]}]}}, + {$group: {_id: "$x", x: {$first: "$x"}}}, + {$project: {_id: 0}} + ]) + .toArray(); + assert(resultsEq([{x: 1}, {x: 2}], res)); +} + +{ // $documents with const objects inside $lookup (no "coll", explicit $match). + const res = lookup_coll.aggregate([ + { + $lookup: { + let: {"id_lookup": "$id_name"}, + pipeline: [ + {$documents: [{xx: 1}, {xx: 2}, {xx : 3}]}, + { + $match: + { + $expr: + { + $eq: + ["$$id_lookup", "$xx"] + } + } + } + ], + as: "names" + } + }, + {$match: {"names": {"$ne": []}}}, + {$project: {_id: 0}} + ] + ) + .toArray(); + assert(resultsEq( + [ + {id_name: 1, name: "name_1", names: [{"xx": 1}]}, + {id_name: 2, name: "name_2", names: [{"xx": 2}]}, + {id_name: 3, name: "name_3", names: [{"xx": 3}]} + ], + res)); +} +{ // $documents with const objects inside $lookup (no "coll", + localField/foreignField). + const res = lookup_coll.aggregate([ + { + $lookup: { + localField: "id_name", + foreignField: "xx", + pipeline: [ + {$documents: [{xx: 1}, {xx: 2}, {xx: 3}]} + ], + as: "names" + } + }, + {$match: {"names": {"$ne": []}}}, + {$project: {_id: 0}} + ]) + .toArray(); + assert(resultsEq( + [ + {id_name: 1, name: "name_1", names: [{"xx": 1}]}, + {id_name: 2, name: "name_2", names: [{"xx": 2}]}, + {id_name: 3, name: "name_3", names: [{"xx": 3}]} + ], + res)); +} + +// Must fail when $document appears in the top level collection pipeline. +assert.throwsWithCode(() => { + coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]); +}, ErrorCodes.InvalidNamespace); + +// Must fail due to misplaced $document. +assert.throwsWithCode(() => { + coll.aggregate([{$project: {x: [{xx: 1}, {xx: 2}]}}, {$documents: [{x: 1}]}]); +}, 40602); + +// Test that $documents fails due to producing array of non-objects. +assert.throwsWithCode(() => { + db.aggregate([{$documents: [1, 2, 3]}]); +}, 40228); + +// Now with one object and one scalar. +assert.throwsWithCode(() => { + db.aggregate([{$documents: [{x: 1}, 2]}]); +}, 40228); + +// Test that $documents fails due when provided a non-array. +assert.throwsWithCode(() => { + db.aggregate([{$documents: "string"}]); +}, 5858203); + +// Test that $documents succeeds when given a singleton object. +assert.eq(db.aggregate([{$documents: [{x: [1, 2, 3]}]}]).toArray(), [{x: [1, 2, 3]}]); + +// Must fail when $document appears in the top level collection pipeline. +assert.throwsWithCode(() => { + coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]); +}, ErrorCodes.InvalidNamespace); + +st.stop(); +})();
\ No newline at end of file diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp index 069a7e2f0b2..8f04441de80 100644 --- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp +++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp @@ -53,10 +53,11 @@ TEST_F(DispatchShardPipelineTest, DoesNotSplitPipelineIfTargetingOneShard) { const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc( AggregateCommandRequest(expCtx()->ns, stages)); const bool hasChangeStream = false; + const bool startsWithDocuments = false; auto future = launchAsync([&] { auto results = sharded_agg_helpers::dispatchShardPipeline( - serializedCommand, hasChangeStream, std::move(pipeline)); + serializedCommand, hasChangeStream, startsWithDocuments, std::move(pipeline)); ASSERT_EQ(results.remoteCursors.size(), 1UL); ASSERT(!results.splitPipeline); }); @@ -84,10 +85,11 @@ TEST_F(DispatchShardPipelineTest, DoesSplitPipelineIfMatchSpansTwoShards) { const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc( AggregateCommandRequest(expCtx()->ns, stages)); const bool hasChangeStream = false; + const bool startsWithDocuments = false; auto future = launchAsync([&] { auto results = sharded_agg_helpers::dispatchShardPipeline( - serializedCommand, hasChangeStream, std::move(pipeline)); + serializedCommand, hasChangeStream, startsWithDocuments, std::move(pipeline)); ASSERT_EQ(results.remoteCursors.size(), 2UL); ASSERT(bool(results.splitPipeline)); }); @@ -118,10 +120,11 @@ TEST_F(DispatchShardPipelineTest, DispatchShardPipelineRetriesOnNetworkError) { const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc( AggregateCommandRequest(expCtx()->ns, stages)); const bool hasChangeStream = false; + const bool startsWithDocuments = false; auto future = launchAsync([&] { // Shouldn't throw. auto results = sharded_agg_helpers::dispatchShardPipeline( - serializedCommand, hasChangeStream, std::move(pipeline)); + serializedCommand, hasChangeStream, startsWithDocuments, std::move(pipeline)); ASSERT_EQ(results.remoteCursors.size(), 2UL); ASSERT(bool(results.splitPipeline)); }); @@ -163,11 +166,14 @@ TEST_F(DispatchShardPipelineTest, DispatchShardPipelineDoesNotRetryOnStaleConfig const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc( AggregateCommandRequest(expCtx()->ns, stages)); const bool hasChangeStream = false; + const bool startsWithDocuments = false; + auto future = launchAsync([&] { - ASSERT_THROWS_CODE(sharded_agg_helpers::dispatchShardPipeline( - serializedCommand, hasChangeStream, std::move(pipeline)), - AssertionException, - ErrorCodes::StaleConfig); + ASSERT_THROWS_CODE( + sharded_agg_helpers::dispatchShardPipeline( + serializedCommand, hasChangeStream, startsWithDocuments, std::move(pipeline)), + AssertionException, + ErrorCodes::StaleConfig); }); // Mock out an error response. @@ -197,15 +203,17 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) { const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc( AggregateCommandRequest(expCtx()->ns, stages)); const bool hasChangeStream = false; + const bool startsWithDocuments = false; auto future = launchAsync([&] { // Shouldn't throw. sharding::router::CollectionRouter router(getServiceContext(), kTestAggregateNss); - auto results = router.route(operationContext(), - "dispatch shard pipeline"_sd, - [&](OperationContext* opCtx, const ChunkManager& cm) { - return sharded_agg_helpers::dispatchShardPipeline( - serializedCommand, hasChangeStream, pipeline->clone()); - }); + auto results = router.route( + operationContext(), + "dispatch shard pipeline"_sd, + [&](OperationContext* opCtx, const ChunkManager& cm) { + return sharded_agg_helpers::dispatchShardPipeline( + serializedCommand, hasChangeStream, startsWithDocuments, pipeline->clone()); + }); ASSERT_EQ(results.remoteCursors.size(), 1UL); ASSERT(!bool(results.splitPipeline)); }); diff --git a/src/mongo/db/pipeline/document_source_documents.cpp b/src/mongo/db/pipeline/document_source_documents.cpp index 5d4e02c71ea..934116a7c27 100644 --- a/src/mongo/db/pipeline/document_source_documents.cpp +++ b/src/mongo/db/pipeline/document_source_documents.cpp @@ -44,7 +44,7 @@ namespace mongo { using boost::intrusive_ptr; REGISTER_DOCUMENT_SOURCE(documents, - LiteParsedDocumentSourceDefault::parse, + DocumentSourceDocuments::LiteParsed::parse, DocumentSourceDocuments::createFromBson, AllowedWithApiStrict::kAlways); diff --git a/src/mongo/db/pipeline/document_source_documents.h b/src/mongo/db/pipeline/document_source_documents.h index a6ab70e0c59..6048ed551bb 100644 --- a/src/mongo/db/pipeline/document_source_documents.h +++ b/src/mongo/db/pipeline/document_source_documents.h @@ -35,6 +35,30 @@ namespace mongo { namespace DocumentSourceDocuments { +class LiteParsed : public LiteParsedDocumentSource { +public: + static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec) { + return std::make_unique<LiteParsed>(spec.fieldName()); + } + + LiteParsed(std::string parseTimeName) : LiteParsedDocumentSource(std::move(parseTimeName)) {} + + stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { + return stdx::unordered_set<NamespaceString>(); + } + + PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const final { + return {}; + } + + bool isDocuments() const final { + return true; + } + + bool allowedToPassthroughFromMongos() const final { + return false; + } +}; static constexpr StringData kStageName = "$documents"_sd; diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index 34648576917..3e1dc6ea6c6 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -171,6 +171,13 @@ public: } /** + * Returns true if this is a $documents stage. + */ + virtual bool isDocuments() const { + return false; + } + + /** * Returns true if this stage does not require an input source. */ virtual bool isInitialSource() const { diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index 712d23c32bd..d02ff8ee70e 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -123,6 +123,13 @@ public: } /** + * Returns true if the pipeline begins with a $documents stage. + */ + bool startsWithDocuments() const { + return !_stageSpecs.empty() && _stageSpecs.front()->isDocuments(); + } + + /** * Returns true if the pipeline has a $changeStream stage. */ bool hasChangeStream() const { diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index fed03ec5079..7940fc0b97b 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -769,9 +769,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( LiteParsedPipeline liteParsedPipeline(aggRequest); auto hasChangeStream = liteParsedPipeline.hasChangeStream(); + auto startsWithDocuments = liteParsedPipeline.startsWithDocuments(); auto shardDispatchResults = dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest), hasChangeStream, + startsWithDocuments, std::move(pipeline), shardTargetingPolicy, std::move(readConcern)); @@ -1007,6 +1009,7 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont DispatchShardPipelineResults dispatchShardPipeline( Document serializedCommand, bool hasChangeStream, + bool startsWithDocuments, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, ShardTargetingPolicy shardTargetingPolicy, boost::optional<BSONObj> readConcern) { @@ -1051,7 +1054,7 @@ DispatchShardPipelineResults dispatchShardPipeline( : expCtx->getCollatorBSON(); // Determine whether we can run the entire aggregation on a single shard. - const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream); + const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream, startsWithDocuments); std::set<ShardId> shardIds = getTargetedShards( expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, shardTargetingCollation); @@ -1486,9 +1489,11 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) { AggregateCommandRequest aggRequest(expCtx->ns, rawStages); LiteParsedPipeline liteParsedPipeline(aggRequest); auto hasChangeStream = liteParsedPipeline.hasChangeStream(); + auto startsWithDocuments = liteParsedPipeline.startsWithDocuments(); auto shardDispatchResults = dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest), hasChangeStream, + startsWithDocuments, std::move(pipeline)); BSONObjBuilder explainBuilder; auto appendStatus = @@ -1523,11 +1528,13 @@ Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx) { return Shard::RetryPolicy::kIdempotent; } -bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream) { +bool mustRunOnAllShards(const NamespaceString& nss, + bool hasChangeStream, + bool startsWithDocuments) { // The following aggregations must be routed to all shards: // - Any collectionless aggregation, such as non-localOps $currentOp. // - Any aggregation which begins with a $changeStream stage. - return nss.isCollectionlessAggregateNS() || hasChangeStream; + return !startsWithDocuments && (nss.isCollectionlessAggregateNS() || hasChangeStream); } std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 5f30f6f6a93..f69d422da0c 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -124,6 +124,7 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) DispatchShardPipelineResults dispatchShardPipeline( Document serializedCommand, bool hasChangeStream, + bool startsWithDocuments, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, boost::optional<BSONObj> readConcern = boost::none); @@ -179,7 +180,7 @@ StatusWith<ChunkManager> getExecutionNsRoutingInfo(OperationContext* opCtx, /** * Returns true if an aggregation over 'nss' must run on all shards. */ -bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream); +bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream, bool startsWithDocuments); /** * Retrieves the desired retry policy based on whether the default writeConcern is set on 'opCtx'. diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp index 52a61ab9325..3591aa20b17 100644 --- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp @@ -187,6 +187,7 @@ bool runAggregationMapReduce(OperationContext* opCtx, cm, involvedNamespaces, false, // hasChangeStream + false, // startsWithDocuments true, // allowedToPassthrough false); // perShardCursor try { @@ -230,7 +231,8 @@ bool runAggregationMapReduce(OperationContext* opCtx, namespaces, privileges, &tempResults, - false)); // hasChangeStream + false, // hasChangeStream + false)); // startsWithDocuments break; } diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 7c09e31d1ef..6a0b135fef4 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -310,6 +310,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto hasChangeStream = liteParsedPipeline.hasChangeStream(); auto involvedNamespaces = liteParsedPipeline.getInvolvedNamespaces(); auto shouldDoFLERewrite = ::mongo::shouldDoFLERewrite(request); + auto startsWithDocuments = liteParsedPipeline.startsWithDocuments(); // If the routing table is not already taken by the higher level, fill it now. if (!cm) { @@ -339,7 +340,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, if (executionNsRoutingInfoStatus.isOK()) { cm = std::move(executionNsRoutingInfoStatus.getValue()); - } else if (!(hasChangeStream && + } else if (!((hasChangeStream || startsWithDocuments) && executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { appendEmptyResultSetWithStatus( opCtx, namespaces.requestedNss, executionNsRoutingInfoStatus.getStatus(), result); @@ -403,6 +404,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, cm, involvedNamespaces, hasChangeStream, + startsWithDocuments, allowedToPassthrough, request.getPassthroughToShard().has_value()); @@ -477,7 +479,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, namespaces, privileges, result, - hasChangeStream); + hasChangeStream, + startsWithDocuments); } case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy:: kSpecificShardOnly: { diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 3f970f2f8c1..5788b30a7d4 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -563,6 +563,7 @@ AggregationTargeter AggregationTargeter::make( boost::optional<ChunkManager> cm, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, + bool startsWithDocuments, bool allowedToPassthrough, bool perShardCursor) { if (perShardCursor) { @@ -583,10 +584,11 @@ AggregationTargeter AggregationTargeter::make( // Determine whether this aggregation must be dispatched to all shards in the cluster. const bool mustRunOnAll = - sharded_agg_helpers::mustRunOnAllShards(executionNss, hasChangeStream); + sharded_agg_helpers::mustRunOnAllShards(executionNss, hasChangeStream, startsWithDocuments); - // If we don't have a routing table, then this is a $changeStream which must run on all shards. - invariant(cm || (mustRunOnAll && hasChangeStream)); + // If we don't have a routing table, then this is either a $changeStream which must run on all + // shards or a $documents stage which must not. + invariant(cm || (mustRunOnAll && hasChangeStream) || (startsWithDocuments && !mustRunOnAll)); // A pipeline is allowed to passthrough to the primary shard iff the following conditions are // met: @@ -663,11 +665,12 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, const ClusterAggregate::Namespaces& namespaces, const PrivilegeVector& privileges, BSONObjBuilder* result, - bool hasChangeStream) { + bool hasChangeStream, + bool startsWithDocuments) { auto expCtx = targeter.pipeline->getContext(); // If not, split the pipeline as necessary and dispatch to the relevant shards. auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline( - serializedCommand, hasChangeStream, std::move(targeter.pipeline)); + serializedCommand, hasChangeStream, startsWithDocuments, std::move(targeter.pipeline)); // If the operation is an explain, then we verify that it succeeded on all targeted // shards, write the results to the output builder, and return immediately. diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h index 046d8eab6ca..78c919192db 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.h +++ b/src/mongo/s/query/cluster_aggregation_planner.h @@ -82,6 +82,7 @@ struct AggregationTargeter { boost::optional<ChunkManager> cm, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, + bool startsWithDocuments, bool allowedToPassthrough, bool perShardCursor); @@ -125,7 +126,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, const ClusterAggregate::Namespaces& namespaces, const PrivilegeVector& privileges, BSONObjBuilder* result, - bool hasChangeStream); + bool hasChangeStream, + bool startsWithDocuments); /** * Similar to runPipelineOnPrimaryShard but allows $changeStreams. Intended for use by per shard |