author     galon1 <gil.alon@mongodb.com>                     2022-10-14 16:20:17 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-10-14 17:13:05 +0000
commit     199173d18b9af33156290967dc50d1af3e0c34b3 (patch)
tree       0e0b932e0ad0d56c2df1598e6fbbf1b11e1f5f27
parent     cf440fb8650821337a7e3228e8ba4ee7b537e0d3 (diff)
download   mongo-199173d18b9af33156290967dc50d1af3e0c34b3.tar.gz
SERVER-63811 Add check so documents stage runs when db does not exist
-rw-r--r--  jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js    2
-rw-r--r--  jstests/sharding/documents_db_not_exist.js                                        48
-rw-r--r--  jstests/sharding/documents_sharded.js                                            160
-rw-r--r--  src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp                            34
-rw-r--r--  src/mongo/db/pipeline/document_source_documents.cpp                                2
-rw-r--r--  src/mongo/db/pipeline/document_source_documents.h                                 24
-rw-r--r--  src/mongo/db/pipeline/lite_parsed_document_source.h                                7
-rw-r--r--  src/mongo/db/pipeline/lite_parsed_pipeline.h                                       7
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp                                     13
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.h                                        3
-rw-r--r--  src/mongo/s/commands/cluster_map_reduce_agg.cpp                                    4
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp                                            7
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp                                 13
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.h                                    4
14 files changed, 299 insertions, 29 deletions
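
Context for the change: before this patch, a database-level aggregation beginning with $documents came back empty on a sharded cluster whenever the target database did not exist; the workaround removed from output_overwrites_existing_data.js below exists for exactly this reason. A minimal mongo shell sketch of the intended post-patch behavior (the database name is hypothetical):

    // Sketch, assuming a connection to mongos; "neverCreated" has never been written to.
    const ghostDB = db.getSiblingDB("neverCreated");
    const out = ghostDB.aggregate([
        {$documents: [{_id: 1}, {_id: 2}]}  // collectionless first stage
    ]).toArray();
    assert.eq(2, out.length);  // pre-patch this was [] when the database was missing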
diff --git a/jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js b/jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js
index 8301f78bbfc..3ec0290054f 100644
--- a/jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js
+++ b/jstests/aggregation/sources/setWindowFields/output_overwrites_existing_data.js
@@ -9,8 +9,6 @@
(function() {
"use strict";
-// TODO SERVER-63811 Ensure the database exists so we get back non-empty results even in a sharded
-// cluster.
assert.commandWorked(db[jsTestName()].insert({dummy: 1}));
let windowResults = db.aggregate([
diff --git a/jstests/sharding/documents_db_not_exist.js b/jstests/sharding/documents_db_not_exist.js
new file mode 100644
index 00000000000..1742d149516
--- /dev/null
+++ b/jstests/sharding/documents_db_not_exist.js
@@ -0,0 +1,48 @@
+/**
+ * Tests that the $documents stage runs even when the database does not exist.
+ * @tags: [requires_fcv_62, multiversion_incompatible]
+ *
+ */
+
+(function() {
+"use strict";
+
+let st = new ShardingTest({shards: 3});
+
+function listDatabases(options) {
+ return assert.commandWorked(st.s.adminCommand(Object.assign({listDatabases: 1}, options)))
+ .databases;
+}
+
+function createAndDropDatabase(dbName) {
+ // Create the database.
+ let db = st.s.getDB(dbName);
+ assert.commandWorked(db.foo.insert({}));
+ // Confirm the database exists.
+ assert.eq(1, listDatabases({nameOnly: true, filter: {name: dbName}}).length);
+ // Drop the database.
+ assert.commandWorked(db.dropDatabase());
+ // Confirm the database is dropped.
+ assert.eq(0, listDatabases({nameOnly: true, filter: {name: dbName}}).length);
+ return db;
+}
+
+// The $documents stage takes an array of objects.
+let db = createAndDropDatabase("test");
+let documents = [];
+for (let i = 0; i < 50; i++) {
+ documents.push({_id: i});
+}
+let result = db.aggregate([{$documents: documents}]);
+assert(result.toArray().length == 50);
+
+// The $documents stage takes an array of objects within a larger pipeline.
+db = createAndDropDatabase("test2");
+result = db.aggregate([
+ {$documents: [{_id: 1, size: "medium"}, {_id: 2, size: "large"}]},
+ {$match: {size: "medium"}}
+]);
+assert(result.toArray().length == 1);
+
+st.stop();
+})();
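
For contrast, a standalone or replica set never had this problem, since no shard routing table is consulted there; that is why the TODO removed above mentions sharded clusters specifically. A sketch of the unsharded equivalent, assuming the usual jstest fixtures:

    const rst = new ReplSetTest({nodes: 1});
    rst.startSet();
    rst.initiate();
    // "neverCreated" does not exist; the collectionless pipeline still runs.
    const res = rst.getPrimary()
                    .getDB("neverCreated")
                    .aggregate([{$documents: [{x: 1}, {x: 2}, {x: 3}]}])
                    .toArray();
    assert.eq(3, res.length);
    rst.stopSet();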
diff --git a/jstests/sharding/documents_sharded.js b/jstests/sharding/documents_sharded.js
new file mode 100644
index 00000000000..85b20cd29c3
--- /dev/null
+++ b/jstests/sharding/documents_sharded.js
@@ -0,0 +1,160 @@
+/**
+ * Tests the $documents stage in an aggregation pipeline on a sharded collection.
+ * @tags: [ do_not_wrap_aggregations_in_facets, requires_fcv_51 ]
+ *
+ */
+
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/utils.js"); // For resultsEq.
+
+let st = new ShardingTest({shards: 2});
+const db = st.s.getDB(jsTestName());
+const dbName = db.getName();
+assert.commandWorked(db.adminCommand({enableSharding: dbName}));
+
+// Create sharded collections.
+const coll = db['shardedColl'];
+st.shardColl(coll, {x: 1}, {x: 1}, {x: 1}, dbName);
+
+const lookup_coll = db['lookupColl'];
+st.shardColl(lookup_coll, {id_name: 1}, {id_name: 1}, {id_name: 1}, dbName);
+for (let i = 0; i < 10; i++) {
+ assert.commandWorked(lookup_coll.insert({id_name: i, name: "name_" + i}));
+}
+
+// $documents given an array of objects.
+const docs = db.aggregate([{$documents: [{a1: 1}, {a1: 2}]}]).toArray();
+
+assert.eq(2, docs.length);
+assert.eq(docs[0], {a1: 1});
+assert.eq(docs[1], {a1: 2});
+
+// $documents evaluates to an array of objects.
+const docs1 =
+ db.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]).toArray();
+
+assert.eq(100, docs1.length);
+for (let i = 0; i < 100; i++) {
+ assert.eq(docs1[i], {x: i});
+}
+
+// $documents whose argument evaluates to an array of objects, used inside $unionWith.
+const docsUnionWith =
+ coll.aggregate([
+ {
+ $unionWith: {
+ pipeline: [{$documents: {$map: {input: {$range: [0, 5]}, in : {x: "$$this"}}}}]
+ }
+ },
+ {$group: {_id: "$x", x: {$first: "$x"}}},
+ {$project: {_id: 0}},
+ ])
+ .toArray();
+assert(resultsEq([{x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWith));
+
+{ // $documents with const objects inside $unionWith.
+ const res = coll.aggregate([
+ {$unionWith: {pipeline: [{$documents: [{x: 1}, {x: 2}]}]}},
+ {$group: {_id: "$x", x: {$first: "$x"}}},
+ {$project: {_id: 0}}
+ ])
+ .toArray();
+ assert(resultsEq([{x: 1}, {x: 2}], res));
+}
+
+{ // $documents with const objects inside $lookup (no "coll", explicit $match).
+ const res = lookup_coll.aggregate([
+ {
+ $lookup: {
+ let: {"id_lookup": "$id_name"},
+ pipeline: [
+ {$documents: [{xx: 1}, {xx: 2}, {xx : 3}]},
+ {
+ $match:
+ {
+ $expr:
+ {
+ $eq:
+ ["$$id_lookup", "$xx"]
+ }
+ }
+ }
+ ],
+ as: "names"
+ }
+ },
+ {$match: {"names": {"$ne": []}}},
+ {$project: {_id: 0}}
+ ]
+ )
+ .toArray();
+ assert(resultsEq(
+ [
+ {id_name: 1, name: "name_1", names: [{"xx": 1}]},
+ {id_name: 2, name: "name_2", names: [{"xx": 2}]},
+ {id_name: 3, name: "name_3", names: [{"xx": 3}]}
+ ],
+ res));
+}
+{ // $documents with const objects inside $lookup (no "coll", + localField/foreignField).
+ const res = lookup_coll.aggregate([
+ {
+ $lookup: {
+ localField: "id_name",
+ foreignField: "xx",
+ pipeline: [
+ {$documents: [{xx: 1}, {xx: 2}, {xx: 3}]}
+ ],
+ as: "names"
+ }
+ },
+ {$match: {"names": {"$ne": []}}},
+ {$project: {_id: 0}}
+ ])
+ .toArray();
+ assert(resultsEq(
+ [
+ {id_name: 1, name: "name_1", names: [{"xx": 1}]},
+ {id_name: 2, name: "name_2", names: [{"xx": 2}]},
+ {id_name: 3, name: "name_3", names: [{"xx": 3}]}
+ ],
+ res));
+}
+
+// Must fail when $documents appears in the top-level collection pipeline.
+assert.throwsWithCode(() => {
+ coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]);
+}, ErrorCodes.InvalidNamespace);
+
+// Must fail due to misplaced $documents.
+assert.throwsWithCode(() => {
+ coll.aggregate([{$project: {x: [{xx: 1}, {xx: 2}]}}, {$documents: [{x: 1}]}]);
+}, 40602);
+
+// Test that $documents fails due to producing an array of non-objects.
+assert.throwsWithCode(() => {
+ db.aggregate([{$documents: [1, 2, 3]}]);
+}, 40228);
+
+// Now with one object and one scalar.
+assert.throwsWithCode(() => {
+ db.aggregate([{$documents: [{x: 1}, 2]}]);
+}, 40228);
+
+// Test that $documents fails when provided a non-array.
+assert.throwsWithCode(() => {
+ db.aggregate([{$documents: "string"}]);
+}, 5858203);
+
+// Test that $documents succeeds when given a singleton object.
+assert.eq(db.aggregate([{$documents: [{x: [1, 2, 3]}]}]).toArray(), [{x: [1, 2, 3]}]);
+
+// Must fail when $documents appears in the top-level collection pipeline.
+assert.throwsWithCode(() => {
+ coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]);
+}, ErrorCodes.InvalidNamespace);
+
+st.stop();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
index 069a7e2f0b2..8f04441de80 100644
--- a/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
+++ b/src/mongo/db/pipeline/dispatch_shard_pipeline_test.cpp
@@ -53,10 +53,11 @@ TEST_F(DispatchShardPipelineTest, DoesNotSplitPipelineIfTargetingOneShard) {
const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc(
AggregateCommandRequest(expCtx()->ns, stages));
const bool hasChangeStream = false;
+ const bool startsWithDocuments = false;
auto future = launchAsync([&] {
auto results = sharded_agg_helpers::dispatchShardPipeline(
- serializedCommand, hasChangeStream, std::move(pipeline));
+ serializedCommand, hasChangeStream, startsWithDocuments, std::move(pipeline));
ASSERT_EQ(results.remoteCursors.size(), 1UL);
ASSERT(!results.splitPipeline);
});
@@ -84,10 +85,11 @@ TEST_F(DispatchShardPipelineTest, DoesSplitPipelineIfMatchSpansTwoShards) {
const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc(
AggregateCommandRequest(expCtx()->ns, stages));
const bool hasChangeStream = false;
+ const bool startsWithDocuments = false;
auto future = launchAsync([&] {
auto results = sharded_agg_helpers::dispatchShardPipeline(
- serializedCommand, hasChangeStream, std::move(pipeline));
+ serializedCommand, hasChangeStream, startsWithDocuments, std::move(pipeline));
ASSERT_EQ(results.remoteCursors.size(), 2UL);
ASSERT(bool(results.splitPipeline));
});
@@ -118,10 +120,11 @@ TEST_F(DispatchShardPipelineTest, DispatchShardPipelineRetriesOnNetworkError) {
const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc(
AggregateCommandRequest(expCtx()->ns, stages));
const bool hasChangeStream = false;
+ const bool startsWithDocuments = false;
auto future = launchAsync([&] {
// Shouldn't throw.
auto results = sharded_agg_helpers::dispatchShardPipeline(
- serializedCommand, hasChangeStream, std::move(pipeline));
+ serializedCommand, hasChangeStream, startsWithDocuments, std::move(pipeline));
ASSERT_EQ(results.remoteCursors.size(), 2UL);
ASSERT(bool(results.splitPipeline));
});
@@ -163,11 +166,14 @@ TEST_F(DispatchShardPipelineTest, DispatchShardPipelineDoesNotRetryOnStaleConfig
const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc(
AggregateCommandRequest(expCtx()->ns, stages));
const bool hasChangeStream = false;
+ const bool startsWithDocuments = false;
+
auto future = launchAsync([&] {
- ASSERT_THROWS_CODE(sharded_agg_helpers::dispatchShardPipeline(
- serializedCommand, hasChangeStream, std::move(pipeline)),
- AssertionException,
- ErrorCodes::StaleConfig);
+ ASSERT_THROWS_CODE(
+ sharded_agg_helpers::dispatchShardPipeline(
+ serializedCommand, hasChangeStream, startsWithDocuments, std::move(pipeline)),
+ AssertionException,
+ ErrorCodes::StaleConfig);
});
// Mock out an error response.
@@ -197,15 +203,17 @@ TEST_F(DispatchShardPipelineTest, WrappedDispatchDoesRetryOnStaleConfigError) {
const Document serializedCommand = aggregation_request_helper::serializeToCommandDoc(
AggregateCommandRequest(expCtx()->ns, stages));
const bool hasChangeStream = false;
+ const bool startsWithDocuments = false;
auto future = launchAsync([&] {
// Shouldn't throw.
sharding::router::CollectionRouter router(getServiceContext(), kTestAggregateNss);
- auto results = router.route(operationContext(),
- "dispatch shard pipeline"_sd,
- [&](OperationContext* opCtx, const ChunkManager& cm) {
- return sharded_agg_helpers::dispatchShardPipeline(
- serializedCommand, hasChangeStream, pipeline->clone());
- });
+ auto results = router.route(
+ operationContext(),
+ "dispatch shard pipeline"_sd,
+ [&](OperationContext* opCtx, const ChunkManager& cm) {
+ return sharded_agg_helpers::dispatchShardPipeline(
+ serializedCommand, hasChangeStream, startsWithDocuments, pipeline->clone());
+ });
ASSERT_EQ(results.remoteCursors.size(), 1UL);
ASSERT(!bool(results.splitPipeline));
});
diff --git a/src/mongo/db/pipeline/document_source_documents.cpp b/src/mongo/db/pipeline/document_source_documents.cpp
index 5d4e02c71ea..934116a7c27 100644
--- a/src/mongo/db/pipeline/document_source_documents.cpp
+++ b/src/mongo/db/pipeline/document_source_documents.cpp
@@ -44,7 +44,7 @@ namespace mongo {
using boost::intrusive_ptr;
REGISTER_DOCUMENT_SOURCE(documents,
- LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceDocuments::LiteParsed::parse,
DocumentSourceDocuments::createFromBson,
AllowedWithApiStrict::kAlways);
diff --git a/src/mongo/db/pipeline/document_source_documents.h b/src/mongo/db/pipeline/document_source_documents.h
index a6ab70e0c59..6048ed551bb 100644
--- a/src/mongo/db/pipeline/document_source_documents.h
+++ b/src/mongo/db/pipeline/document_source_documents.h
@@ -35,6 +35,30 @@
namespace mongo {
namespace DocumentSourceDocuments {
+class LiteParsed : public LiteParsedDocumentSource {
+public:
+ static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss, const BSONElement& spec) {
+ return std::make_unique<LiteParsed>(spec.fieldName());
+ }
+
+ LiteParsed(std::string parseTimeName) : LiteParsedDocumentSource(std::move(parseTimeName)) {}
+
+ stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
+ return stdx::unordered_set<NamespaceString>();
+ }
+
+ PrivilegeVector requiredPrivileges(bool isMongos, bool bypassDocumentValidation) const final {
+ return {};
+ }
+
+ bool isDocuments() const final {
+ return true;
+ }
+
+ bool allowedToPassthroughFromMongos() const final {
+ return false;
+ }
+};
static constexpr StringData kStageName = "$documents"_sd;
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index 34648576917..3e1dc6ea6c6 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -171,6 +171,13 @@ public:
}
/**
+ * Returns true if this is a $documents stage.
+ */
+ virtual bool isDocuments() const {
+ return false;
+ }
+
+ /**
* Returns true if this stage does not require an input source.
*/
virtual bool isInitialSource() const {
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index 712d23c32bd..d02ff8ee70e 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -123,6 +123,13 @@ public:
}
/**
+ * Returns true if the pipeline begins with a $documents stage.
+ */
+ bool startsWithDocuments() const {
+ return !_stageSpecs.empty() && _stageSpecs.front()->isDocuments();
+ }
+
+ /**
* Returns true if the pipeline has a $changeStream stage.
*/
bool hasChangeStream() const {
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index fed03ec5079..7940fc0b97b 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -769,9 +769,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
LiteParsedPipeline liteParsedPipeline(aggRequest);
auto hasChangeStream = liteParsedPipeline.hasChangeStream();
+ auto startsWithDocuments = liteParsedPipeline.startsWithDocuments();
auto shardDispatchResults =
dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest),
hasChangeStream,
+ startsWithDocuments,
std::move(pipeline),
shardTargetingPolicy,
std::move(readConcern));
@@ -1007,6 +1009,7 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont
DispatchShardPipelineResults dispatchShardPipeline(
Document serializedCommand,
bool hasChangeStream,
+ bool startsWithDocuments,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
ShardTargetingPolicy shardTargetingPolicy,
boost::optional<BSONObj> readConcern) {
@@ -1051,7 +1054,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
: expCtx->getCollatorBSON();
// Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream);
+ const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream, startsWithDocuments);
std::set<ShardId> shardIds = getTargetedShards(
expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, shardTargetingCollation);
@@ -1486,9 +1489,11 @@ BSONObj targetShardsForExplain(Pipeline* ownedPipeline) {
AggregateCommandRequest aggRequest(expCtx->ns, rawStages);
LiteParsedPipeline liteParsedPipeline(aggRequest);
auto hasChangeStream = liteParsedPipeline.hasChangeStream();
+ auto startsWithDocuments = liteParsedPipeline.startsWithDocuments();
auto shardDispatchResults =
dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest),
hasChangeStream,
+ startsWithDocuments,
std::move(pipeline));
BSONObjBuilder explainBuilder;
auto appendStatus =
@@ -1523,11 +1528,13 @@ Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx) {
return Shard::RetryPolicy::kIdempotent;
}
-bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream) {
+bool mustRunOnAllShards(const NamespaceString& nss,
+ bool hasChangeStream,
+ bool startsWithDocuments) {
// The following aggregations must be routed to all shards:
// - Any collectionless aggregation, such as non-localOps $currentOp.
// - Any aggregation which begins with a $changeStream stage.
- return nss.isCollectionlessAggregateNS() || hasChangeStream;
+ return !startsWithDocuments && (nss.isCollectionlessAggregateNS() || hasChangeStream);
}
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
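
The updated routing rule, restated as an illustrative standalone sketch (shell JavaScript, not server code): a pipeline that begins with $documents is collectionless yet must not fan out to every shard, unlike $changeStream and the other collectionless aggregations.

    // Mirrors the post-patch mustRunOnAllShards() predicate (illustrative only).
    function mustRunOnAllShards(isCollectionlessNS, hasChangeStream, startsWithDocuments) {
        return !startsWithDocuments && (isCollectionlessNS || hasChangeStream);
    }
    assert.eq(false, mustRunOnAllShards(true, false, true));  // db.aggregate([{$documents: ...}])
    assert.eq(true, mustRunOnAllShards(true, false, false));  // e.g. non-localOps $currentOp
    assert.eq(true, mustRunOnAllShards(false, true, false));  // $changeStream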
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 5f30f6f6a93..f69d422da0c 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -124,6 +124,7 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
DispatchShardPipelineResults dispatchShardPipeline(
Document serializedCommand,
bool hasChangeStream,
+ bool startsWithDocuments,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
boost::optional<BSONObj> readConcern = boost::none);
@@ -179,7 +180,7 @@ StatusWith<ChunkManager> getExecutionNsRoutingInfo(OperationContext* opCtx,
/**
* Returns true if an aggregation over 'nss' must run on all shards.
*/
-bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream);
+bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream, bool startsWithDocuments);
/**
* Retrieves the desired retry policy based on whether the default writeConcern is set on 'opCtx'.
diff --git a/src/mongo/s/commands/cluster_map_reduce_agg.cpp b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
index 52a61ab9325..3591aa20b17 100644
--- a/src/mongo/s/commands/cluster_map_reduce_agg.cpp
+++ b/src/mongo/s/commands/cluster_map_reduce_agg.cpp
@@ -187,6 +187,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
cm,
involvedNamespaces,
false, // hasChangeStream
+ false, // startsWithDocuments
true, // allowedToPassthrough
false); // perShardCursor
try {
@@ -230,7 +231,8 @@ bool runAggregationMapReduce(OperationContext* opCtx,
namespaces,
privileges,
&tempResults,
- false)); // hasChangeStream
+ false, // hasChangeStream
+ false)); // startsWithDocuments
break;
}
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 7c09e31d1ef..6a0b135fef4 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -310,6 +310,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto hasChangeStream = liteParsedPipeline.hasChangeStream();
auto involvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
auto shouldDoFLERewrite = ::mongo::shouldDoFLERewrite(request);
+ auto startsWithDocuments = liteParsedPipeline.startsWithDocuments();
// If the routing table is not already taken by the higher level, fill it now.
if (!cm) {
@@ -339,7 +340,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
if (executionNsRoutingInfoStatus.isOK()) {
cm = std::move(executionNsRoutingInfoStatus.getValue());
- } else if (!(hasChangeStream &&
+ } else if (!((hasChangeStream || startsWithDocuments) &&
executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
appendEmptyResultSetWithStatus(
opCtx, namespaces.requestedNss, executionNsRoutingInfoStatus.getStatus(), result);
@@ -403,6 +404,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
cm,
involvedNamespaces,
hasChangeStream,
+ startsWithDocuments,
allowedToPassthrough,
request.getPassthroughToShard().has_value());
@@ -477,7 +479,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
namespaces,
privileges,
result,
- hasChangeStream);
+ hasChangeStream,
+ startsWithDocuments);
}
case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::
kSpecificShardOnly: {
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 3f970f2f8c1..5788b30a7d4 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -563,6 +563,7 @@ AggregationTargeter AggregationTargeter::make(
boost::optional<ChunkManager> cm,
stdx::unordered_set<NamespaceString> involvedNamespaces,
bool hasChangeStream,
+ bool startsWithDocuments,
bool allowedToPassthrough,
bool perShardCursor) {
if (perShardCursor) {
@@ -583,10 +584,11 @@ AggregationTargeter AggregationTargeter::make(
// Determine whether this aggregation must be dispatched to all shards in the cluster.
const bool mustRunOnAll =
- sharded_agg_helpers::mustRunOnAllShards(executionNss, hasChangeStream);
+ sharded_agg_helpers::mustRunOnAllShards(executionNss, hasChangeStream, startsWithDocuments);
- // If we don't have a routing table, then this is a $changeStream which must run on all shards.
- invariant(cm || (mustRunOnAll && hasChangeStream));
+ // If we don't have a routing table, then this is either a $changeStream which must run on all
+ // shards or a $documents stage which must not.
+ invariant(cm || (mustRunOnAll && hasChangeStream) || (startsWithDocuments && !mustRunOnAll));
// A pipeline is allowed to passthrough to the primary shard iff the following conditions are
// met:
@@ -663,11 +665,12 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
const ClusterAggregate::Namespaces& namespaces,
const PrivilegeVector& privileges,
BSONObjBuilder* result,
- bool hasChangeStream) {
+ bool hasChangeStream,
+ bool startsWithDocuments) {
auto expCtx = targeter.pipeline->getContext();
// If not, split the pipeline as necessary and dispatch to the relevant shards.
auto shardDispatchResults = sharded_agg_helpers::dispatchShardPipeline(
- serializedCommand, hasChangeStream, std::move(targeter.pipeline));
+ serializedCommand, hasChangeStream, startsWithDocuments, std::move(targeter.pipeline));
// If the operation is an explain, then we verify that it succeeded on all targeted
// shards, write the results to the output builder, and return immediately.
diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h
index 046d8eab6ca..78c919192db 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.h
+++ b/src/mongo/s/query/cluster_aggregation_planner.h
@@ -82,6 +82,7 @@ struct AggregationTargeter {
boost::optional<ChunkManager> cm,
stdx::unordered_set<NamespaceString> involvedNamespaces,
bool hasChangeStream,
+ bool startsWithDocuments,
bool allowedToPassthrough,
bool perShardCursor);
@@ -125,7 +126,8 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
const ClusterAggregate::Namespaces& namespaces,
const PrivilegeVector& privileges,
BSONObjBuilder* result,
- bool hasChangeStream);
+ bool hasChangeStream,
+ bool startsWithDocuments);
/**
* Similar to runPipelineOnPrimaryShard but allows $changeStreams. Intended for use by per shard