author    James Wahlin <james@mongodb.com>  2018-11-27 13:28:27 -0500
committer James Wahlin <james@mongodb.com>  2018-12-12 14:41:24 -0500
commit    056d61676f91f6da0a030347ae4b92255d752d8f (patch)
tree      92f5b2d319ce1cd5701be912e6b96cf9a6fdaa4b
parent    d2573d47786b035d5bcdeaf30207bbfcd58bf14e (diff)
download  mongo-056d61676f91f6da0a030347ae4b92255d752d8f.tar.gz
SERVER-32308 Support for $lookup to execute on mongos against a sharded foreign collection
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml | 4
-rw-r--r--  jstests/aggregation/mongos_merge.js | 39
-rw-r--r--  jstests/aggregation/sources/facet/use_cases.js | 21
-rw-r--r--  jstests/aggregation/sources/graphLookup/sharded.js | 56
-rw-r--r--  jstests/aggregation/sources/lookup/collation_lookup.js | 368
-rw-r--r--  jstests/aggregation/sources/lookup/lookup.js | 1148
-rw-r--r--  jstests/aggregation/sources/lookup/lookup_subpipeline.js | 604
-rw-r--r--  jstests/sharding/collation_lookup.js | 454
-rw-r--r--  jstests/sharding/lookup.js | 618
-rw-r--r--  jstests/sharding/lookup_mongod_unaware.js | 168
-rw-r--r--  jstests/sharding/lookup_stale_mongos.js | 130
-rw-r--r--  src/mongo/db/pipeline/SConscript | 3
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.cpp | 3
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token_test.cpp | 19
-rw-r--r--  src/mongo/db/pipeline/document_source_graph_lookup.cpp | 4
-rw-r--r--  src/mongo/db/pipeline/document_source_graph_lookup_test.cpp | 17
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.cpp | 20
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup.h | 7
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp | 26
-rw-r--r--  src/mongo/db/pipeline/document_source_lookup_test.cpp | 17
-rw-r--r--  src/mongo/db/pipeline/mongo_process_interface.h | 16
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.cpp | 468
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h | 91
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.cpp | 36
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.h | 6
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h | 6
-rw-r--r--  src/mongo/s/query/SConscript | 12
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp | 525
30 files changed, 2725 insertions, 2165 deletions
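
The behavior this commit enables can be sketched from the mongo shell test framework. The snippet below is illustrative only and is not part of the commit: the collection names, shard key, and data are assumptions. Before this change, a $lookup whose foreign ("from") collection was sharded was rejected with error code 28769; with SERVER-32308 it is expected to run, with the merging portion of the pipeline executed on mongos.

// Hypothetical sketch: $lookup issued through mongos against a sharded foreign collection.
const st = new ShardingTest({shards: 2, mongos: 1});
const testDB = st.s.getDB("test");
assert.commandWorked(st.s.adminCommand({enableSharding: "test"}));
assert.commandWorked(st.s.adminCommand({shardCollection: "test.items", key: {_id: 1}}));

assert.writeOK(testDB.orders.insert({_id: 0, itemId: 1}));
assert.writeOK(testDB.items.insert({_id: 1, desc: "widget"}));

// Previously this aggregation failed with code 28769 because "items" is sharded; it is
// now expected to succeed.
const res = testDB.orders
                .aggregate([{
                    $lookup:
                        {from: "items", localField: "itemId", foreignField: "_id", as: "item"}
                }])
                .toArray();
assert.eq([{_id: 0, itemId: 1, item: [{_id: 1, desc: "widget"}]}], res);
st.stop();
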
diff --git a/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml b/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml
index 81bfe24a992..089e566dcdc 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml
@@ -12,6 +12,8 @@ selector:
- jstests/aggregation/bugs/lookup_unwind_killcursor.js
# TODO: Remove when SERVER-23229 is fixed.
- jstests/aggregation/bugs/groupMissing.js
+ # TODO SERVER-32309: Enable once $lookup with pipeline supports sharded foreign collections.
+ - jstests/aggregation/sources/lookup/lookup_subpipeline.js
exclude_with_any_tags:
# Tests tagged with the following will fail because they assume collections are not sharded.
- assumes_no_implicit_collection_creation_after_drop
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
index d01c66bac17..5f1410e240e 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
@@ -23,6 +23,8 @@ selector:
- jstests/aggregation/use_query_sort.js
# TODO: Remove when SERVER-23229 is fixed.
- jstests/aggregation/bugs/groupMissing.js
+ # TODO SERVER-32309: Enable once $lookup with pipeline supports sharded foreign collections.
+ - jstests/aggregation/sources/lookup/lookup_subpipeline.js
exclude_with_any_tags:
# Tests tagged with the following will fail because they assume collections are not sharded.
- assumes_no_implicit_collection_creation_after_drop
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index be73c186383..3fa0acabb70 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -344,12 +344,16 @@ selector:
- jstests/sharding/stale_mongos_updates_and_removes.js
- jstests/sharding/geo_near_sharded.js
# Enable when 4.2 becomes last-stable.
+ - jstests/sharding/collation_lookup.js
- jstests/sharding/collation_targeting.js
- jstests/sharding/collation_targeting_inherited.js
- jstests/sharding/failcommand_failpoint_not_parallel.js
- jstests/sharding/failcommand_ignores_internal.js
- jstests/sharding/geo_near_random1.js
- jstests/sharding/geo_near_random2.js
+ - jstests/sharding/lookup.js
+ - jstests/sharding/lookup_mongod_unaware.js
+ - jstests/sharding/lookup_stale_mongos.js
- jstests/sharding/out_cannot_run_on_mongos.js
- jstests/sharding/out_command_options.js
- jstests/sharding/out_from_stale_mongos.js
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js
index 13ab1b2431e..df22312ce13 100644
--- a/jstests/aggregation/mongos_merge.js
+++ b/jstests/aggregation/mongos_merge.js
@@ -286,6 +286,45 @@
allowDiskUse: allowDiskUse,
expectedCount: 400
});
+
+ // Test that $lookup is merged on the primary shard when the foreign collection is
+ // unsharded.
+ assertMergeOnMongoD({
+ testName: "agg_mongos_merge_lookup_unsharded_disk_use_" + allowDiskUse,
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {
+ $lookup: {
+ from: unshardedColl.getName(),
+ localField: "_id",
+ foreignField: "_id",
+ as: "lookupField"
+ }
+ }
+ ],
+ mergeType: "primaryShard",
+ allowDiskUse: allowDiskUse,
+ expectedCount: 400
+ });
+
+ // Test that $lookup is merged on mongoS when the foreign collection is sharded.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_lookup_sharded_disk_use_" + allowDiskUse,
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {
+ $lookup: {
+ from: mongosColl.getName(),
+ localField: "_id",
+ foreignField: "_id",
+ as: "lookupField"
+ }
+ }
+ ],
+ mergeType: "mongos",
+ allowDiskUse: allowDiskUse,
+ expectedCount: 400
+ });
}
/**
diff --git a/jstests/aggregation/sources/facet/use_cases.js b/jstests/aggregation/sources/facet/use_cases.js
index 1295f638910..6074e66d810 100644
--- a/jstests/aggregation/sources/facet/use_cases.js
+++ b/jstests/aggregation/sources/facet/use_cases.js
@@ -115,8 +115,6 @@
populateData(st.s0, nDocs);
doExecutionTest(st.s0);
- // Test that $facet stage propagates information about involved collections, preventing users
- // from doing things like $lookup from a sharded collection.
const shardedDBName = "sharded";
const shardedCollName = "collection";
const shardedColl = st.getDB(shardedDBName).getCollection(shardedCollName);
@@ -126,21 +124,8 @@
assert.commandWorked(
st.admin.runCommand({shardCollection: shardedColl.getFullName(), key: {_id: 1}}));
- // Test that trying to perform a $lookup on a sharded collection returns an error.
- let res = assert.commandFailed(unshardedColl.runCommand({
- aggregate: unshardedColl.getName(),
- pipeline: [{
- $lookup:
- {from: shardedCollName, localField: "_id", foreignField: "_id", as: "results"}
- }],
- cursor: {}
- }));
- assert.eq(
- 28769, res.code, "Expected aggregation to fail due to $lookup on a sharded collection");
-
- // Test that trying to perform a $lookup on a sharded collection inside a $facet stage still
- // returns an error.
- res = assert.commandFailed(unshardedColl.runCommand({
+ // Test $lookup inside a $facet stage on a sharded collection.
+ assert.commandWorked(unshardedColl.runCommand({
aggregate: unshardedColl.getName(),
pipeline: [{
$facet: {
@@ -156,8 +141,6 @@
}],
cursor: {}
}));
- assert.eq(
- 28769, res.code, "Expected aggregation to fail due to $lookup on a sharded collection");
// Then run the assertions against a sharded collection.
assert.commandWorked(st.admin.runCommand({enableSharding: dbName}));
diff --git a/jstests/aggregation/sources/graphLookup/sharded.js b/jstests/aggregation/sources/graphLookup/sharded.js
deleted file mode 100644
index b78649d5824..00000000000
--- a/jstests/aggregation/sources/graphLookup/sharded.js
+++ /dev/null
@@ -1,56 +0,0 @@
-// In SERVER-23725, $graphLookup was introduced. In this file, we test that the expression behaves
-// correctly on a sharded collection.
-// @tags: [requires_sharding]
-load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
-
-(function() {
- "use strict";
-
- var st = new ShardingTest({name: "aggregation_graph_lookup", shards: 2, mongos: 1});
-
- st.adminCommand({enableSharding: "graphLookup"});
- st.ensurePrimaryShard("graphLookup", st.shard1.shardName);
- st.adminCommand({shardCollection: "graphLookup.local", key: {_id: 1}});
-
- var foreign = st.getDB("graphLookup").foreign;
- var local = st.getDB("graphLookup").local;
-
- var bulk = foreign.initializeUnorderedBulkOp();
-
- for (var i = 0; i < 100; i++) {
- bulk.insert({_id: i, next: i + 1});
- }
- assert.writeOK(bulk.execute());
-
- assert.writeOK(local.insert({}));
-
- var res = st.s.getDB("graphLookup")
- .local
- .aggregate({
- $graphLookup: {
- from: "foreign",
- startWith: {$literal: 0},
- connectToField: "_id",
- connectFromField: "next",
- as: "number_line"
- }
- })
- .toArray();
-
- assert.eq(res.length, 1);
- assert.eq(res[0].number_line.length, 100);
-
- // Cannot perform a $graphLookup where the "from" collection is sharded.
- var pipeline = {
- $graphLookup: {
- from: "local",
- startWith: {$literal: 0},
- connectToField: "_id",
- connectFromField: "_id",
- as: "out"
- }
- };
-
- assertErrorCode(foreign, pipeline, 28769);
- st.stop();
-}());
diff --git a/jstests/aggregation/sources/lookup/collation_lookup.js b/jstests/aggregation/sources/lookup/collation_lookup.js
deleted file mode 100644
index 80d173138a6..00000000000
--- a/jstests/aggregation/sources/lookup/collation_lookup.js
+++ /dev/null
@@ -1,368 +0,0 @@
-// Cannot implicitly shard accessed collections because of collection existing when none expected.
-// @tags: [assumes_no_implicit_collection_creation_after_drop]
-
-/**
- * Tests that the $lookup stage respects the collation.
- *
- * The comparison of string values between the 'localField' and 'foreignField' should use the
- * collation either explicitly set on the aggregation operation, or the collation inherited from the
- * collection the "aggregate" command was performed on.
- */
-(function() {
- "use strict";
-
- load("jstests/aggregation/extras/utils.js"); // for arrayEq
-
- const caseInsensitive = {collation: {locale: "en_US", strength: 2}};
-
- const withDefaultCollationColl = db.collation_lookup_with_default;
- const withoutDefaultCollationColl = db.collation_lookup_without_default;
-
- withDefaultCollationColl.drop();
- withoutDefaultCollationColl.drop();
-
- assert.commandWorked(db.createCollection(withDefaultCollationColl.getName(), caseInsensitive));
- assert.writeOK(withDefaultCollationColl.insert({_id: "lowercase", str: "abc"}));
-
- assert.writeOK(withoutDefaultCollationColl.insert({_id: "lowercase", str: "abc"}));
- assert.writeOK(withoutDefaultCollationColl.insert({_id: "uppercase", str: "ABC"}));
- assert.writeOK(withoutDefaultCollationColl.insert({_id: "unmatched", str: "def"}));
-
- // Test that the $lookup stage respects the inherited collation.
- let res = withDefaultCollationColl
- .aggregate([{
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- }])
- .toArray();
- assert.eq(1, res.length, tojson(res));
-
- let expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}];
- assert(
- arrayEq(expected, res[0].matched),
- "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + " up to ordering");
-
- res = withDefaultCollationColl
- .aggregate([{
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- }
- ],
- as: "matched1",
- },
- }])
- .toArray();
- assert.eq(1, res.length, tojson(res));
-
- expected = [
- {
- "_id": "lowercase",
- "str": "abc",
- "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
- },
- {
- "_id": "uppercase",
- "str": "ABC",
- "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
- }
- ];
- assert(arrayEq(expected, res[0].matched1),
- "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) +
- " up to ordering. " + tojson(res));
-
- // Test that the $lookup stage respects the inherited collation when it optimizes with an
- // $unwind stage.
- res = withDefaultCollationColl
- .aggregate([
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- },
- {$unwind: "$matched"},
- ])
- .toArray();
- assert.eq(2, res.length, tojson(res));
-
- expected = [
- {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}},
- {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}}
- ];
- assert(arrayEq(expected, res),
- "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
-
- res = withDefaultCollationColl
- .aggregate([
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- },
- {$unwind: "$matched2"},
- ],
- as: "matched1",
- },
- },
- {$unwind: "$matched1"},
- ])
- .toArray();
- assert.eq(4, res.length, tojson(res));
-
- expected = [
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "lowercase", "str": "abc", "matched2": {"_id": "lowercase", "str": "abc"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "lowercase", "str": "abc", "matched2": {"_id": "uppercase", "str": "ABC"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "lowercase", "str": "abc"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "uppercase", "str": "ABC"}}
- }
- ];
- assert(arrayEq(expected, res),
- "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
-
- // Test that the $lookup stage respects an explicit collation on the aggregation operation.
- res = withoutDefaultCollationColl
- .aggregate(
- [
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- },
- ],
- caseInsensitive)
- .toArray();
- assert.eq(1, res.length, tojson(res));
-
- expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}];
- assert(
- arrayEq(expected, res[0].matched),
- "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + " up to ordering");
-
- res = withoutDefaultCollationColl
- .aggregate(
- [
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- }
- ],
- as: "matched1",
- },
- }
- ],
- caseInsensitive)
- .toArray();
- assert.eq(1, res.length, tojson(res));
-
- expected = [
- {
- "_id": "lowercase",
- "str": "abc",
- "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
- },
- {
- "_id": "uppercase",
- "str": "ABC",
- "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
- }
- ];
- assert(arrayEq(expected, res[0].matched1),
- "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) +
- " up to ordering");
-
- // Test that the $lookup stage respects an explicit collation on the aggregation operation when
- // it optimizes with an $unwind stage.
- res = withoutDefaultCollationColl
- .aggregate(
- [
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- },
- {$unwind: "$matched"},
- ],
- caseInsensitive)
- .toArray();
- assert.eq(2, res.length, tojson(res));
-
- expected = [
- {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}},
- {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}}
- ];
- assert(arrayEq(expected, res),
- "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
-
- res = withoutDefaultCollationColl
- .aggregate(
- [
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- },
- {$unwind: "$matched2"},
- ],
- as: "matched1",
- },
- },
- {$unwind: "$matched1"},
- ],
- caseInsensitive)
- .toArray();
- assert.eq(4, res.length, tojson(res));
-
- expected = [
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "lowercase", "str": "abc", "matched2": {"_id": "lowercase", "str": "abc"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "lowercase", "str": "abc", "matched2": {"_id": "uppercase", "str": "ABC"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "lowercase", "str": "abc"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "uppercase", "str": "ABC"}}
- }
- ];
- assert(arrayEq(expected, res),
- "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
-
- // Test that the $lookup stage uses the "simple" collation if a collation isn't set on the
- // collection or the aggregation operation.
- res = withoutDefaultCollationColl
- .aggregate([
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- },
- ])
- .toArray();
- assert.eq([{_id: "lowercase", str: "abc", matched: [{_id: "lowercase", str: "abc"}]}], res);
-
- res = withoutDefaultCollationColl
- .aggregate([
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- },
- {$unwind: "$matched2"},
- ],
- as: "matched1",
- },
- },
- ])
- .toArray();
- assert.eq([{
- "_id": "lowercase",
- "str": "abc",
- "matched1": [{
- "_id": "lowercase",
- "str": "abc",
- "matched2": {"_id": "lowercase", "str": "abc"}
- }]
- }],
- res);
-})();
diff --git a/jstests/aggregation/sources/lookup/lookup.js b/jstests/aggregation/sources/lookup/lookup.js
deleted file mode 100644
index 0d29f4eb8a8..00000000000
--- a/jstests/aggregation/sources/lookup/lookup.js
+++ /dev/null
@@ -1,1148 +0,0 @@
-// Basic $lookup regression tests.
-// @tags: [requires_sharding]
-
-load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
-
-(function() {
- "use strict";
-
- // Used by testPipeline to sort result documents. All _ids must be primitives.
- function compareId(a, b) {
- if (a._id < b._id) {
- return -1;
- }
- if (a._id > b._id) {
- return 1;
- }
- return 0;
- }
-
- function generateNestedPipeline(foreignCollName, numLevels) {
- let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}];
-
- for (let level = 1; level < numLevels; level++) {
- pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}];
- }
-
- return pipeline;
- }
-
- // Helper for testing that pipeline returns correct set of results.
- function testPipeline(pipeline, expectedResult, collection) {
- assert.eq(collection.aggregate(pipeline).toArray().sort(compareId),
- expectedResult.sort(compareId));
- }
-
- function runTest(coll, from, thirdColl, fourthColl) {
- var db = null; // Using the db variable is banned in this function.
-
- assert.writeOK(coll.insert({_id: 0, a: 1}));
- assert.writeOK(coll.insert({_id: 1, a: null}));
- assert.writeOK(coll.insert({_id: 2}));
-
- assert.writeOK(from.insert({_id: 0, b: 1}));
- assert.writeOK(from.insert({_id: 1, b: null}));
- assert.writeOK(from.insert({_id: 2}));
-
- //
- // Basic functionality.
- //
-
- // "from" document added to "as" field if a == b, where nonexistent fields are treated as
- // null.
- var expectedResults = [
- {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
- {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}],
- expectedResults,
- coll);
-
- // If localField is nonexistent, it is treated as if it is null.
- expectedResults = [
- {_id: 0, a: 1, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline(
- [{$lookup: {localField: "nonexistent", foreignField: "b", from: "from", as: "same"}}],
- expectedResults,
- coll);
-
- // If foreignField is nonexistent, it is treated as if it is null.
- expectedResults = [
- {_id: 0, a: 1, "same": []},
- {_id: 1, a: null, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline(
- [{$lookup: {localField: "a", foreignField: "nonexistent", from: "from", as: "same"}}],
- expectedResults,
- coll);
-
- // If there are no matches or the from coll doesn't exist, the result is an empty array.
- expectedResults =
- [{_id: 0, a: 1, "same": []}, {_id: 1, a: null, "same": []}, {_id: 2, "same": []}];
- testPipeline(
- [{$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}],
- expectedResults,
- coll);
- testPipeline(
- [{$lookup: {localField: "a", foreignField: "b", from: "nonexistent", as: "same"}}],
- expectedResults,
- coll);
-
- // If field name specified by "as" already exists, it is overwritten.
- expectedResults = [
- {_id: 0, "a": [{_id: 0, b: 1}]},
- {_id: 1, "a": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "a": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "a"}}],
- expectedResults,
- coll);
-
- // Running multiple $lookups in the same pipeline is allowed.
- expectedResults = [
- {_id: 0, a: 1, "c": [{_id: 0, b: 1}], "d": [{_id: 0, b: 1}]},
- {
- _id: 1,
- a: null, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]
- },
- {_id: 2, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline(
- [
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "c"}},
- {$project: {"a": 1, "c": 1}},
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "d"}}
- ],
- expectedResults,
- coll);
-
- //
- // Coalescing with $unwind.
- //
-
- // A normal $unwind with on the "as" field.
- expectedResults = [
- {_id: 0, a: 1, same: {_id: 0, b: 1}},
- {_id: 1, a: null, same: {_id: 1, b: null}},
- {_id: 1, a: null, same: {_id: 2}},
- {_id: 2, same: {_id: 1, b: null}},
- {_id: 2, same: {_id: 2}}
- ];
- testPipeline(
- [
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
- {$unwind: {path: "$same"}}
- ],
- expectedResults,
- coll);
-
- // An $unwind on the "as" field, with includeArrayIndex.
- expectedResults = [
- {_id: 0, a: 1, same: {_id: 0, b: 1}, index: NumberLong(0)},
- {_id: 1, a: null, same: {_id: 1, b: null}, index: NumberLong(0)},
- {_id: 1, a: null, same: {_id: 2}, index: NumberLong(1)},
- {_id: 2, same: {_id: 1, b: null}, index: NumberLong(0)},
- {_id: 2, same: {_id: 2}, index: NumberLong(1)},
- ];
- testPipeline(
- [
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
- {$unwind: {path: "$same", includeArrayIndex: "index"}}
- ],
- expectedResults,
- coll);
-
- // Normal $unwind with no matching documents.
- expectedResults = [];
- testPipeline(
- [
- {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
- {$unwind: {path: "$same"}}
- ],
- expectedResults,
- coll);
-
- // $unwind with preserveNullAndEmptyArray with no matching documents.
- expectedResults = [
- {_id: 0, a: 1},
- {_id: 1, a: null},
- {_id: 2},
- ];
- testPipeline(
- [
- {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
- {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
- ],
- expectedResults,
- coll);
-
- // $unwind with preserveNullAndEmptyArray, some with matching documents, some without.
- expectedResults = [
- {_id: 0, a: 1},
- {_id: 1, a: null, same: {_id: 0, b: 1}},
- {_id: 2},
- ];
- testPipeline(
- [
- {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
- {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
- ],
- expectedResults,
- coll);
-
- // $unwind with preserveNullAndEmptyArray and includeArrayIndex, some with matching
- // documents, some without.
- expectedResults = [
- {_id: 0, a: 1, index: null},
- {_id: 1, a: null, same: {_id: 0, b: 1}, index: NumberLong(0)},
- {_id: 2, index: null},
- ];
- testPipeline(
- [
- {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
- {
- $unwind:
- {path: "$same", preserveNullAndEmptyArrays: true, includeArrayIndex: "index"}
- }
- ],
- expectedResults,
- coll);
-
- //
- // Dependencies.
- //
-
- // If $lookup didn't add "localField" to its dependencies, this test would fail as the
- // value of the "a" field would be lost and treated as null.
- expectedResults = [
- {_id: 0, "same": [{_id: 0, b: 1}]},
- {_id: 1, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline(
- [
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
- {$project: {"same": 1}}
- ],
- expectedResults,
- coll);
-
- // If $lookup didn't add fields referenced by "let" variables to its dependencies, this test
- // would fail as the value of the "a" field would be lost and treated as null.
- expectedResults = [
- {"_id": 0, "same": [{"_id": 0, "x": 1}, {"_id": 1, "x": 1}, {"_id": 2, "x": 1}]},
- {
- "_id": 1,
- "same": [{"_id": 0, "x": null}, {"_id": 1, "x": null}, {"_id": 2, "x": null}]
- },
- {"_id": 2, "same": [{"_id": 0}, {"_id": 1}, {"_id": 2}]}
- ];
- testPipeline(
- [
- {
- $lookup: {
- let : {var1: "$a"},
- pipeline: [{$project: {x: "$$var1"}}],
- from: "from",
- as: "same"
- }
- },
- {$project: {"same": 1}}
- ],
- expectedResults,
- coll);
-
- //
- // Dotted field paths.
- //
-
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: 1}));
- assert.writeOK(coll.insert({_id: 1, a: null}));
- assert.writeOK(coll.insert({_id: 2}));
- assert.writeOK(coll.insert({_id: 3, a: {c: 1}}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0, b: 1}));
- assert.writeOK(from.insert({_id: 1, b: null}));
- assert.writeOK(from.insert({_id: 2}));
- assert.writeOK(from.insert({_id: 3, b: {c: 1}}));
- assert.writeOK(from.insert({_id: 4, b: {c: 2}}));
-
- // Once without a dotted field.
- var pipeline = [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}];
- expectedResults = [
- {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
- {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 3, a: {c: 1}, "same": [{_id: 3, b: {c: 1}}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Look up a dotted field.
- pipeline = [{$lookup: {localField: "a.c", foreignField: "b.c", from: "from", as: "same"}}];
- // All but the last document in 'coll' have a nullish value for 'a.c'.
- expectedResults = [
- {_id: 0, a: 1, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
- {_id: 1, a: null, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
- {_id: 2, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
- {_id: 3, a: {c: 1}, same: [{_id: 3, b: {c: 1}}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // With an $unwind stage.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: {b: 1}}));
- assert.writeOK(coll.insert({_id: 1}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0, target: 1}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a.b",
- foreignField: "target",
- from: "from",
- as: "same.documents",
- }
- },
- {
- // Expected input to $unwind:
- // {_id: 0, a: {b: 1}, same: {documents: [{_id: 0, target: 1}]}}
- // {_id: 1, same: {documents: []}}
- $unwind: {
- path: "$same.documents",
- preserveNullAndEmptyArrays: true,
- includeArrayIndex: "c.d.e",
- }
- }
- ];
- expectedResults = [
- {_id: 0, a: {b: 1}, same: {documents: {_id: 0, target: 1}}, c: {d: {e: NumberLong(0)}}},
- {_id: 1, same: {}, c: {d: {e: null}}},
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // Query-like local fields (SERVER-21287)
- //
-
- // This must only do an equality match rather than treating the value as a regex.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: /a regex/}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0, b: /a regex/}));
- assert.writeOK(from.insert({_id: 1, b: "string that matches /a regex/"}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a",
- foreignField: "b",
- from: "from",
- as: "b",
- }
- },
- ];
- expectedResults = [{_id: 0, a: /a regex/, b: [{_id: 0, b: /a regex/}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // A local value of an array.
- //
-
- // Basic array corresponding to multiple documents.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: [0, 1, 2]}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0}));
- assert.writeOK(from.insert({_id: 1}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a",
- foreignField: "_id",
- from: "from",
- as: "b",
- }
- },
- ];
- expectedResults = [{_id: 0, a: [0, 1, 2], b: [{_id: 0}, {_id: 1}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- // Basic array corresponding to a single document.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: [1]}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0}));
- assert.writeOK(from.insert({_id: 1}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a",
- foreignField: "_id",
- from: "from",
- as: "b",
- }
- },
- ];
- expectedResults = [{_id: 0, a: [1], b: [{_id: 1}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- // Array containing regular expressions.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: [/a regex/, /^x/]}));
- assert.writeOK(coll.insert({_id: 1, a: [/^x/]}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0, b: "should not match a regex"}));
- assert.writeOK(from.insert({_id: 1, b: "xxxx"}));
- assert.writeOK(from.insert({_id: 2, b: /a regex/}));
- assert.writeOK(from.insert({_id: 3, b: /^x/}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a",
- foreignField: "b",
- from: "from",
- as: "b",
- }
- },
- ];
- expectedResults = [
- {_id: 0, a: [/a regex/, /^x/], b: [{_id: 2, b: /a regex/}, {_id: 3, b: /^x/}]},
- {_id: 1, a: [/^x/], b: [{_id: 3, b: /^x/}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'localField' references a field within an array of sub-objects.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: [{b: 1}, {b: 2}]}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0}));
- assert.writeOK(from.insert({_id: 1}));
- assert.writeOK(from.insert({_id: 2}));
- assert.writeOK(from.insert({_id: 3}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a.b",
- foreignField: "_id",
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // Pipeline syntax using 'let' variables.
- //
- coll.drop();
- assert.writeOK(coll.insert({_id: 1, x: 1}));
- assert.writeOK(coll.insert({_id: 2, x: 2}));
- assert.writeOK(coll.insert({_id: 3, x: 3}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 1}));
- assert.writeOK(from.insert({_id: 2}));
- assert.writeOK(from.insert({_id: 3}));
-
- // Basic non-equi theta join via $project.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$_id"},
- pipeline: [
- {$project: {isMatch: {$gt: ["$$var1", "$_id"]}}},
- {$match: {isMatch: true}},
- {$project: {isMatch: 0}}
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {"_id": 1, x: 1, "c": []},
- {"_id": 2, x: 2, "c": [{"_id": 1}]},
- {
- "_id": 3,
- x: 3,
- "c": [
- {"_id": 1},
- {
- "_id": 2,
- }
- ]
- }
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Basic non-equi theta join via $match.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$_id"},
- pipeline: [
- {$match: {$expr: {$lt: ["$_id", "$$var1"]}}},
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {"_id": 1, x: 1, "c": []},
- {"_id": 2, x: 2, "c": [{"_id": 1}]},
- {
- "_id": 3,
- x: 3,
- "c": [
- {"_id": 1},
- {
- "_id": 2,
- }
- ]
- }
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Multi-level join using $match.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$_id"},
- pipeline: [
- {$match: {$expr: {$eq: ["$_id", "$$var1"]}}},
- {
- $lookup: {
- let : {var2: "$_id"},
- pipeline: [
- {$match: {$expr: {$gt: ["$_id", "$$var2"]}}},
- ],
- from: "from",
- as: "d"
- }
- },
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {"_id": 1, "x": 1, "c": [{"_id": 1, "d": [{"_id": 2}, {"_id": 3}]}]},
- {"_id": 2, "x": 2, "c": [{"_id": 2, "d": [{"_id": 3}]}]},
- {"_id": 3, "x": 3, "c": [{"_id": 3, "d": []}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Equijoin with $match that can't be delegated to the query subsystem.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$x"},
- pipeline: [
- {$addFields: {newField: 2}},
- {$match: {$expr: {$eq: ["$newField", "$$var1"]}}},
- {$project: {newField: 0}}
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {"_id": 1, "x": 1, "c": []},
- {"_id": 2, "x": 2, "c": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
- {"_id": 3, "x": 3, "c": []}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Multiple variables.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$_id", var2: "$x"},
- pipeline: [
- {
- $project: {
- isMatch: {$gt: ["$$var1", "$_id"]},
- var2Times2: {$multiply: [2, "$$var2"]}
- }
- },
- {$match: {isMatch: true}},
- {$project: {isMatch: 0}}
- ],
- from: "from",
- as: "c",
- },
- },
- {$project: {x: 1, c: 1}}
- ];
-
- expectedResults = [
- {"_id": 1, x: 1, "c": []},
- {"_id": 2, x: 2, "c": [{"_id": 1, var2Times2: 4}]},
- {"_id": 3, x: 3, "c": [{"_id": 1, var2Times2: 6}, {"_id": 2, var2Times2: 6}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Let var as complex expression object.
- pipeline = [
- {
- $lookup: {
- let : {var1: {$mod: ["$x", 3]}},
- pipeline: [
- {$project: {var1Mod3TimesForeignId: {$multiply: ["$$var1", "$_id"]}}},
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {
- "_id": 1,
- x: 1,
- "c": [
- {_id: 1, var1Mod3TimesForeignId: 1},
- {_id: 2, var1Mod3TimesForeignId: 2},
- {_id: 3, var1Mod3TimesForeignId: 3}
- ]
- },
- {
- "_id": 2,
- x: 2,
- "c": [
- {_id: 1, var1Mod3TimesForeignId: 2},
- {_id: 2, var1Mod3TimesForeignId: 4},
- {_id: 3, var1Mod3TimesForeignId: 6}
- ]
- },
- {
- "_id": 3,
- x: 3,
- "c": [
- {_id: 1, var1Mod3TimesForeignId: 0},
- {_id: 2, var1Mod3TimesForeignId: 0},
- {_id: 3, var1Mod3TimesForeignId: 0}
- ]
- }
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'let' defined variables are available to all nested sub-pipelines.
- pipeline = [
- {$match: {_id: 1}},
- {
- $lookup: {
- let : {var1: "ABC", var2: "123"},
- pipeline: [
- {$match: {_id: 1}},
- {
- $lookup: {
- pipeline: [
- {$match: {_id: 2}},
- {$addFields: {letVar1: "$$var1"}},
- {
- $lookup: {
- let : {var3: "XYZ"},
- pipeline: [{
- $addFields: {
- mergedLetVars:
- {$concat: ["$$var1", "$$var2", "$$var3"]}
- }
- }],
- from: "from",
- as: "join3"
- }
- },
- ],
- from: "from",
- as: "join2"
- }
- },
- ],
- from: "from",
- as: "join1",
- }
- }
- ];
-
- expectedResults = [{
- "_id": 1,
- "x": 1,
- "join1": [{
- "_id": 1,
- "join2": [{
- "_id": 2,
- "letVar1": "ABC",
- "join3": [
- {"_id": 1, "mergedLetVars": "ABC123XYZ"},
- {"_id": 2, "mergedLetVars": "ABC123XYZ"},
- {"_id": 3, "mergedLetVars": "ABC123XYZ"}
- ]
- }]
- }]
- }];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'let' variable shadowed by foreign pipeline variable.
- pipeline = [
- {$match: {_id: 2}},
- {
- $lookup: {
- let : {var1: "$_id"},
- pipeline: [
- {
- $project: {
- shadowedVar: {$let: {vars: {var1: "abc"}, in : "$$var1"}},
- originalVar: "$$var1"
- }
- },
- {
- $lookup: {
- pipeline: [{
- $project: {
- shadowedVar: {$let: {vars: {var1: "xyz"}, in : "$$var1"}},
- originalVar: "$$var1"
- }
- }],
- from: "from",
- as: "d"
- }
- }
- ],
- from: "from",
- as: "c",
- }
- }
- ];
-
- expectedResults = [{
- "_id": 2,
- "x": 2,
- "c": [
- {
- "_id": 1,
- "shadowedVar": "abc",
- "originalVar": 2,
- "d": [
- {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
- ]
- },
- {
- "_id": 2,
- "shadowedVar": "abc",
- "originalVar": 2,
- "d": [
- {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
- ]
- },
- {
- "_id": 3,
- "shadowedVar": "abc",
- "originalVar": 2,
- "d": [
- {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
- ]
- }
- ]
- }];
- testPipeline(pipeline, expectedResults, coll);
-
- // Use of undefined variable fails.
- assertErrorCode(coll,
- [{
- $lookup: {
- from: "from",
- as: "as",
- let : {var1: "$x"},
- pipeline: [{$project: {myVar: "$$nonExistent"}}]
- }
- }],
- 17276);
-
- // The dotted path offset of a non-object variable is equivalent referencing an undefined
- // field.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$x"},
- pipeline: [
- {
- $match: {
- $expr: {
- $eq: [
- "FIELD-IS-NULL",
- {$ifNull: ["$$var1.y.z", "FIELD-IS-NULL"]}
- ]
- }
- }
- },
- ],
- from: "from",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [
- {"x": 1, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
- {"x": 2, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
- {"x": 3, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Comparison where a 'let' variable references an array.
- coll.drop();
- assert.writeOK(coll.insert({x: [1, 2, 3]}));
-
- pipeline = [
- {
- $lookup: {
- let : {var1: "$x"},
- pipeline: [
- {$match: {$expr: {$eq: ["$$var1", [1, 2, 3]]}}},
- ],
- from: "from",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [{"x": [1, 2, 3], "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // Pipeline syntax with nested object.
- //
- coll.drop();
- assert.writeOK(coll.insert({x: {y: {z: 10}}}));
-
- // Subfields of 'let' variables can be referenced via dotted path.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$x"},
- pipeline: [
- {$project: {z: "$$var1.y.z"}},
- ],
- from: "from",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [{
- "x": {"y": {"z": 10}},
- "as": [{"_id": 1, "z": 10}, {"_id": 2, "z": 10}, {"_id": 3, "z": 10}]
- }];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'let' variable with dotted field path off of $$ROOT.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$$ROOT.x.y.z"},
- pipeline:
- [{$match: {$expr: {$eq: ["$$var1", "$$ROOT.x.y.z"]}}}, {$project: {_id: 0}}],
- from: "lookUp",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'let' variable with dotted field path off of $$CURRENT.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$$CURRENT.x.y.z"},
- pipeline: [
- {$match: {$expr: {$eq: ["$$var1", "$$CURRENT.x.y.z"]}}},
- {$project: {_id: 0}}
- ],
- from: "lookUp",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // Pipeline syntax with nested $lookup.
- //
- coll.drop();
- assert.writeOK(coll.insert({_id: 1, w: 1}));
- assert.writeOK(coll.insert({_id: 2, w: 2}));
- assert.writeOK(coll.insert({_id: 3, w: 3}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 1, x: 1}));
- assert.writeOK(from.insert({_id: 2, x: 2}));
- assert.writeOK(from.insert({_id: 3, x: 3}));
-
- thirdColl.drop();
- assert.writeOK(thirdColl.insert({_id: 1, y: 1}));
- assert.writeOK(thirdColl.insert({_id: 2, y: 2}));
- assert.writeOK(thirdColl.insert({_id: 3, y: 3}));
-
- fourthColl.drop();
- assert.writeOK(fourthColl.insert({_id: 1, z: 1}));
- assert.writeOK(fourthColl.insert({_id: 2, z: 2}));
- assert.writeOK(fourthColl.insert({_id: 3, z: 3}));
-
- // Nested $lookup pipeline.
- pipeline = [
- {$match: {_id: 1}},
- {
- $lookup: {
- pipeline: [
- {$match: {_id: 2}},
- {
- $lookup: {
- pipeline: [
- {$match: {_id: 3}},
- {
- $lookup: {
- pipeline: [
- {$match: {_id: 1}},
- ],
- from: "fourthColl",
- as: "thirdLookup"
- }
- },
- ],
- from: "thirdColl",
- as: "secondLookup"
- }
- },
- ],
- from: "from",
- as: "firstLookup",
- }
- }
- ];
-
- expectedResults = [{
- "_id": 1,
- "w": 1,
- "firstLookup": [{
- "_id": 2,
- x: 2, "secondLookup": [{"_id": 3, y: 3, "thirdLookup": [{_id: 1, z: 1}]}]
- }]
- }];
- testPipeline(pipeline, expectedResults, coll);
-
- // Deeply nested $lookup pipeline. Confirm that we can execute an aggregation with nested
- // $lookup sub-pipelines up to the maximum depth, but not beyond.
- let nestedPipeline = generateNestedPipeline("lookup", 20);
- assert.commandWorked(coll.getDB().runCommand(
- {aggregate: coll.getName(), pipeline: nestedPipeline, cursor: {}}));
-
- nestedPipeline = generateNestedPipeline("lookup", 21);
- assertErrorCode(coll, nestedPipeline, ErrorCodes.MaxSubPipelineDepthExceeded);
-
- // Confirm that maximum $lookup sub-pipeline depth is respected when aggregating views whose
- // combined nesting depth exceeds the limit.
- nestedPipeline = generateNestedPipeline("lookup", 10);
- coll.getDB().view1.drop();
- assert.commandWorked(
- coll.getDB().runCommand({create: "view1", viewOn: "lookup", pipeline: nestedPipeline}));
-
- nestedPipeline = generateNestedPipeline("view1", 10);
- coll.getDB().view2.drop();
- assert.commandWorked(
- coll.getDB().runCommand({create: "view2", viewOn: "view1", pipeline: nestedPipeline}));
-
- // Confirm that a composite sub-pipeline depth of 20 is allowed.
- assert.commandWorked(
- coll.getDB().runCommand({aggregate: "view2", pipeline: [], cursor: {}}));
-
- const pipelineWhichExceedsNestingLimit = generateNestedPipeline("view2", 1);
- coll.getDB().view3.drop();
- assert.commandWorked(coll.getDB().runCommand(
- {create: "view3", viewOn: "view2", pipeline: pipelineWhichExceedsNestingLimit}));
-
- // Confirm that a composite sub-pipeline depth greater than 20 fails.
- assertErrorCode(coll.getDB().view3, [], ErrorCodes.MaxSubPipelineDepthExceeded);
-
- //
- // Error cases.
- //
-
- // 'from', 'as', 'localField' and 'foreignField' must all be specified when run with
- // localField/foreignField syntax.
- assertErrorCode(coll,
- [{$lookup: {foreignField: "b", from: "from", as: "same"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", from: "from", as: "same"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", as: "same"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", from: "from"}}],
- ErrorCodes.FailedToParse);
-
- // localField/foreignField and pipeline/let syntax must not be mixed.
- assertErrorCode(coll,
- [{$lookup: {pipeline: [], foreignField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {pipeline: [], localField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(
- coll,
- [{$lookup: {pipeline: [], localField: "b", foreignField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {let : {a: "$b"}, foreignField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {let : {a: "$b"}, localField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(
- coll,
- [{
- $lookup:
- {let : {a: "$b"}, localField: "b", foreignField: "b", from: "from", as: "as"}
- }],
- ErrorCodes.FailedToParse);
-
- // 'from', 'as', 'localField' and 'foreignField' must all be of type string.
- assertErrorCode(coll,
- [{$lookup: {localField: 1, foreignField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: 1, from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", from: 1, as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", from: "from", as: 1}}],
- ErrorCodes.FailedToParse);
-
- // 'pipeline' and 'let' must be of expected type.
- assertErrorCode(
- coll, [{$lookup: {pipeline: 1, from: "from", as: "as"}}], ErrorCodes.TypeMismatch);
- assertErrorCode(
- coll, [{$lookup: {pipeline: {}, from: "from", as: "as"}}], ErrorCodes.TypeMismatch);
- assertErrorCode(coll,
- [{$lookup: {let : 1, pipeline: [], from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {let : [], pipeline: [], from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
-
- // The foreign collection must be a valid namespace.
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", from: "", as: "as"}}],
- ErrorCodes.InvalidNamespace);
- // $lookup's field must be an object.
- assertErrorCode(coll, [{$lookup: "string"}], ErrorCodes.FailedToParse);
- }
-
- // Run tests on single node.
- db.lookUp.drop();
- db.from.drop();
- db.thirdColl.drop();
- db.fourthColl.drop();
- runTest(db.lookUp, db.from, db.thirdColl, db.fourthColl);
-
- // Run tests in a sharded environment.
- var sharded = new ShardingTest({shards: 2, mongos: 1});
- assert(sharded.adminCommand({enableSharding: "test"}));
- sharded.getDB('test').lookUp.drop();
- sharded.getDB('test').from.drop();
- sharded.getDB('test').thirdColl.drop();
- sharded.getDB('test').fourthColl.drop();
- assert(sharded.adminCommand({shardCollection: "test.lookUp", key: {_id: 'hashed'}}));
- runTest(sharded.getDB('test').lookUp,
- sharded.getDB('test').from,
- sharded.getDB('test').thirdColl,
- sharded.getDB('test').fourthColl);
-
- // An error is thrown if the from collection is sharded.
- assert(sharded.adminCommand({shardCollection: "test.from", key: {_id: 1}}));
- assertErrorCode(sharded.getDB('test').lookUp,
- [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}],
- 28769);
-
- // An error is thrown if nested $lookup from collection is sharded.
- assert(sharded.adminCommand({shardCollection: "test.fourthColl", key: {_id: 1}}));
- assertErrorCode(sharded.getDB('test').lookUp,
- [{
- $lookup: {
- pipeline: [{$lookup: {pipeline: [], from: "fourthColl", as: "same"}}],
- from: "thirdColl",
- as: "same"
- }
- }],
- 28769);
-
- // Test that a $lookup from an unsharded collection followed by an $out to a sharded collection
- // is allowed.
- const sourceColl = sharded.getDB("test").lookUp;
- sourceColl.drop();
- assert(sharded.adminCommand({shardCollection: sourceColl.getFullName(), key: {_id: "hashed"}}));
- assert.commandWorked(sourceColl.insert({_id: 0, a: 0}));
-
- const outColl = sharded.getDB("test").out;
- outColl.drop();
- assert(sharded.adminCommand({shardCollection: outColl.getFullName(), key: {_id: "hashed"}}));
-
- const fromColl = sharded.getDB("test").from;
- fromColl.drop();
- assert.commandWorked(fromColl.insert({_id: 0, b: 0}));
-
- sourceColl.aggregate([
- {$lookup: {localField: "a", foreignField: "b", from: fromColl.getName(), as: "same"}},
- {$out: {to: outColl.getName(), mode: "insertDocuments"}}
- ]);
-
- assert.eq([{a: 0, same: [{_id: 0, b: 0}]}], outColl.find({}, {_id: 0}).toArray());
-
- sharded.stop();
-}());
diff --git a/jstests/aggregation/sources/lookup/lookup_subpipeline.js b/jstests/aggregation/sources/lookup/lookup_subpipeline.js
new file mode 100644
index 00000000000..abffadf4c0b
--- /dev/null
+++ b/jstests/aggregation/sources/lookup/lookup_subpipeline.js
@@ -0,0 +1,604 @@
+// Tests for the $lookup stage with a sub-pipeline.
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+
+ const testName = "lookup_subpipeline";
+
+ const coll = db.lookUp;
+ const from = db.from;
+ const thirdColl = db.thirdColl;
+ const fourthColl = db.fourthColl;
+
+ // Used by testPipeline to sort result documents. All _ids must be primitives.
+ function compareId(a, b) {
+ if (a._id < b._id) {
+ return -1;
+ }
+ if (a._id > b._id) {
+ return 1;
+ }
+ return 0;
+ }
+
+ function generateNestedPipeline(foreignCollName, numLevels) {
+ let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}];
+
+ for (let level = 1; level < numLevels; level++) {
+ pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}];
+ }
+
+ return pipeline;
+ }
+
+ // Helper for testing that pipeline returns correct set of results.
+ function testPipeline(pipeline, expectedResult, collection) {
+ assert.eq(collection.aggregate(pipeline).toArray().sort(compareId),
+ expectedResult.sort(compareId));
+ }
+
+ //
+ // Pipeline syntax using 'let' variables.
+ //
+ coll.drop();
+ assert.writeOK(coll.insert({_id: 1, x: 1}));
+ assert.writeOK(coll.insert({_id: 2, x: 2}));
+ assert.writeOK(coll.insert({_id: 3, x: 3}));
+
+ from.drop();
+ assert.writeOK(from.insert({_id: 1}));
+ assert.writeOK(from.insert({_id: 2}));
+ assert.writeOK(from.insert({_id: 3}));
+
+ // Basic non-equi theta join via $project.
+ let pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$_id"},
+ pipeline: [
+ {$project: {isMatch: {$gt: ["$$var1", "$_id"]}}},
+ {$match: {isMatch: true}},
+ {$project: {isMatch: 0}}
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ let expectedResults = [
+ {"_id": 1, x: 1, "c": []},
+ {"_id": 2, x: 2, "c": [{"_id": 1}]},
+ {
+ "_id": 3,
+ x: 3,
+ "c": [
+ {"_id": 1},
+ {
+ "_id": 2,
+ }
+ ]
+ }
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+ // Basic non-equi theta join via $match.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$_id"},
+ pipeline: [
+ {$match: {$expr: {$lt: ["$_id", "$$var1"]}}},
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [
+ {"_id": 1, x: 1, "c": []},
+ {"_id": 2, x: 2, "c": [{"_id": 1}]},
+ {
+ "_id": 3,
+ x: 3,
+ "c": [
+ {"_id": 1},
+ {
+ "_id": 2,
+ }
+ ]
+ }
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Multi-level join using $match.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$_id"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$_id", "$$var1"]}}},
+ {
+ $lookup: {
+ let : {var2: "$_id"},
+ pipeline: [
+ {$match: {$expr: {$gt: ["$_id", "$$var2"]}}},
+ ],
+ from: "from",
+ as: "d"
+ }
+ },
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [
+ {"_id": 1, "x": 1, "c": [{"_id": 1, "d": [{"_id": 2}, {"_id": 3}]}]},
+ {"_id": 2, "x": 2, "c": [{"_id": 2, "d": [{"_id": 3}]}]},
+ {"_id": 3, "x": 3, "c": [{"_id": 3, "d": []}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Equijoin with $match that can't be delegated to the query subsystem.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$x"},
+ pipeline: [
+ {$addFields: {newField: 2}},
+ {$match: {$expr: {$eq: ["$newField", "$$var1"]}}},
+ {$project: {newField: 0}}
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [
+ {"_id": 1, "x": 1, "c": []},
+ {"_id": 2, "x": 2, "c": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
+ {"_id": 3, "x": 3, "c": []}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Multiple variables.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$_id", var2: "$x"},
+ pipeline: [
+ {
+ $project: {
+ isMatch: {$gt: ["$$var1", "$_id"]},
+ var2Times2: {$multiply: [2, "$$var2"]}
+ }
+ },
+ {$match: {isMatch: true}},
+ {$project: {isMatch: 0}}
+ ],
+ from: "from",
+ as: "c",
+ },
+ },
+ {$project: {x: 1, c: 1}}
+ ];
+
+ expectedResults = [
+ {"_id": 1, x: 1, "c": []},
+ {"_id": 2, x: 2, "c": [{"_id": 1, var2Times2: 4}]},
+ {"_id": 3, x: 3, "c": [{"_id": 1, var2Times2: 6}, {"_id": 2, var2Times2: 6}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Let var as complex expression object.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: {$mod: ["$x", 3]}},
+ pipeline: [
+ {$project: {var1Mod3TimesForeignId: {$multiply: ["$$var1", "$_id"]}}},
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [
+ {
+ "_id": 1,
+ x: 1,
+ "c": [
+ {_id: 1, var1Mod3TimesForeignId: 1},
+ {_id: 2, var1Mod3TimesForeignId: 2},
+ {_id: 3, var1Mod3TimesForeignId: 3}
+ ]
+ },
+ {
+ "_id": 2,
+ x: 2,
+ "c": [
+ {_id: 1, var1Mod3TimesForeignId: 2},
+ {_id: 2, var1Mod3TimesForeignId: 4},
+ {_id: 3, var1Mod3TimesForeignId: 6}
+ ]
+ },
+ {
+ "_id": 3,
+ x: 3,
+ "c": [
+ {_id: 1, var1Mod3TimesForeignId: 0},
+ {_id: 2, var1Mod3TimesForeignId: 0},
+ {_id: 3, var1Mod3TimesForeignId: 0}
+ ]
+ }
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'let' defined variables are available to all nested sub-pipelines.
+ pipeline = [
+ {$match: {_id: 1}},
+ {
+ $lookup: {
+ let : {var1: "ABC", var2: "123"},
+ pipeline: [
+ {$match: {_id: 1}},
+ {
+ $lookup: {
+ pipeline: [
+ {$match: {_id: 2}},
+ {$addFields: {letVar1: "$$var1"}},
+ {
+ $lookup: {
+ let : {var3: "XYZ"},
+ pipeline: [{
+ $addFields: {
+ mergedLetVars:
+ {$concat: ["$$var1", "$$var2", "$$var3"]}
+ }
+ }],
+ from: "from",
+ as: "join3"
+ }
+ },
+ ],
+ from: "from",
+ as: "join2"
+ }
+ },
+ ],
+ from: "from",
+ as: "join1",
+ }
+ }
+ ];
+
+ expectedResults = [{
+ "_id": 1,
+ "x": 1,
+ "join1": [{
+ "_id": 1,
+ "join2": [{
+ "_id": 2,
+ "letVar1": "ABC",
+ "join3": [
+ {"_id": 1, "mergedLetVars": "ABC123XYZ"},
+ {"_id": 2, "mergedLetVars": "ABC123XYZ"},
+ {"_id": 3, "mergedLetVars": "ABC123XYZ"}
+ ]
+ }]
+ }]
+ }];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'let' variable shadowed by foreign pipeline variable.
+ pipeline = [
+ {$match: {_id: 2}},
+ {
+ $lookup: {
+ let : {var1: "$_id"},
+ pipeline: [
+ {
+ $project: {
+ shadowedVar: {$let: {vars: {var1: "abc"}, in : "$$var1"}},
+ originalVar: "$$var1"
+ }
+ },
+ {
+ $lookup: {
+ pipeline: [{
+ $project: {
+ shadowedVar: {$let: {vars: {var1: "xyz"}, in : "$$var1"}},
+ originalVar: "$$var1"
+ }
+ }],
+ from: "from",
+ as: "d"
+ }
+ }
+ ],
+ from: "from",
+ as: "c",
+ }
+ }
+ ];
+
+ expectedResults = [{
+ "_id": 2,
+ "x": 2,
+ "c": [
+ {
+ "_id": 1,
+ "shadowedVar": "abc",
+ "originalVar": 2,
+ "d": [
+ {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
+ ]
+ },
+ {
+ "_id": 2,
+ "shadowedVar": "abc",
+ "originalVar": 2,
+ "d": [
+ {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
+ ]
+ },
+ {
+ "_id": 3,
+ "shadowedVar": "abc",
+ "originalVar": 2,
+ "d": [
+ {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
+ ]
+ }
+ ]
+ }];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Use of undefined variable fails.
+ assertErrorCode(coll,
+ [{
+ $lookup: {
+ from: "from",
+ as: "as",
+ let : {var1: "$x"},
+ pipeline: [{$project: {myVar: "$$nonExistent"}}]
+ }
+ }],
+ 17276);
+
+ // The dotted path offset of a non-object variable is equivalent to referencing an undefined
+ // field.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$x"},
+ pipeline: [
+ {
+ $match: {
+ $expr:
+ {$eq: ["FIELD-IS-NULL", {$ifNull: ["$$var1.y.z", "FIELD-IS-NULL"]}]}
+ }
+ },
+ ],
+ from: "from",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}},
+ {$sort: {x: 1}}
+ ];
+
+ expectedResults = [
+ {"x": 1, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
+ {"x": 2, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
+ {"x": 3, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Comparison where a 'let' variable references an array.
+ coll.drop();
+ assert.writeOK(coll.insert({x: [1, 2, 3]}));
+
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$x"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$$var1", [1, 2, 3]]}}},
+ ],
+ from: "from",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}}
+ ];
+
+ expectedResults = [{"x": [1, 2, 3], "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // Pipeline syntax with nested object.
+ //
+ coll.drop();
+ assert.writeOK(coll.insert({x: {y: {z: 10}}}));
+
+ // Subfields of 'let' variables can be referenced via dotted path.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$x"},
+ pipeline: [
+ {$project: {z: "$$var1.y.z"}},
+ ],
+ from: "from",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}}
+ ];
+
+ expectedResults = [{
+ "x": {"y": {"z": 10}},
+ "as": [{"_id": 1, "z": 10}, {"_id": 2, "z": 10}, {"_id": 3, "z": 10}]
+ }];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'let' variable with dotted field path off of $$ROOT.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$$ROOT.x.y.z"},
+ pipeline:
+ [{$match: {$expr: {$eq: ["$$var1", "$$ROOT.x.y.z"]}}}, {$project: {_id: 0}}],
+ from: "lookUp",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}}
+ ];
+
+ expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'let' variable with dotted field path off of $$CURRENT.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$$CURRENT.x.y.z"},
+ pipeline:
+ [{$match: {$expr: {$eq: ["$$var1", "$$CURRENT.x.y.z"]}}}, {$project: {_id: 0}}],
+ from: "lookUp",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}}
+ ];
+
+ expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}];
+ testPipeline(pipeline, expectedResults, coll);
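+ // Note: at the start of a pipeline $$CURRENT is bound to $$ROOT, so the $$ROOT and $$CURRENT
+ // tests above are expected to behave identically here.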
+
+ //
+ // Pipeline syntax with nested $lookup.
+ //
+ coll.drop();
+ assert.writeOK(coll.insert({_id: 1, w: 1}));
+ assert.writeOK(coll.insert({_id: 2, w: 2}));
+ assert.writeOK(coll.insert({_id: 3, w: 3}));
+
+ from.drop();
+ assert.writeOK(from.insert({_id: 1, x: 1}));
+ assert.writeOK(from.insert({_id: 2, x: 2}));
+ assert.writeOK(from.insert({_id: 3, x: 3}));
+
+ thirdColl.drop();
+ assert.writeOK(thirdColl.insert({_id: 1, y: 1}));
+ assert.writeOK(thirdColl.insert({_id: 2, y: 2}));
+ assert.writeOK(thirdColl.insert({_id: 3, y: 3}));
+
+ fourthColl.drop();
+ assert.writeOK(fourthColl.insert({_id: 1, z: 1}));
+ assert.writeOK(fourthColl.insert({_id: 2, z: 2}));
+ assert.writeOK(fourthColl.insert({_id: 3, z: 3}));
+
+ // Nested $lookup pipeline.
+ pipeline = [
+ {$match: {_id: 1}},
+ {
+ $lookup: {
+ pipeline: [
+ {$match: {_id: 2}},
+ {
+ $lookup: {
+ pipeline: [
+ {$match: {_id: 3}},
+ {
+ $lookup: {
+ pipeline: [
+ {$match: {_id: 1}},
+ ],
+ from: "fourthColl",
+ as: "thirdLookup"
+ }
+ },
+ ],
+ from: "thirdColl",
+ as: "secondLookup"
+ }
+ },
+ ],
+ from: "from",
+ as: "firstLookup",
+ }
+ }
+ ];
+
+ expectedResults = [{
+ "_id": 1,
+ "w": 1,
+ "firstLookup": [{
+ "_id": 2,
+ x: 2, "secondLookup": [{"_id": 3, y: 3, "thirdLookup": [{_id: 1, z: 1}]}]
+ }]
+ }];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Deeply nested $lookup pipeline. Confirm that we can execute an aggregation with nested
+ // $lookup sub-pipelines up to the maximum depth, but not beyond.
+ let nestedPipeline = generateNestedPipeline("lookup", 20);
+ assert.commandWorked(
+ coll.getDB().runCommand({aggregate: coll.getName(), pipeline: nestedPipeline, cursor: {}}));
+
+ nestedPipeline = generateNestedPipeline("lookup", 21);
+ assertErrorCode(coll, nestedPipeline, ErrorCodes.MaxSubPipelineDepthExceeded);
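+ // For reference, a helper shaped roughly like the following produces the nesting tested above
+ // (an illustrative sketch; the actual 'generateNestedPipeline' is presumably defined earlier in
+ // this file):
+ //
+ //   function generateNestedPipeline(foreignCollName, numLevels) {
+ //       let pipeline = [{$lookup: {pipeline: [], from: foreignCollName, as: "same"}}];
+ //       for (let level = 1; level < numLevels; level++) {
+ //           pipeline = [{$lookup: {pipeline: pipeline, from: foreignCollName, as: "same"}}];
+ //       }
+ //       return pipeline;
+ //   }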
+
+ // Confirm that maximum $lookup sub-pipeline depth is respected when aggregating views whose
+ // combined nesting depth exceeds the limit.
+ nestedPipeline = generateNestedPipeline("lookup", 10);
+ coll.getDB().view1.drop();
+ assert.commandWorked(
+ coll.getDB().runCommand({create: "view1", viewOn: "lookup", pipeline: nestedPipeline}));
+
+ nestedPipeline = generateNestedPipeline("view1", 10);
+ coll.getDB().view2.drop();
+ assert.commandWorked(
+ coll.getDB().runCommand({create: "view2", viewOn: "view1", pipeline: nestedPipeline}));
+
+ // Confirm that a composite sub-pipeline depth of 20 is allowed.
+ assert.commandWorked(coll.getDB().runCommand({aggregate: "view2", pipeline: [], cursor: {}}));
+
+ const pipelineWhichExceedsNestingLimit = generateNestedPipeline("view2", 1);
+ coll.getDB().view3.drop();
+ assert.commandWorked(coll.getDB().runCommand(
+ {create: "view3", viewOn: "view2", pipeline: pipelineWhichExceedsNestingLimit}));
+
+ //
+ // Error cases.
+ //
+
+ // Confirm that a composite sub-pipeline depth greater than 20 fails.
+ assertErrorCode(coll.getDB().view3, [], ErrorCodes.MaxSubPipelineDepthExceeded);
+
+ // 'pipeline' and 'let' must be of the expected type.
+ assertErrorCode(
+ coll, [{$lookup: {pipeline: 1, from: "from", as: "as"}}], ErrorCodes.TypeMismatch);
+ assertErrorCode(
+ coll, [{$lookup: {pipeline: {}, from: "from", as: "as"}}], ErrorCodes.TypeMismatch);
+ assertErrorCode(coll,
+ [{$lookup: {let : 1, pipeline: [], from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {let : [], pipeline: [], from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+}());
diff --git a/jstests/sharding/collation_lookup.js b/jstests/sharding/collation_lookup.js
new file mode 100644
index 00000000000..f06e92ab3fc
--- /dev/null
+++ b/jstests/sharding/collation_lookup.js
@@ -0,0 +1,454 @@
+/**
+ * Tests that the $lookup stage respects the collation when the local and/or foreign collections
+ * are sharded.
+ *
+ * The comparison of string values between the 'localField' and 'foreignField' should use the
+ * collation explicitly set on the aggregation operation or, failing that, the collation
+ * inherited from the collection the "aggregate" command was run on.
+ */
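+ // For context: with the case-insensitive collation used below ({locale: "en_US", strength: 2}),
+ // string comparisons ignore case, so a $lookup matching on "str" treats "abc" and "ABC" as
+ // equal. The expected results in runTests() rely on this.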
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // for arrayEq
+
+ function runTests(withDefaultCollationColl, withoutDefaultCollationColl, collation) {
+ // Test that the $lookup stage respects the inherited collation.
+ let res = withDefaultCollationColl
+ .aggregate([{
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ }])
+ .toArray();
+ assert.eq(1, res.length, tojson(res));
+
+ let expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}];
+ assert(arrayEq(expected, res[0].matched),
+ "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) +
+ " up to ordering");
+
+ res = withDefaultCollationColl
+ .aggregate([{
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ }
+ ],
+ as: "matched1",
+ },
+ }])
+ .toArray();
+ assert.eq(1, res.length, tojson(res));
+
+ expected = [
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
+ },
+ {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2":
+ [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
+ }
+ ];
+ assert(arrayEq(expected, res[0].matched1),
+ "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) +
+ " up to ordering. " + tojson(res));
+
+ // Test that the $lookup stage respects the inherited collation when it optimizes with an
+ // $unwind stage.
+ res = withDefaultCollationColl
+ .aggregate([
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ },
+ {$unwind: "$matched"},
+ ])
+ .toArray();
+ assert.eq(2, res.length, tojson(res));
+
+ expected = [
+ {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}},
+ {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}}
+ ];
+ assert(arrayEq(expected, res),
+ "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
+
+ res = withDefaultCollationColl
+ .aggregate([
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ },
+ {$unwind: "$matched2"},
+ ],
+ as: "matched1",
+ },
+ },
+ {$unwind: "$matched1"},
+ ])
+ .toArray();
+ assert.eq(4, res.length, tojson(res));
+
+ expected = [
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "uppercase", "str": "ABC"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2": {"_id": "uppercase", "str": "ABC"}
+ }
+ }
+ ];
+ assert(arrayEq(expected, res),
+ "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
+
+ // Test that the $lookup stage respects an explicit collation on the aggregation operation.
+ res = withoutDefaultCollationColl
+ .aggregate(
+ [
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ },
+ ],
+ collation)
+ .toArray();
+ assert.eq(1, res.length, tojson(res));
+
+ expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}];
+ assert(arrayEq(expected, res[0].matched),
+ "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) +
+ " up to ordering");
+
+ res = withoutDefaultCollationColl
+ .aggregate(
+ [
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ }
+ ],
+ as: "matched1",
+ },
+ }
+ ],
+ collation)
+ .toArray();
+ assert.eq(1, res.length, tojson(res));
+
+ expected = [
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
+ },
+ {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2":
+ [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
+ }
+ ];
+ assert(arrayEq(expected, res[0].matched1),
+ "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) +
+ " up to ordering");
+
+ // Test that the $lookup stage respects an explicit collation on the aggregation operation
+ // when it optimizes with an $unwind stage.
+ res = withoutDefaultCollationColl
+ .aggregate(
+ [
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ },
+ {$unwind: "$matched"},
+ ],
+ collation)
+ .toArray();
+ assert.eq(2, res.length, tojson(res));
+
+ expected = [
+ {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}},
+ {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}}
+ ];
+ assert(arrayEq(expected, res),
+ "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
+
+ res = withoutDefaultCollationColl
+ .aggregate(
+ [
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ },
+ {$unwind: "$matched2"},
+ ],
+ as: "matched1",
+ },
+ },
+ {$unwind: "$matched1"},
+ ],
+ collation)
+ .toArray();
+ assert.eq(4, res.length, tojson(res));
+
+ expected = [
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "uppercase", "str": "ABC"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2": {"_id": "uppercase", "str": "ABC"}
+ }
+ }
+ ];
+ assert(arrayEq(expected, res),
+ "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
+
+ // Test that the $lookup stage uses the "simple" collation if a collation isn't set on the
+ // collection or the aggregation operation.
+ res = withoutDefaultCollationColl
+ .aggregate([
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ },
+ ])
+ .toArray();
+ assert.eq([{_id: "lowercase", str: "abc", matched: [{_id: "lowercase", str: "abc"}]}], res);
+
+ res = withoutDefaultCollationColl
+ .aggregate([
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ },
+ {$unwind: "$matched2"},
+ ],
+ as: "matched1",
+ },
+ },
+ ])
+ .toArray();
+ assert.eq([{
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": [{
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }]
+ }],
+ res);
+ }
+
+ const st = new ShardingTest({shards: 2, config: 1});
+ const testName = "collation_lookup";
+ const caseInsensitive = {collation: {locale: "en_US", strength: 2}};
+
+ const mongosDB = st.s0.getDB(testName);
+ const withDefaultCollationColl = mongosDB[testName + "_with_default"];
+ const withoutDefaultCollationColl = mongosDB[testName + "_without_default"];
+
+ assert.commandWorked(
+ mongosDB.createCollection(withDefaultCollationColl.getName(), caseInsensitive));
+ assert.writeOK(withDefaultCollationColl.insert({_id: "lowercase", str: "abc"}));
+
+ assert.writeOK(withoutDefaultCollationColl.insert({_id: "lowercase", str: "abc"}));
+ assert.writeOK(withoutDefaultCollationColl.insert({_id: "uppercase", str: "ABC"}));
+ assert.writeOK(withoutDefaultCollationColl.insert({_id: "unmatched", str: "def"}));
+
+ //
+ // Sharded collection with default collation and unsharded collection without a default
+ // collation.
+ //
+ assert.commandWorked(
+ withDefaultCollationColl.createIndex({str: 1}, {collation: {locale: "simple"}}));
+
+ // Enable sharding on the test DB and ensure its primary is shard0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+ // Shard the collection with a default collation.
+ assert.commandWorked(mongosDB.adminCommand({
+ shardCollection: withDefaultCollationColl.getFullName(),
+ key: {str: 1},
+ collation: {locale: "simple"}
+ }));
+
+ // Split the collection into 2 chunks.
+ assert.commandWorked(mongosDB.adminCommand(
+ {split: withDefaultCollationColl.getFullName(), middle: {str: "abc"}}));
+
+ // Move the chunk containing {str: "abc"} to shard0001.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: withDefaultCollationColl.getFullName(),
+ find: {str: "abc"},
+ to: st.shard1.shardName
+ }));
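+ // (Illustrative only, not part of the test:) the resulting chunk distribution could be
+ // inspected via the config metadata, e.g.
+ //   mongosDB.getSiblingDB("config").chunks.find({ns: withDefaultCollationColl.getFullName()});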
+
+ runTests(withDefaultCollationColl, withoutDefaultCollationColl, caseInsensitive);
+
+ // TODO: Enable the following tests once SERVER-32536 is fixed.
+ //
+ // Sharded collection with default collation and sharded collection without a default
+ // collation.
+ //
+
+ // Shard the collection without a default collation.
+ // assert.commandWorked(mongosDB.adminCommand({
+ // shardCollection: withoutDefaultCollationColl.getFullName(),
+ // key: {_id: 1},
+ // }));
+
+ // // Split the collection into 2 chunks.
+ // assert.commandWorked(mongosDB.adminCommand(
+ // {split: withoutDefaultCollationColl.getFullName(), middle: {_id: "unmatched"}}));
+
+ // // Move the chunk containing {_id: "lowercase"} to shard0001.
+ // assert.commandWorked(mongosDB.adminCommand({
+ // moveChunk: withoutDefaultCollationColl.getFullName(),
+ // find: {_id: "lowercase"},
+ // to: st.shard1.shardName
+ // }));
+
+ // runTests(withDefaultCollationColl, withoutDefaultCollationColl, caseInsensitive);
+
+ st.stop();
+})();
diff --git a/jstests/sharding/lookup.js b/jstests/sharding/lookup.js
new file mode 100644
index 00000000000..7bb1bca0be5
--- /dev/null
+++ b/jstests/sharding/lookup.js
@@ -0,0 +1,618 @@
+// Basic $lookup regression tests.
+
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+ load("jstests/libs/fixture_helpers.js"); // For isSharded.
+
+ const st = new ShardingTest({shards: 2, config: 1, mongos: 1});
+ const testName = "lookup_sharded";
+
+ const mongosDB = st.s0.getDB(testName);
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Used by testPipeline to sort result documents. All _ids must be primitives.
+ function compareId(a, b) {
+ if (a._id < b._id) {
+ return -1;
+ }
+ if (a._id > b._id) {
+ return 1;
+ }
+ return 0;
+ }
+
+ // Helper for testing that pipeline returns correct set of results.
+ function testPipeline(pipeline, expectedResult, collection) {
+ assert.eq(collection.aggregate(pipeline).toArray().sort(compareId),
+ expectedResult.sort(compareId));
+ }
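+ // Example usage (illustrative): testPipeline([{$match: {_id: 0}}], [{_id: 0, a: 1}], coll)
+ // asserts that the aggregation returns exactly that one document, ignoring result order.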
+
+ function runTest(coll, from, thirdColl, fourthColl) {
+ let db = null; // Using the db variable is banned in this function.
+
+ assert.commandWorked(coll.remove({}));
+ assert.commandWorked(from.remove({}));
+ assert.commandWorked(thirdColl.remove({}));
+ assert.commandWorked(fourthColl.remove({}));
+
+ assert.writeOK(coll.insert({_id: 0, a: 1}));
+ assert.writeOK(coll.insert({_id: 1, a: null}));
+ assert.writeOK(coll.insert({_id: 2}));
+
+ assert.writeOK(from.insert({_id: 0, b: 1}));
+ assert.writeOK(from.insert({_id: 1, b: null}));
+ assert.writeOK(from.insert({_id: 2}));
+
+ //
+ // Basic functionality.
+ //
+
+ // "from" document added to "as" field if a == b, where nonexistent fields are treated as
+ // null.
+ let expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}],
+ expectedResults,
+ coll);
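+ // An approximately equivalent join written with the pipeline/let syntax (shown for comparison
+ // only; null/missing-field semantics can differ between the two forms) would be:
+ //   {$lookup: {from: "from", let: {a: "$a"}, as: "same",
+ //              pipeline: [{$match: {$expr: {$eq: ["$b", "$$a"]}}}]}}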
+
+ // If localField is nonexistent, it is treated as if it is null.
+ expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline(
+ [{$lookup: {localField: "nonexistent", foreignField: "b", from: "from", as: "same"}}],
+ expectedResults,
+ coll);
+
+ // If foreignField is nonexistent, it is treated as if it is null.
+ expectedResults = [
+ {_id: 0, a: 1, "same": []},
+ {_id: 1, a: null, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline(
+ [{$lookup: {localField: "a", foreignField: "nonexistent", from: "from", as: "same"}}],
+ expectedResults,
+ coll);
+
+ // If there are no matches or the from coll doesn't exist, the result is an empty array.
+ expectedResults =
+ [{_id: 0, a: 1, "same": []}, {_id: 1, a: null, "same": []}, {_id: 2, "same": []}];
+ testPipeline(
+ [{$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}],
+ expectedResults,
+ coll);
+ testPipeline(
+ [{$lookup: {localField: "a", foreignField: "b", from: "nonexistent", as: "same"}}],
+ expectedResults,
+ coll);
+
+ // If the field name specified by "as" already exists, it is overwritten.
+ expectedResults = [
+ {_id: 0, "a": [{_id: 0, b: 1}]},
+ {_id: 1, "a": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "a": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "a"}}],
+ expectedResults,
+ coll);
+
+ // Running multiple $lookups in the same pipeline is allowed.
+ expectedResults = [
+ {_id: 0, a: 1, "c": [{_id: 0, b: 1}], "d": [{_id: 0, b: 1}]},
+ {
+ _id: 1,
+ a: null, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]
+ },
+ {_id: 2, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "c"}},
+ {$project: {"a": 1, "c": 1}},
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "d"}}
+ ],
+ expectedResults,
+ coll);
+
+ //
+ // Coalescing with $unwind.
+ //
+
+ // A normal $unwind on the "as" field.
+ expectedResults = [
+ {_id: 0, a: 1, same: {_id: 0, b: 1}},
+ {_id: 1, a: null, same: {_id: 1, b: null}},
+ {_id: 1, a: null, same: {_id: 2}},
+ {_id: 2, same: {_id: 1, b: null}},
+ {_id: 2, same: {_id: 2}}
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
+ {$unwind: {path: "$same"}}
+ ],
+ expectedResults,
+ coll);
+
+ // An $unwind on the "as" field, with includeArrayIndex.
+ expectedResults = [
+ {_id: 0, a: 1, same: {_id: 0, b: 1}, index: NumberLong(0)},
+ {_id: 1, a: null, same: {_id: 1, b: null}, index: NumberLong(0)},
+ {_id: 1, a: null, same: {_id: 2}, index: NumberLong(1)},
+ {_id: 2, same: {_id: 1, b: null}, index: NumberLong(0)},
+ {_id: 2, same: {_id: 2}, index: NumberLong(1)},
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
+ {$unwind: {path: "$same", includeArrayIndex: "index"}}
+ ],
+ expectedResults,
+ coll);
+
+ // Normal $unwind with no matching documents.
+ expectedResults = [];
+ testPipeline(
+ [
+ {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
+ {$unwind: {path: "$same"}}
+ ],
+ expectedResults,
+ coll);
+
+ // $unwind with preserveNullAndEmptyArrays and no matching documents.
+ expectedResults = [
+ {_id: 0, a: 1},
+ {_id: 1, a: null},
+ {_id: 2},
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
+ {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
+ ],
+ expectedResults,
+ coll);
+
+ // $unwind with preserveNullAndEmptyArrays, some with matching documents, some without.
+ expectedResults = [
+ {_id: 0, a: 1},
+ {_id: 1, a: null, same: {_id: 0, b: 1}},
+ {_id: 2},
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
+ {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
+ ],
+ expectedResults,
+ coll);
+
+ // $unwind with preserveNullAndEmptyArrays and includeArrayIndex, some with matching
+ // documents, some without.
+ expectedResults = [
+ {_id: 0, a: 1, index: null},
+ {_id: 1, a: null, same: {_id: 0, b: 1}, index: NumberLong(0)},
+ {_id: 2, index: null},
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
+ {
+ $unwind:
+ {path: "$same", preserveNullAndEmptyArrays: true, includeArrayIndex: "index"}
+ }
+ ],
+ expectedResults,
+ coll);
+
+ //
+ // Dependencies.
+ //
+
+ // If $lookup didn't add "localField" to its dependencies, this test would fail as the
+ // value of the "a" field would be lost and treated as null.
+ expectedResults = [
+ {_id: 0, "same": [{_id: 0, b: 1}]},
+ {_id: 1, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
+ {$project: {"same": 1}}
+ ],
+ expectedResults,
+ coll);
+
+ // If $lookup didn't add fields referenced by "let" variables to its dependencies, this test
+ // would fail as the value of the "a" field would be lost and treated as null.
+ expectedResults = [
+ {"_id": 0, "same": [{"_id": 0, "x": 1}, {"_id": 1, "x": 1}, {"_id": 2, "x": 1}]},
+ {
+ "_id": 1,
+ "same": [{"_id": 0, "x": null}, {"_id": 1, "x": null}, {"_id": 2, "x": null}]
+ },
+ {"_id": 2, "same": [{"_id": 0}, {"_id": 1}, {"_id": 2}]}
+ ];
+ testPipeline(
+ [
+ {
+ $lookup: {
+ let : {var1: "$a"},
+ pipeline: [{$project: {x: "$$var1"}}],
+ from: "from",
+ as: "same"
+ }
+ },
+ {$project: {"same": 1}}
+ ],
+ expectedResults,
+ coll);
+
+ //
+ // Dotted field paths.
+ //
+
+ assert.commandWorked(coll.remove({}));
+ assert.writeOK(coll.insert({_id: 0, a: 1}));
+ assert.writeOK(coll.insert({_id: 1, a: null}));
+ assert.writeOK(coll.insert({_id: 2}));
+ assert.writeOK(coll.insert({_id: 3, a: {c: 1}}));
+
+ assert.commandWorked(from.remove({}));
+ assert.writeOK(from.insert({_id: 0, b: 1}));
+ assert.writeOK(from.insert({_id: 1, b: null}));
+ assert.writeOK(from.insert({_id: 2}));
+ assert.writeOK(from.insert({_id: 3, b: {c: 1}}));
+ assert.writeOK(from.insert({_id: 4, b: {c: 2}}));
+
+ // Once without a dotted field.
+ let pipeline = [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}];
+ expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 3, a: {c: 1}, "same": [{_id: 3, b: {c: 1}}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Look up a dotted field.
+ pipeline = [{$lookup: {localField: "a.c", foreignField: "b.c", from: "from", as: "same"}}];
+ // All but the last document in 'coll' have a nullish value for 'a.c'.
+ expectedResults = [
+ {_id: 0, a: 1, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
+ {_id: 1, a: null, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
+ {_id: 3, a: {c: 1}, same: [{_id: 3, b: {c: 1}}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // With an $unwind stage.
+ assert.commandWorked(coll.remove({}));
+ assert.writeOK(coll.insert({_id: 0, a: {b: 1}}));
+ assert.writeOK(coll.insert({_id: 1}));
+
+ assert.commandWorked(from.remove({}));
+ assert.writeOK(from.insert({_id: 0, target: 1}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a.b",
+ foreignField: "target",
+ from: "from",
+ as: "same.documents",
+ }
+ },
+ {
+ // Expected input to $unwind:
+ // {_id: 0, a: {b: 1}, same: {documents: [{_id: 0, target: 1}]}}
+ // {_id: 1, same: {documents: []}}
+ $unwind: {
+ path: "$same.documents",
+ preserveNullAndEmptyArrays: true,
+ includeArrayIndex: "c.d.e",
+ }
+ }
+ ];
+ expectedResults = [
+ {_id: 0, a: {b: 1}, same: {documents: {_id: 0, target: 1}}, c: {d: {e: NumberLong(0)}}},
+ {_id: 1, same: {}, c: {d: {e: null}}},
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // Query-like local fields (SERVER-21287)
+ //
+
+ // This must only do an equality match rather than treating the value as a regex.
+ assert.commandWorked(coll.remove({}));
+ assert.writeOK(coll.insert({_id: 0, a: /a regex/}));
+
+ assert.commandWorked(from.remove({}));
+ assert.writeOK(from.insert({_id: 0, b: /a regex/}));
+ assert.writeOK(from.insert({_id: 1, b: "string that matches /a regex/"}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a",
+ foreignField: "b",
+ from: "from",
+ as: "b",
+ }
+ },
+ ];
+ expectedResults = [{_id: 0, a: /a regex/, b: [{_id: 0, b: /a regex/}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // A local value of an array.
+ //
+
+ // Basic array corresponding to multiple documents.
+ assert.commandWorked(coll.remove({}));
+ assert.writeOK(coll.insert({_id: 0, a: [0, 1, 2]}));
+
+ assert.commandWorked(from.remove({}));
+ assert.writeOK(from.insert({_id: 0}));
+ assert.writeOK(from.insert({_id: 1}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a",
+ foreignField: "_id",
+ from: "from",
+ as: "b",
+ }
+ },
+ ];
+ expectedResults = [{_id: 0, a: [0, 1, 2], b: [{_id: 0}, {_id: 1}]}];
+ testPipeline(pipeline, expectedResults, coll);
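+ // When 'localField' is an array, each element is matched against 'foreignField', so the join
+ // above behaves like querying the foreign collection with {_id: {$in: [0, 1, 2]}}.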
+
+ // Basic array corresponding to a single document.
+ assert.commandWorked(coll.remove({}));
+ assert.writeOK(coll.insert({_id: 0, a: [1]}));
+
+ assert.commandWorked(from.remove({}));
+ assert.writeOK(from.insert({_id: 0}));
+ assert.writeOK(from.insert({_id: 1}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a",
+ foreignField: "_id",
+ from: "from",
+ as: "b",
+ }
+ },
+ ];
+ expectedResults = [{_id: 0, a: [1], b: [{_id: 1}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Array containing regular expressions.
+ assert.commandWorked(coll.remove({}));
+ assert.writeOK(coll.insert({_id: 0, a: [/a regex/, /^x/]}));
+ assert.writeOK(coll.insert({_id: 1, a: [/^x/]}));
+
+ assert.commandWorked(from.remove({}));
+ assert.writeOK(from.insert({_id: 0, b: "should not match a regex"}));
+ assert.writeOK(from.insert({_id: 1, b: "xxxx"}));
+ assert.writeOK(from.insert({_id: 2, b: /a regex/}));
+ assert.writeOK(from.insert({_id: 3, b: /^x/}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a",
+ foreignField: "b",
+ from: "from",
+ as: "b",
+ }
+ },
+ ];
+ expectedResults = [
+ {_id: 0, a: [/a regex/, /^x/], b: [{_id: 2, b: /a regex/}, {_id: 3, b: /^x/}]},
+ {_id: 1, a: [/^x/], b: [{_id: 3, b: /^x/}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'localField' references a field within an array of sub-objects.
+ assert.commandWorked(coll.remove({}));
+ assert.writeOK(coll.insert({_id: 0, a: [{b: 1}, {b: 2}]}));
+
+ assert.commandWorked(from.remove({}));
+ assert.writeOK(from.insert({_id: 0}));
+ assert.writeOK(from.insert({_id: 1}));
+ assert.writeOK(from.insert({_id: 2}));
+ assert.writeOK(from.insert({_id: 3}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a.b",
+ foreignField: "_id",
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // Test $lookup when the foreign collection is a view.
+ //
+ // TODO SERVER-32548: Allow this test to run when the foreign collection is sharded.
+ if (!FixtureHelpers.isSharded(from)) {
+ assert.commandWorked(
+ coll.getDB().runCommand({create: "fromView", viewOn: "from", pipeline: []}));
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a.b",
+ foreignField: "_id",
+ from: "fromView",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults =
+ [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}];
+ testPipeline(pipeline, expectedResults, coll);
+ }
+
+ //
+ // Error cases.
+ //
+
+ // 'from', 'as', 'localField' and 'foreignField' must all be specified when run with
+ // localField/foreignField syntax.
+ assertErrorCode(coll,
+ [{$lookup: {foreignField: "b", from: "from", as: "same"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", from: "from", as: "same"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", as: "same"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", from: "from"}}],
+ ErrorCodes.FailedToParse);
+
+ // localField/foreignField and pipeline/let syntax must not be mixed.
+ assertErrorCode(coll,
+ [{$lookup: {pipeline: [], foreignField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {pipeline: [], localField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(
+ coll,
+ [{$lookup: {pipeline: [], localField: "b", foreignField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {let : {a: "$b"}, foreignField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {let : {a: "$b"}, localField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(
+ coll,
+ [{
+ $lookup:
+ {let : {a: "$b"}, localField: "b", foreignField: "b", from: "from", as: "as"}
+ }],
+ ErrorCodes.FailedToParse);
+
+ // 'from', 'as', 'localField' and 'foreignField' must all be of type string.
+ assertErrorCode(coll,
+ [{$lookup: {localField: 1, foreignField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: 1, from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", from: 1, as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", from: "from", as: 1}}],
+ ErrorCodes.FailedToParse);
+
+ // The foreign collection must be a valid namespace.
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", from: "", as: "as"}}],
+ ErrorCodes.InvalidNamespace);
+ // $lookup's field must be an object.
+ assertErrorCode(coll, [{$lookup: "string"}], ErrorCodes.FailedToParse);
+ }
+
+ //
+ // Test unsharded local collection and unsharded foreign collection.
+ //
+ mongosDB.lookUp.drop();
+ mongosDB.from.drop();
+ mongosDB.thirdColl.drop();
+ mongosDB.fourthColl.drop();
+
+ runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl);
+
+ // Verify that the command is sent only to the primary shard when both the local and foreign
+ // collections are unsharded.
+ assert(!assert
+ .commandWorked(mongosDB.lookup.explain().aggregate([{
+ $lookup: {
+ from: mongosDB.from.getName(),
+ localField: "a",
+ foreignField: "b",
+ as: "results"
+ }
+ }]))
+ .hasOwnProperty("shards"));
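+ // For contrast: when the aggregation does target a sharded collection, the explain output is
+ // expected to contain a "shards" object keyed by shard name (shape assumed from sharded explain
+ // output, not asserted here).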
+ // Enable sharding on the test DB and ensure its primary is shard0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+ //
+ // Test unsharded local collection and sharded foreign collection.
+ //
+
+ // Shard the foreign collection on _id.
+ st.shardColl(mongosDB.from, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
+ runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl);
+
+ //
+ // Test sharded local collection and unsharded foreign collection.
+ //
+ assert(mongosDB.from.drop());
+
+ // Shard the local collection on _id.
+ st.shardColl(mongosDB.lookup, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
+ runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl);
+
+ //
+ // Test sharded local and foreign collections.
+ //
+
+ // Shard the foreign collection on _id.
+ st.shardColl(mongosDB.from, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
+ runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl);
+
+ // Test that a $lookup from an unsharded collection followed by an $out to a sharded collection
+ // is allowed.
+ const sourceColl = st.getDB(testName).lookUp;
+ assert(sourceColl.drop());
+ assert(st.adminCommand({shardCollection: sourceColl.getFullName(), key: {_id: "hashed"}}));
+ assert.commandWorked(sourceColl.insert({_id: 0, a: 0}));
+
+ const outColl = st.getDB(testName).out;
+ assert(outColl.drop());
+ assert(st.adminCommand({shardCollection: outColl.getFullName(), key: {_id: "hashed"}}));
+
+ const fromColl = st.getDB(testName).from;
+ assert(fromColl.drop());
+ assert.commandWorked(fromColl.insert({_id: 0, b: 0}));
+
+ sourceColl.aggregate([
+ {$lookup: {localField: "a", foreignField: "b", from: fromColl.getName(), as: "same"}},
+ {$out: {to: outColl.getName(), mode: "insertDocuments"}}
+ ]);
+
+ assert.eq([{a: 0, same: [{_id: 0, b: 0}]}], outColl.find({}, {_id: 0}).toArray());
+
+ st.stop();
+}());
diff --git a/jstests/sharding/lookup_mongod_unaware.js b/jstests/sharding/lookup_mongod_unaware.js
new file mode 100644
index 00000000000..0c6072f8095
--- /dev/null
+++ b/jstests/sharding/lookup_mongod_unaware.js
@@ -0,0 +1,168 @@
+// Tests the behavior of a $lookup when a shard contains incorrect routing information for the
+// local and/or foreign collections. This includes when the shard thinks the collection is sharded
+// when it's not, and likewise when it thinks the collection is unsharded but is actually sharded.
+//
+// We restart a mongod to cause it to forget that a collection was sharded. When restarted, we
+// expect it to still have all the previous data.
+// @tags: [requires_persistence]
+(function() {
+ "use strict";
+
+ // Restarts the primary shard and ensures that it believes both collections are unsharded.
+ function restartPrimaryShard(rs, localColl, foreignColl) {
+ // Returns true if the shard is aware that the collection is sharded.
+ function hasRoutingInfoForNs(shardConn, coll) {
+ const res = shardConn.adminCommand({getShardVersion: coll, fullMetadata: true});
+ assert.commandWorked(res);
+ return res.metadata.collVersion != undefined;
+ }
+
+ rs.restart(0);
+ rs.awaitSecondaryNodes();
+ assert(!hasRoutingInfoForNs(rs.getPrimary(), localColl.getFullName()));
+ assert(!hasRoutingInfoForNs(rs.getPrimary(), foreignColl.getFullName()));
+ }
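+ // For reference (assumed response shape): a shard that has routing info cached for the
+ // namespace returns a 'metadata.collVersion' value from getShardVersion, while a restarted,
+ // unaware shard omits it, which is what hasRoutingInfoForNs() checks above.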
+
+ const testName = "lookup_stale_mongod";
+ const st = new ShardingTest({
+ shards: 2,
+ mongos: 2,
+ rs: {nodes: 1},
+ });
+
+ const mongos0DB = st.s0.getDB(testName);
+ const mongos0LocalColl = mongos0DB[testName + "_local"];
+ const mongos0ForeignColl = mongos0DB[testName + "_foreign"];
+
+ const mongos1DB = st.s1.getDB(testName);
+ const mongos1LocalColl = mongos1DB[testName + "_local"];
+ const mongos1ForeignColl = mongos1DB[testName + "_foreign"];
+
+ const pipeline = [
+ {
+ $lookup:
+ {localField: "a", foreignField: "b", from: mongos0ForeignColl.getName(), as: "same"}
+ },
+ {$sort: {_id: 1}}
+ ];
+
+ // The results are expected to be correct if the $lookup stage is executed on the mongos which
+ // is aware that the collection is sharded.
+ const expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+
+ // Ensure that shard0 is the primary shard.
+ assert.commandWorked(mongos0DB.adminCommand({enableSharding: mongos0DB.getName()}));
+ st.ensurePrimaryShard(mongos0DB.getName(), st.shard0.shardName);
+
+ assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1}));
+ assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null}));
+
+ assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null}));
+
+ // Send writes through mongos1 such that it's aware of the collections and believes they are
+ // unsharded.
+ assert.writeOK(mongos1LocalColl.insert({_id: 2}));
+ assert.writeOK(mongos1ForeignColl.insert({_id: 2}));
+
+ //
+ // Test unsharded local and sharded foreign collections, with the primary shard unaware that
+ // the foreign collection is sharded.
+ //
+
+ // Shard the foreign collection.
+ assert.commandWorked(
+ mongos0DB.adminCommand({shardCollection: mongos0ForeignColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey).
+ assert.commandWorked(
+ mongos0DB.adminCommand({split: mongos0ForeignColl.getFullName(), middle: {_id: 1}}));
+
+ // Move the [minKey, 1) chunk to shard1.
+ assert.commandWorked(mongos0DB.adminCommand({
+ moveChunk: mongos0ForeignColl.getFullName(),
+ find: {_id: 0},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Verify $lookup results through the fresh mongos.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ // Verify $lookup results through mongos1, which is not aware that the foreign
+ // collection is sharded. The results are expected to be incorrect when both the mongos and
+ // primary shard incorrectly believe that a collection is unsharded.
+ // TODO: This should be fixed by SERVER-32629, likewise for the other aggregates in this file
+ // sent to the stale mongos.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [
+ {_id: 0, a: 1, "same": []},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ]);
+
+ //
+ // Test sharded local and sharded foreign collections, with the primary shard unaware that
+ // either collection is sharded.
+ //
+
+ // Shard the local collection.
+ assert.commandWorked(
+ mongos0DB.adminCommand({shardCollection: mongos0LocalColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey).
+ assert.commandWorked(
+ mongos0DB.adminCommand({split: mongos0LocalColl.getFullName(), middle: {_id: 1}}));
+
+ // Move the [minKey, 1) chunk to shard1.
+ assert.commandWorked(mongos0DB.adminCommand({
+ moveChunk: mongos0LocalColl.getFullName(),
+ find: {_id: 0},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Verify $lookup results through the fresh mongos.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ // Verify $lookup results through mongos1, which is not aware that the local
+ // collection is sharded. The results are expected to be incorrect when both the mongos and
+ // primary shard incorrectly believe that a collection is unsharded.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ]);
+
+ //
+ // Test sharded local and unsharded foreign collections, with the primary shard unaware that
+ // the local collection is sharded.
+ //
+
+ // Recreate the foreign collection as unsharded.
+ mongos0ForeignColl.drop();
+ assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 2}));
+
+ // Verify $lookup results through the fresh mongos.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ // Verify $lookup results through mongos1, which is not aware that the local
+ // collection is sharded. The results are expected to be incorrect when both the mongos and
+ // primary shard incorrectly believe that a collection is unsharded.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ]);
+
+ st.stop();
+})();
diff --git a/jstests/sharding/lookup_stale_mongos.js b/jstests/sharding/lookup_stale_mongos.js
new file mode 100644
index 00000000000..0dc17958e26
--- /dev/null
+++ b/jstests/sharding/lookup_stale_mongos.js
@@ -0,0 +1,130 @@
+// Tests the behavior of a $lookup when the mongos contains stale routing information for the
+// local and/or foreign collections. This includes when mongos thinks the collection is sharded
+// when it's not, and likewise when mongos thinks the collection is unsharded but is actually
+// sharded.
+(function() {
+ "use strict";
+
+ const testName = "lookup_stale_mongos";
+ const st = new ShardingTest({
+ shards: 2,
+ mongos: 2,
+ });
+
+ const mongos0DB = st.s0.getDB(testName);
+ assert.commandWorked(mongos0DB.dropDatabase());
+ const mongos0LocalColl = mongos0DB[testName + "_local"];
+ const mongos0ForeignColl = mongos0DB[testName + "_foreign"];
+
+ const mongos1DB = st.s1.getDB(testName);
+ const mongos1LocalColl = mongos1DB[testName + "_local"];
+ const mongos1ForeignColl = mongos1DB[testName + "_foreign"];
+
+ const pipeline = [
+ {
+ $lookup:
+ {localField: "a", foreignField: "b", from: mongos1ForeignColl.getName(), as: "same"}
+ },
+ {$sort: {_id: 1}}
+ ];
+ const expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+
+ // Ensure that shard0 is the primary shard.
+ assert.commandWorked(mongos0DB.adminCommand({enableSharding: mongos0DB.getName()}));
+ st.ensurePrimaryShard(mongos0DB.getName(), st.shard0.shardName);
+
+ assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1}));
+ assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null}));
+
+ assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null}));
+
+ // Send writes through mongos1 such that it's aware of the collections and believes they are
+ // unsharded.
+ assert.writeOK(mongos1LocalColl.insert({_id: 2}));
+ assert.writeOK(mongos1ForeignColl.insert({_id: 2}));
+
+ //
+ // Test unsharded local and sharded foreign collections, with mongos unaware that the foreign
+ // collection is sharded.
+ //
+
+ // Shard the foreign collection through mongos0.
+ assert.commandWorked(
+ mongos0DB.adminCommand({shardCollection: mongos0ForeignColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey).
+ assert.commandWorked(
+ mongos0DB.adminCommand({split: mongos0ForeignColl.getFullName(), middle: {_id: 1}}));
+
+ // Move the [minKey, 1) chunk to shard1.
+ assert.commandWorked(mongos0DB.adminCommand({
+ moveChunk: mongos0ForeignColl.getFullName(),
+ find: {_id: 0},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Issue a $lookup through mongos1, which is unaware that the foreign collection is sharded.
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
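+ // The results here are expected to be correct: the shards hold the authoritative shard
+ // versions, so the stale request from mongos1 is rejected with a stale-version error and
+ // mongos1 refreshes its routing table and retries (mechanism assumed from the shard versioning
+ // protocol, not asserted directly by this test).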
+
+ //
+ // Test sharded local and sharded foreign collections, with mongos unaware that the local
+ // collection is sharded.
+ //
+
+ // Shard the local collection through mongos0.
+ assert.commandWorked(
+ mongos0DB.adminCommand({shardCollection: mongos0LocalColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey).
+ assert.commandWorked(
+ mongos0DB.adminCommand({split: mongos0LocalColl.getFullName(), middle: {_id: 1}}));
+
+ // Move the [minKey, 1) chunk to shard1.
+ assert.commandWorked(mongos0DB.adminCommand({
+ moveChunk: mongos0LocalColl.getFullName(),
+ find: {_id: 0},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Issue a $lookup through mongos1, which is unaware that the local collection is sharded.
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ //
+ // Test sharded local and unsharded foreign collections, with mongos unaware that the foreign
+ // collection is unsharded.
+ //
+
+ // Recreate the foreign collection as unsharded through mongos0.
+ mongos0ForeignColl.drop();
+ assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 2}));
+
+ // Issue a $lookup through mongos1, which is unaware that the foreign collection is now
+ // unsharded.
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ //
+ // Test unsharded local and foreign collections, with mongos unaware that the local
+ // collection is unsharded.
+ //
+
+ // Recreate the local collection as unsharded through mongos0.
+ mongos0LocalColl.drop();
+ assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1}));
+ assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null}));
+ assert.writeOK(mongos0LocalColl.insert({_id: 2}));
+
+ // Issue a $lookup through mongos1, which is unaware that the local collection is now
+ // unsharded.
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ st.stop();
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 2f1c2e27cca..71270f0e72f 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -320,7 +320,10 @@ env.Library(
'mongos_process_interface.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/s/query/async_results_merger',
+ '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
+ '$BUILD_DIR/mongo/s/query/cluster_query',
'mongo_process_common',
]
)
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
index 17622131061..737c316c9bf 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -246,8 +246,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
// with the resume token.
auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
auto matchSpec = BSON("$match" << BSONObj());
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
+ auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx);
if (auto first = pipeline->getNext()) {
auto firstOplogEntry = Value(*first);
uassert(40576,
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index a7b0567b66b..a94179c1431 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -458,34 +458,31 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index d93535dc030..1b6583bb655 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -207,8 +207,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
// We've already allocated space for the trailing $match stage in '_fromPipeline'.
_fromPipeline.back() = *matchStage;
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx));
+ auto pipeline =
+ pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx);
while (auto next = pipeline->getNext()) {
uassert(40271,
str::stream()
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 6309f89597a..47f37939f73 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -63,30 +63,27 @@ public:
MockMongoInterface(std::deque<DocumentSource::GetNextResult> results)
: _results(std::move(results)) {}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_results));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 04094a1d45b..7223f854521 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -196,9 +196,16 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const {
txnRequirement = resolvedRequirements.second;
}
+ // If executing on mongos and the foreign collection is sharded, then this stage can run on
+ // mongos.
+ HostTypeRequirement hostRequirement =
+ (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs))
+ ? HostTypeRequirement::kMongoS
+ : HostTypeRequirement::kPrimaryShard;
+
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
- HostTypeRequirement::kPrimaryShard,
+ hostRequirement,
diskRequirement,
FacetRequirement::kAllowed,
txnRequirement);
@@ -289,8 +296,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
// If we don't have a cache, build and return the pipeline immediately.
if (!_cache || _cache->isAbandoned()) {
- return uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx));
+ return pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx);
}
// Tailor the pipeline construction for our needs. We want a non-optimized pipeline without a
@@ -300,8 +306,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
pipelineOpts.attachCursorSource = false;
// Construct the basic pipeline without a cache stage.
- auto pipeline = uassertStatusOK(
- pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts));
+ auto pipeline =
+ pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
// Add the cache stage at the end and optimize. During the optimization process, the cache will
// either move itself to the correct position in the pipeline, or will abandon itself if no
@@ -313,8 +319,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
- uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- _fromExpCtx, pipeline.get()));
+ pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx,
+ pipeline.release());
}
// If the cache has been abandoned, release it.
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index d17b1a73c9c..2a2b3338789 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -81,13 +81,6 @@ public:
return requiredPrivileges;
}
- /**
- * Lookup from a sharded collection is not allowed.
- */
- bool allowShardedForeignCollection(NamespaceString nss) const final {
- return (_foreignNssSet.find(nss) == _foreignNssSet.end());
- }
-
private:
const NamespaceString _fromNss;
const stdx::unordered_set<NamespaceString> _foreignNssSet;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 148197d4d5b..9c3dbf5777b 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -85,34 +85,31 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
boost::optional<Document> lookupSingleDocument(
@@ -125,11 +122,12 @@ public:
// case of a change stream on a whole database so we need to make a copy of the
// ExpressionContext with the new namespace.
auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none);
- auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
- if (swPipeline == ErrorCodes::NamespaceNotFound) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
+ try {
+ pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
+ } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
return boost::none;
}
- auto pipeline = uassertStatusOK(std::move(swPipeline));
auto lookedUpDocument = pipeline->getNext();
if (auto next = pipeline->getNext()) {
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index 4f461d6705c..bacfc5c6cda 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -544,28 +544,25 @@ public:
return false;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) final {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") ||
pipeline->popFrontWithName("$project")) {
@@ -575,7 +572,7 @@ public:
}
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 5f092107c1a..be934e88707 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -184,20 +184,24 @@ public:
* - If opts.attachCursorSource is false, the pipeline will be returned without attempting to
* add an initial cursor source.
*
- * This function returns a non-OK status if parsing the pipeline failed.
+ * This function throws if parsing the pipeline failed.
*/
- virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ virtual std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
/**
- * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This
- * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be
- * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
+ * Accepts a pipeline and returns a new one which will draw input from the underlying
+ * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be
+ * thrown if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
* the only case where NamespaceNotFound is returned.
+ *
+ * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr.
+ * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the
+ * compiler expects to find an implementation of PipelineDeleter.
*/
- virtual Status attachCursorSourceToPipeline(
+ virtual std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
/**
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 85f141c9277..6336910029c 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -28,10 +28,13 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/mongos_process_interface.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/curop.h"
#include "mongo/db/index/index_descriptor.h"
@@ -45,17 +48,109 @@
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_exec_stage.h"
+#include "mongo/s/transaction_router.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/log.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
+
using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
namespace {
+// Given a document representing an aggregation command such as
+//
+// {aggregate: "myCollection", pipeline: [], ...},
+//
+// produces the corresponding explain command:
+//
+// {explain: {aggregate: "myCollection", pipeline: [], ...}, $queryOptions: {...}, verbosity: ...}
+Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
+ MutableDocument explainCommandBuilder;
+ explainCommandBuilder["explain"] = Value(aggregateCommand);
+ // Downstream host targeting code expects queryOptions at the top level of the command object.
+ explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
+ Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
+
+ // readConcern needs to be promoted to the top-level of the request.
+ explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
+ Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
+
+ // Add explain command options.
+ for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
+ explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
+ }
+
+ return explainCommandBuilder.freeze();
+}
+
+std::vector<RemoteCursor> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj& cmdObj,
+ const AggregationRequest& request,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery) {
+ LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+
+ const bool mustRunOnAll = MongoSInterface::mustRunOnAllShards(nss, litePipe);
+ std::set<ShardId> shardIds = MongoSInterface::getTargetedShards(
+ opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo || mustRunOnAll);
+
+ if (mustRunOnAll) {
+ // The pipeline contains a stage which must be run on all shards. Skip versioning and
+ // enqueue the raw command objects.
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+ } else if (routingInfo->cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation, and build versioned requests for them.
+ for (auto& shardId : shardIds) {
+ auto versionedCmdObj =
+ appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
+ requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo->db().primaryId(),
+ !routingInfo->db().primary()->isConfig()
+ ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+ : cmdObj);
+ }
+
+ if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
+ "until fail point is disabled.";
+ while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ sleepsecs(1);
+ }
+ }
+
+ return establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */,
+ MongoSInterface::getDesiredRetryPolicy(request));
+}
+
/**
* Determines the single shard to which the given query will be targeted, and its associated
* shardVersion. Throws if the query targets more than one shard.
@@ -115,6 +210,379 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
+Shard::RetryPolicy MongoSInterface::getDesiredRetryPolicy(const AggregationRequest& req) {
+ // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
+ // pipeline does not support writeConcern.
+ if (req.getWriteConcern()) {
+ return Shard::RetryPolicy::kNotIdempotent;
+ }
+ return Shard::RetryPolicy::kIdempotent;
+}
+
+BSONObj MongoSInterface::createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj) {
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ if (pipeline) {
+ targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+ }
+
+ return MongoSInterface::genericTransformForShards(
+ std::move(targetedCmd), opCtx, shardId, request, collationObj);
+}
+
+BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj) {
+ cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
+ // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
+ // explain command.
+ if (auto explainVerbosity = request.getExplain()) {
+ cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
+ }
+
+ if (!collationObj.isEmpty()) {
+ cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
+ }
+
+ if (opCtx->getTxnNumber()) {
+ invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(),
+ str::stream() << "Command for shards unexpectedly had the "
+ << OperationSessionInfo::kTxnNumberFieldName
+ << " field set: "
+ << cmdForShards.peek().toString());
+ cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
+ Value(static_cast<long long>(*opCtx->getTxnNumber()));
+ }
+
+ auto aggCmd = cmdForShards.freeze().toBson();
+
+ if (shardId) {
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
+ }
+ }
+
+ // agg creates temp collection and should handle implicit create separately.
+ return appendAllowImplicitCreate(aggCmd, true);
+}
+
+BSONObj MongoSInterface::createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge) {
+
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
+ // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
+ // have detected a logged in user and appended that user name to the $listSessions spec to
+ // send to the shards.
+ targetedCmd[AggregationRequest::kPipelineName] =
+ Value(splitPipeline.shardsPipeline->serialize());
+
+ // When running on many shards with the exchange we may not need merging.
+ if (needsMerge) {
+ targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+
+ // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
+ // part. Otherwise this is part of an exchange and in that case we should include the
+ // writeConcern.
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
+ }
+
+ targetedCmd[AggregationRequest::kCursorName] =
+ Value(DOC(AggregationRequest::kBatchSizeName << 0));
+
+ targetedCmd[AggregationRequest::kExchangeName] =
+ exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
+
+ return genericTransformForShards(
+ std::move(targetedCmd), opCtx, boost::none, request, collationObj);
+}
+
+std::set<ShardId> MongoSInterface::getTargetedShards(
+ OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation) {
+ if (mustRunOnAllShards) {
+ // The pipeline begins with a stage which must be run on all shards.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ return {shardIds.begin(), shardIds.end()};
+ }
+
+ // If we don't need to run on all shards, then we should always have a valid routing table.
+ invariant(routingInfo);
+
+ return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
+}
+
+bool MongoSInterface::mustRunOnAllShards(const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe) {
+ // The following aggregations must be routed to all shards:
+ // - Any collectionless aggregation, such as non-localOps $currentOp.
+ // - Any aggregation which begins with a $changeStream stage.
+ return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
+}
+
+StatusWith<CachedCollectionRoutingInfo> MongoSInterface::getExecutionNsRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& execNss) {
+ // First, verify that there are shards present in the cluster. If not, then we return the
+ // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
+ // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
+ // a collection before its enclosing database is created. However, if there are no shards
+ // present, then $changeStream should immediately return an empty cursor just as other
+ // aggregations do when the database does not exist.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
+ if (shardIds.size() == 0) {
+ return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
+ }
+
+ // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
+ // exist.
+ return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
+}
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj) {
+ // The process is as follows:
+ // - First, determine whether we need to target more than one shard. If so, we split the
+ // pipeline; if not, we retain the existing pipeline.
+ // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
+ // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
+ // entire aggregation command.
+ auto cursors = std::vector<RemoteCursor>();
+ auto shardResults = std::vector<AsyncRequestsSender::Response>();
+ auto opCtx = expCtx->opCtx;
+
+ const bool needsPrimaryShardMerge =
+ (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
+
+ const bool needsMongosMerge = pipeline->needsMongosMerger();
+
+ const auto shardQuery = pipeline->getInitialQuery();
+
+ auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
+
+ // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
+ // Otherwise, uassert on all exceptions here.
+ if (!(liteParsedPipeline.hasChangeStream() &&
+ executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
+ uassertStatusOK(executionNsRoutingInfoStatus);
+ }
+
+ auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
+ ? std::move(executionNsRoutingInfoStatus.getValue())
+ : boost::optional<CachedCollectionRoutingInfo>{};
+
+ // Determine whether we can run the entire aggregation on a single shard.
+ const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline);
+ std::set<ShardId> shardIds = getTargetedShards(
+ opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
+
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->computeAndSetAtClusterTime(
+ opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
+ }
+
+ // Don't need to split the pipeline if we are only targeting a single shard, unless:
+ // - There is a stage that needs to be run on the primary shard and the single target shard
+ // is not the primary.
+ // - The pipeline contains one or more stages which must always merge on mongoS.
+ const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
+ (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
+
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ if (needsSplit) {
+ splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
+
+ exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ opCtx, splitPipeline->mergePipeline.get());
+ }
+
+ // Generate the command object for the targeted shards.
+ BSONObj targetedCommand = splitPipeline
+ ? createCommandForTargetedShards(
+ opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
+ : createPassthroughCommandForShard(
+ opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
+
+ // Refresh the shard registry if we're targeting all shards. We need the shard registry
+ // to be at least as current as the logical time used when creating the command for
+ // $changeStream to work reliably, so we do a "hard" reload.
+ if (mustRunOnAll) {
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ shardRegistry->reload(opCtx);
+ }
+ }
+
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAll) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and
+ // should not participate in the shard version protocol.
+ shardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db(),
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target
+ // shards, and should participate in the shard version protocol.
+ invariant(executionNsRoutingInfo);
+ shardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db(),
+ executionNss,
+ *executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation());
+ }
+ } else {
+ cursors = establishShardCursors(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ executionNsRoutingInfo,
+ targetedCommand,
+ aggRequest,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery);
+ invariant(cursors.size() % shardIds.size() == 0,
+ str::stream() << "Number of cursors (" << cursors.size()
+ << ") is not a multiple of producers ("
+ << shardIds.size()
+ << ")");
+ }
+
+ // Convert remote cursors into a vector of "owned" cursors.
+ std::vector<OwnedRemoteCursor> ownedCursors;
+ for (auto&& cursor : cursors) {
+ ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
+ }
+
+ // Record the number of shards involved in the aggregation. If we are required to merge on
+ // the primary shard, but the primary shard was not in the set of targeted shards, then we
+ // must increment the number of involved shards.
+ CurOp::get(opCtx)->debug().nShards =
+ shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
+ !shardIds.count(executionNsRoutingInfo->db().primaryId()));
+
+ return DispatchShardPipelineResults{needsPrimaryShardMerge,
+ std::move(ownedCursors),
+ std::move(shardResults),
+ std::move(splitPipeline),
+ std::move(pipeline),
+ targetedCommand,
+ shardIds.size(),
+ exchangeSpec};
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions pipelineOptions) {
+ // Explain is not supported for auxiliary lookups.
+ invariant(!expCtx->explain);
+
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
+ if (pipelineOptions.optimize) {
+ pipeline->optimizePipeline();
+ }
+ if (pipelineOptions.attachCursorSource) {
+ // 'attachCursorSourceToPipeline' handles any complexity related to sharding.
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
+ }
+
+ return pipeline;
+}
+
+
+std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ // Serialize the pipeline stages so they can be used to build an aggregation request for the shards.
+ std::vector<BSONObj> rawStages = [pipeline]() {
+ auto serialization = pipeline->serialize();
+ std::vector<BSONObj> stages;
+ stages.reserve(serialization.size());
+
+ for (const auto& stageObj : serialization) {
+ invariant(stageObj.getType() == BSONType::Object);
+ stages.push_back(stageObj.getDocument().toBson());
+ }
+
+ return stages;
+ }();
+
+ AggregationRequest aggRequest(expCtx->ns, rawStages);
+ LiteParsedPipeline liteParsedPipeline(aggRequest);
+ auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
+ expCtx,
+ expCtx->ns,
+ aggRequest,
+ liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)),
+ expCtx->collation);
+
+ std::vector<ShardId> targetedShards;
+ targetedShards.reserve(shardDispatchResults.remoteCursors.size());
+ for (auto&& remoteCursor : shardDispatchResults.remoteCursors) {
+ targetedShards.emplace_back(remoteCursor->getShardId().toString());
+ }
+
+ std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
+ boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
+ if (shardDispatchResults.splitPipeline) {
+ mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
+ shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
+ } else {
+ mergePipeline = std::move(shardDispatchResults.pipelineForSingleShard);
+ }
+
+ cluster_aggregation_planner::addMergeCursorsSource(
+ mergePipeline.get(),
+ liteParsedPipeline,
+ shardDispatchResults.commandForTargetedShards,
+ std::move(shardDispatchResults.remoteCursors),
+ targetedShards,
+ shardCursorsSortSpec,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor());
+
+ return mergePipeline;
+}
+
boost::optional<Document> MongoSInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index f2806baf001..6111f3346d8 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -32,6 +32,10 @@
#include "mongo/db/pipeline/mongo_process_common.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_aggregation_planner.h"
+#include "mongo/s/query/owned_remote_cursor.h"
namespace mongo {
@@ -41,6 +45,81 @@ namespace mongo {
*/
class MongoSInterface final : public MongoProcessCommon {
public:
+ struct DispatchShardPipelineResults {
+ // True if this pipeline was split, and the second half of the pipeline needs to be run on
+ // the primary shard for the database.
+ bool needsPrimaryShardMerge;
+
+ // Populated if this *is not* an explain; in that case this vector holds the cursors on the
+ // remote shards.
+ std::vector<OwnedRemoteCursor> remoteCursors;
+
+ // Populated if this *is* an explain; in that case this vector holds the results from each shard.
+ std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+ // The split version of the pipeline if more than one shard was targeted, otherwise
+ // boost::none.
+ boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline;
+
+ // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
+
+ // The command object to send to the targeted shards.
+ BSONObj commandForTargetedShards;
+
+ // How many exchange producers are running the shard part of splitPipeline.
+ size_t numProducers;
+
+ // The exchange specification if the query can run with the exchange, otherwise boost::none.
+ boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ };
+
+ static Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req);
+
+ static BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
+ Pipeline* pipeline,
+ BSONObj collationObj);
+
+ /**
+ * Appends information to the command sent to the shards which should be appended both if this
+ * is a passthrough sent to a single shard and if this is a split pipeline.
+ */
+ static BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
+ const AggregationRequest& request,
+ BSONObj collationObj);
+
+ static BSONObj createCommandForTargetedShards(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const cluster_aggregation_planner::SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ bool needsMerge);
+
+ static std::set<ShardId> getTargetedShards(
+ OperationContext* opCtx,
+ bool mustRunOnAllShards,
+ const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation);
+
+ static bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe);
+
+ static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& execNss);
+
+ static DispatchShardPipelineResults dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ BSONObj collationObj);
+
MongoSInterface() = default;
virtual ~MongoSInterface() = default;
@@ -119,10 +198,8 @@ public:
MONGO_UNREACHABLE;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
- MONGO_UNREACHABLE;
- }
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final {
MONGO_UNREACHABLE;
@@ -133,12 +210,10 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) final {
- MONGO_UNREACHABLE;
- }
+ const MakePipelineOptions pipelineOptions) final;
/**
* The following methods only make sense for data-bearing nodes and should never be called on
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index bf0987ea4ab..1eb3af3aa97 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -274,45 +274,35 @@ void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
_client.runCommand("admin", renameCommandObj, info));
}
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoInterfaceStandalone::makePipeline(
+std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
+ auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx));
if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
+ pipeline->optimizePipeline();
}
- Status cursorStatus = Status::OK();
-
if (opts.attachCursorSource) {
- cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release());
}
- return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
+ return pipeline;
}
-Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
+unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
invariant(pipeline->getSources().empty() ||
!dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
boost::optional<AutoGetCollectionForReadCommand> autoColl;
if (expCtx->uuid) {
- try {
- autoColl.emplace(expCtx->opCtx,
- NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
- AutoGetCollection::ViewMode::kViewsForbidden,
- Date_t::max(),
- AutoStatsTracker::LogMode::kUpdateTop);
- } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
- // The UUID doesn't exist anymore
- return ex.toStatus();
- }
+ autoColl.emplace(expCtx->opCtx,
+ NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid},
+ AutoGetCollection::ViewMode::kViewsForbidden,
+ Date_t::max(),
+ AutoStatsTracker::LogMode::kUpdateTop);
} else {
autoColl.emplace(expCtx->opCtx,
expCtx->ns,
@@ -337,7 +327,7 @@ Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
// the initial cursor stage.
pipeline->optimizePipeline();
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const {
@@ -381,7 +371,7 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument(
nss,
collectionUUID,
_getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
- pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx));
+ pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return boost::none;
}
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 12627655cd5..f1c7fbcc910 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -87,12 +87,12 @@ public:
const NamespaceString& targetNs,
const BSONObj& originalCollectionOptions,
const std::list<BSONObj>& originalIndexes) final;
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final;
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final;
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final;
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
OperationContext* opCtx, const NamespaceString&, UUID) const override;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 08b9e073f35..b9d7befb857 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -116,15 +116,15 @@ public:
MONGO_UNREACHABLE;
}
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) override {
MONGO_UNREACHABLE;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) override {
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 015ea1c66d1..d06c2e91373 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -26,10 +26,10 @@ env.Library(
target='cluster_aggregate',
source=[
'cluster_aggregate.cpp',
- 'cluster_aggregation_planner.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/pipeline/pipeline',
+ '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner',
'$BUILD_DIR/mongo/s/query/cluster_client_cursor',
'$BUILD_DIR/mongo/db/pipeline/mongos_process_interface',
'$BUILD_DIR/mongo/db/views/views',
@@ -37,6 +37,16 @@ env.Library(
]
)
+env.Library(
+ target='cluster_aggregation_planner',
+ source=[
+ 'cluster_aggregation_planner.cpp',
+ ],
+ LIBDEPS=[
+ 'cluster_query',
+ ]
+)
+
env.CppUnitTest(
target="cluster_aggregate_test",
source=[
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 9fc978b46ee..506842da29c 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -82,7 +82,6 @@ namespace mongo {
using SplitPipeline = cluster_aggregation_planner::SplitPipeline;
-MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors);
MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToEstablishMergingShardCursor);
MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToDispatchExchangeConsumerPipeline);
@@ -90,41 +89,6 @@ constexpr unsigned ClusterAggregate::kMaxViewRetries;
namespace {
-Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) {
- // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
- // pipeline does not support writeConcern.
- if (req.getWriteConcern()) {
- return Shard::RetryPolicy::kNotIdempotent;
- }
- return Shard::RetryPolicy::kIdempotent;
-}
-
-// Given a document representing an aggregation command such as
-//
-// {aggregate: "myCollection", pipeline: [], ...},
-//
-// produces the corresponding explain command:
-//
-// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...}
-Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
- MutableDocument explainCommandBuilder;
- explainCommandBuilder["explain"] = Value(aggregateCommand);
- // Downstream host targeting code expects queryOptions at the top level of the command object.
- explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
- Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
-
- // readConcern needs to be promoted to the top-level of the request.
- explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
- Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
-
- // Add explain command options.
- for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
- explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
- }
-
- return explainCommandBuilder.freeze();
-}
-
Status appendCursorResponseToCommandResult(const ShardId& shardId,
const BSONObj cursorResponse,
BSONObjBuilder* result) {
@@ -138,143 +102,6 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId,
return getStatusFromCommandResult(result->asTempObj());
}
-bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) {
- // The following aggregations must be routed to all shards:
- // - Any collectionless aggregation, such as non-localOps $currentOp.
- // - Any aggregation which begins with a $changeStream stage.
- return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream();
-}
-
-StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
- const NamespaceString& execNss) {
- // First, verify that there are shards present in the cluster. If not, then we return the
- // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because
- // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on
- // a collection before its enclosing database is created. However, if there are no shards
- // present, then $changeStream should immediately return an empty cursor just as other
- // aggregations do when the database does not exist.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
- if (shardIds.size() == 0) {
- return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"};
- }
-
- // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not
- // exist.
- return getCollectionRoutingInfoForTxnCmd(opCtx, execNss);
-}
-
-std::set<ShardId> getTargetedShards(OperationContext* opCtx,
- bool mustRunOnAllShards,
- const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation) {
- if (mustRunOnAllShards) {
- // The pipeline begins with a stage which must be run on all shards.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds);
- return {shardIds.begin(), shardIds.end()};
- }
-
- // If we don't need to run on all shards, then we should always have a valid routing table.
- invariant(routingInfo);
-
- return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
-}
-
-/**
- * Appends information to the command sent to the shards which should be appended both if this is a
- * passthrough sent to a single shard and if this is a split pipeline.
- */
-BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
- OperationContext* opCtx,
- const boost::optional<ShardId>& shardId,
- const AggregationRequest& request,
- BSONObj collationObj) {
- cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
- // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
- // explain command.
- if (auto explainVerbosity = request.getExplain()) {
- cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
- }
-
- if (!collationObj.isEmpty()) {
- cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
- }
-
- if (opCtx->getTxnNumber()) {
- invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(),
- str::stream() << "Command for shards unexpectedly had the "
- << OperationSessionInfo::kTxnNumberFieldName
- << " field set: "
- << cmdForShards.peek().toString());
- cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
- Value(static_cast<long long>(*opCtx->getTxnNumber()));
- }
-
- auto aggCmd = cmdForShards.freeze().toBson();
-
- if (shardId) {
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd);
- }
- }
-
- // agg creates temp collection and should handle implicit create separately.
- return appendAllowImplicitCreate(aggCmd, true);
-}
-
-BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
- const AggregationRequest& request,
- const boost::optional<ShardId>& shardId,
- Pipeline* pipeline,
- BSONObj collationObj) {
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- if (pipeline) {
- targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
- }
-
- return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj);
-}
-
-BSONObj createCommandForTargetedShards(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const SplitPipeline& splitPipeline,
- const BSONObj collationObj,
- const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
- bool needsMerge) {
-
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
- // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
- // have detected a logged in user and appended that user name to the $listSessions spec to
- // send to the shards.
- targetedCmd[AggregationRequest::kPipelineName] =
- Value(splitPipeline.shardsPipeline->serialize());
-
- // When running on many shards with the exchange we may not need merging.
- if (needsMerge) {
- targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
-
- // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
- // part. Otherwise this is part of an exchange and in that case we should include the
- // writeConcern.
- targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
- }
-
- targetedCmd[AggregationRequest::kCursorName] =
- Value(DOC(AggregationRequest::kBatchSizeName << 0));
-
- targetedCmd[AggregationRequest::kExchangeName] =
- exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
-
- return genericTransformForShards(
- std::move(targetedCmd), opCtx, boost::none, request, collationObj);
-}
-
BSONObj createCommandForMergingShard(const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const ShardId& shardId,
@@ -302,252 +129,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
return appendAllowImplicitCreate(aggCmd, true);
}
-std::vector<RemoteCursor> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- const BSONObj& cmdObj,
- const AggregationRequest& request,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery) {
- LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
-
- const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
- std::set<ShardId> shardIds =
- getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
- std::vector<std::pair<ShardId, BSONObj>> requests;
-
- // If we don't need to run on all shards, then we should always have a valid routing table.
- invariant(routingInfo || mustRunOnAll);
-
- if (mustRunOnAll) {
- // The pipeline contains a stage which must be run on all shards. Skip versioning and
- // enqueue the raw command objects.
- for (auto&& shardId : shardIds) {
- requests.emplace_back(std::move(shardId), cmdObj);
- }
- } else if (routingInfo->cm()) {
- // The collection is sharded. Use the routing table to decide which shards to target
- // based on the query and collation, and build versioned requests for them.
- for (auto& shardId : shardIds) {
- auto versionedCmdObj =
- appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
- requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
- }
- } else {
- // The collection is unsharded. Target only the primary shard for the database.
- // Don't append shard version info when contacting the config servers.
- requests.emplace_back(routingInfo->db().primaryId(),
- !routingInfo->db().primary()->isConfig()
- ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
- : cmdObj);
- }
-
- if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
- "until fail point is disabled.";
- while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- sleepsecs(1);
- }
- }
-
- return establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- readPref,
- requests,
- false /* do not allow partial results */,
- getDesiredRetryPolicy(request));
-}
-
-struct DispatchShardPipelineResults {
- // True if this pipeline was split, and the second half of the pipeline needs to be run on the
- // primary shard for the database.
- bool needsPrimaryShardMerge;
-
- // Populated if this *is not* an explain, this vector represents the cursors on the remote
- // shards.
- std::vector<OwnedRemoteCursor> remoteCursors;
-
- // Populated if this *is* an explain, this vector represents the results from each shard.
- std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
-
- // The split version of the pipeline if more than one shard was targeted, otherwise boost::none.
- boost::optional<SplitPipeline> splitPipeline;
-
- // If the pipeline targeted a single shard, this is the pipeline to run on that shard.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard;
-
- // The command object to send to the targeted shards.
- BSONObj commandForTargetedShards;
-
- // How many exchange producers are running the shard part of splitPipeline.
- size_t numProducers;
-
- // The exchange specification if the query can run with the exchange otherwise boost::none.
- boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
-};
-
-/**
- * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
- * the pipeline that will need to be executed to merge the results from the remotes. If a stale
- * shard version is encountered, refreshes the routing table and tries again.
- */
-DispatchShardPipelineResults dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
- BSONObj collationObj) {
- // The process is as follows:
- // - First, determine whether we need to target more than one shard. If so, we split the
- // pipeline; if not, we retain the existing pipeline.
- // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
- // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the
- // entire aggregation commmand.
- auto cursors = std::vector<RemoteCursor>();
- auto shardResults = std::vector<AsyncRequestsSender::Response>();
- auto opCtx = expCtx->opCtx;
-
- const bool needsPrimaryShardMerge =
- (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
-
- const bool needsMongosMerge = pipeline->needsMongosMerger();
-
- const auto shardQuery = pipeline->getInitialQuery();
-
- auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
-
- // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
- // Otherwise, uassert on all exceptions here.
- if (!(liteParsedPipeline.hasChangeStream() &&
- executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) {
- uassertStatusOK(executionNsRoutingInfoStatus);
- }
-
- auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK()
- ? std::move(executionNsRoutingInfoStatus.getValue())
- : boost::optional<CachedCollectionRoutingInfo>{};
-
- // Determine whether we can run the entire aggregation on a single shard.
- const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline);
- std::set<ShardId> shardIds = getTargetedShards(
- opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
-
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->computeAndSetAtClusterTime(
- opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
- }
-
- // Don't need to split the pipeline if we are only targeting a single shard, unless:
- // - There is a stage that needs to be run on the primary shard and the single target shard
- // is not the primary.
- // - The pipeline contains one or more stages which must always merge on mongoS.
- const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
- (needsPrimaryShardMerge && executionNsRoutingInfo &&
- *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
-
- boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
- boost::optional<SplitPipeline> splitPipeline;
-
- if (needsSplit) {
- splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
-
- exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
- opCtx, splitPipeline->mergePipeline.get());
- }
-
- // Generate the command object for the targeted shards.
- BSONObj targetedCommand = splitPipeline
- ? createCommandForTargetedShards(
- opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
- : createPassthroughCommandForShard(
- opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
-
- // Refresh the shard registry if we're targeting all shards. We need the shard registry
- // to be at least as current as the logical time used when creating the command for
- // $changeStream to work reliably, so we do a "hard" reload.
- if (mustRunOnAll) {
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
- shardRegistry->reload(opCtx);
- }
- }
-
- // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
- if (expCtx->explain) {
- if (mustRunOnAll) {
- // Some stages (such as $currentOp) need to be broadcast to all shards, and
- // should not participate in the shard version protocol.
- shardResults =
- scatterGatherUnversionedTargetAllShards(opCtx,
- executionNss.db(),
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- } else {
- // Aggregations on a real namespace should use the routing table to target
- // shards, and should participate in the shard version protocol.
- invariant(executionNsRoutingInfo);
- shardResults =
- scatterGatherVersionedTargetByRoutingTable(opCtx,
- executionNss.db(),
- executionNss,
- *executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- shardQuery,
- aggRequest.getCollation());
- }
- } else {
- cursors = establishShardCursors(opCtx,
- executionNss,
- liteParsedPipeline,
- executionNsRoutingInfo,
- targetedCommand,
- aggRequest,
- ReadPreferenceSetting::get(opCtx),
- shardQuery);
- invariant(cursors.size() % shardIds.size() == 0,
- str::stream() << "Number of cursors (" << cursors.size()
- << ") is not a multiple of producers ("
- << shardIds.size()
- << ")");
- }
-
- // Convert remote cursors into a vector of "owned" cursors.
- std::vector<OwnedRemoteCursor> ownedCursors;
- for (auto&& cursor : cursors) {
- ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss));
- }
-
- // Record the number of shards involved in the aggregation. If we are required to merge on
- // the primary shard, but the primary shard was not in the set of targeted shards, then we
- // must increment the number of involved shards.
- CurOp::get(opCtx)->debug().nShards =
- shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo &&
- !shardIds.count(executionNsRoutingInfo->db().primaryId()));
-
- return DispatchShardPipelineResults{needsPrimaryShardMerge,
- std::move(ownedCursors),
- std::move(shardResults),
- std::move(splitPipeline),
- std::move(pipeline),
- targetedCommand,
- shardIds.size(),
- exchangeSpec};
-}
-
-DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
+MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
BSONObj collationObj,
- DispatchShardPipelineResults* shardDispatchResults) {
+ MongoSInterface::DispatchShardPipelineResults* shardDispatchResults) {
invariant(!liteParsedPipeline.hasChangeStream());
auto opCtx = expCtx->opCtx;
@@ -584,7 +172,7 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none);
- auto consumerCmdObj = createCommandForTargetedShards(
+ auto consumerCmdObj = MongoSInterface::createCommandForTargetedShards(
opCtx, aggRequest, consumerPipelines.back(), collationObj, boost::none, false);
requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx],
@@ -617,16 +205,16 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront());
mergeCursors->dismissCursorOwnership();
}
- return DispatchShardPipelineResults{false,
- std::move(ownedCursors),
- {} /*TODO SERVER-36279*/,
- std::move(splitPipeline),
- nullptr,
- BSONObj(),
- numConsumers};
+ return MongoSInterface::DispatchShardPipelineResults{false,
+ std::move(ownedCursors),
+ {} /*TODO SERVER-36279*/,
+ std::move(splitPipeline),
+ nullptr,
+ BSONObj(),
+ numConsumers};
}
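// Illustrative sketch (assumed caller, not shown in this hunk): when the initial dispatch
// produced an eligible exchange spec, its producer results are handed to this helper, which
// replaces them with one cursor per consumer shard before merging. The condition and variable
// names here are assumptions.
if (shardDispatchResults.exchangeSpec) {
    shardDispatchResults = dispatchExchangeConsumerPipeline(
        expCtx, namespaces.executionNss, request, litePipe, collationObj, &shardDispatchResults);
}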
-Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,
+Status appendExplainResults(MongoSInterface::DispatchShardPipelineResults&& dispatchResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
BSONObjBuilder* result) {
if (dispatchResults.splitPipeline) {
@@ -688,7 +276,7 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
- Shard::RetryPolicy retryPolicy = getDesiredRetryPolicy(request);
+ Shard::RetryPolicy retryPolicy = MongoSInterface::getDesiredRetryPolicy(request);
return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts(
opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy));
}
@@ -914,36 +502,16 @@ ShardId pickMergingShard(OperationContext* opCtx,
: targetedShards[prng.nextInt32(targetedShards.size())];
}
-// "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the
-// pipeline. We won't try to execute anything on a mongos, but we still have to populate this map so
-// that any $lookups, etc. will be able to have a resolved view definition. It's okay that this is
-// incorrect, we will repopulate the real namespace map on the mongod. Note that this function must
-// be called before forwarding an aggregation command on an unsharded collection, in order to verify
-// that the involved namespaces are allowed to be sharded.
-StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces(
- OperationContext* opCtx, const LiteParsedPipeline& litePipe) {
-
- StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
- for (auto&& nss : litePipe.getInvolvedNamespaces()) {
- const auto resolvedNsRoutingInfo =
- uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
- uassert(28769,
- str::stream() << nss.ns() << " cannot be sharded",
- !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss));
- resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
- }
- return resolvedNamespaces;
-}
-
-// Build an appropriate ExpressionContext for the pipeline. This helper validates that all involved
-// namespaces are unsharded, instantiates an appropriate collator, creates a MongoProcessInterface
-// for use by the pipeline's stages, and optionally extracts the UUID from the collection info if
-// present.
-boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx,
- const AggregationRequest& request,
- const LiteParsedPipeline& litePipe,
- BSONObj collationObj,
- boost::optional<UUID> uuid) {
+// Build an appropriate ExpressionContext for the pipeline. This helper instantiates an appropriate
+// collator, creates a MongoProcessInterface for use by the pipeline's stages, and optionally
+// extracts the UUID from the collection info if present.
+boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const LiteParsedPipeline& litePipe,
+ BSONObj collationObj,
+ boost::optional<UUID> uuid,
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces) {
std::unique_ptr<CollatorInterface> collation;
if (!collationObj.isEmpty()) {
@@ -958,7 +526,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext*
request,
std::move(collation),
std::make_shared<MongoSInterface>(),
- resolveInvolvedNamespaces(opCtx, litePipe),
+ std::move(resolvedNamespaces),
uuid);
mergeCtx->inMongos = true;
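// Illustrative restatement, not new behavior: callers now resolve the involved namespaces
// themselves and pass the map into makeExpressionContext(), rather than having the helper call
// the removed resolveInvolvedNamespaces(). This condenses the runAggregate changes shown later
// in this diff.
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
for (auto&& nss : litePipe.getInvolvedNamespaces()) {
    resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
auto expCtx = makeExpressionContext(
    opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces));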
@@ -1002,7 +570,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
const AggregationRequest& request,
const LiteParsedPipeline& litePipe,
const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
- DispatchShardPipelineResults&& shardDispatchResults,
+ MongoSInterface::DispatchShardPipelineResults&& shardDispatchResults,
BSONObjBuilder* result) {
// We should never be in a situation where we call this function on a non-merge pipeline.
invariant(shardDispatchResults.splitPipeline);
@@ -1089,7 +657,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const AggregationRequest& request,
BSONObjBuilder* result) {
uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec());
- auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
+ auto executionNsRoutingInfoStatus =
+ MongoSInterface::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
boost::optional<CachedCollectionRoutingInfo> routingInfo;
LiteParsedPipeline litePipe(request);
@@ -1109,18 +678,38 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// Determine whether this aggregation must be dispatched to all shards in the cluster.
- const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, litePipe);
+ const bool mustRunOnAll =
+ MongoSInterface::mustRunOnAllShards(namespaces.executionNss, litePipe);
// If we don't have a routing table, then this is a $changeStream which must run on all shards.
invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream()));
- // If this pipeline is not on a sharded collection, is allowed to be forwarded to shards, does
- // not need to run on all shards, and doesn't need to go through DocumentSource::serialize(),
- // then go ahead and pass it through to the owning shard unmodified. Note that we first call
- // resolveInvolvedNamespaces to validate that none of the namespaces are sharded.
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ bool involvesShardedCollections = false;
+ for (auto&& nss : litePipe.getInvolvedNamespaces()) {
+ const auto resolvedNsRoutingInfo =
+ uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss));
+
+ uassert(28769,
+ str::stream() << nss.ns() << " cannot be sharded",
+ !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss));
+
+ resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
+ if (resolvedNsRoutingInfo.cm()) {
+ involvesShardedCollections = true;
+ }
+ }
+
+    // A pipeline is allowed to pass through to the primary shard iff all of the following
+    // conditions are met:
+    //
+    // 1. The namespace of the aggregate and all other involved namespaces are unsharded.
+    // 2. The pipeline is allowed to be forwarded to shards.
+    // 3. The pipeline does not need to run on all shards.
+    // 4. The pipeline does not need transformation via DocumentSource::serialize().
if (routingInfo && !routingInfo->cm() && !mustRunOnAll &&
- litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) {
- resolveInvolvedNamespaces(opCtx, litePipe);
+ litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos() &&
+ !involvesShardedCollections) {
const auto primaryShardId = routingInfo->db().primary()->getId();
return aggPassthrough(opCtx, namespaces, primaryShardId, request, litePipe, result);
}
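// For readability (editorial restatement of the check above; 'canPassthrough' is a hypothetical
// name): the passthrough decision is a single predicate combining the conditions listed in the
// preceding comment.
const bool canPassthrough = routingInfo && !routingInfo->cm() && !mustRunOnAll &&
    litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos() &&
    !involvesShardedCollections;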
@@ -1133,7 +722,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
// resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the
// pipeline's stages.
- auto expCtx = makeExpressionContext(opCtx, request, litePipe, collationObj, uuid);
+ auto expCtx = makeExpressionContext(
+ opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces));
// Parse and optimize the full pipeline.
auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
@@ -1154,7 +744,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = dispatchShardPipeline(
+ auto shardDispatchResults = MongoSInterface::dispatchShardPipeline(
expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
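// Assumed continuation (not shown verbatim in this hunk): for an explain, the per-shard
// responses are folded into the result via appendExplainResults(), whose updated signature
// appears earlier in this diff; otherwise the shard cursors proceed to the merging logic.
if (expCtx->explain) {
    return appendExplainResults(std::move(shardDispatchResults), expCtx, result);
}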
// If the operation is an explain, then we verify that it succeeded on all targeted shards,
@@ -1237,7 +827,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, BSONObj()));
+ MongoSInterface::createPassthroughCommandForShard(
+ opCtx, aggRequest, shardId, nullptr, BSONObj()));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,