summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml4
-rw-r--r--jstests/aggregation/mongos_merge.js39
-rw-r--r--jstests/aggregation/sources/facet/use_cases.js23
-rw-r--r--jstests/aggregation/sources/graphLookup/sharded.js54
-rw-r--r--jstests/aggregation/sources/lookup/collation_lookup.js368
-rw-r--r--jstests/aggregation/sources/lookup/lookup.js1125
-rw-r--r--jstests/aggregation/sources/lookup/lookup_subpipeline.js604
-rw-r--r--jstests/sharding/collation_lookup.js454
-rw-r--r--jstests/sharding/lookup.js609
-rw-r--r--jstests/sharding/lookup_mongod_unaware.js168
-rw-r--r--jstests/sharding/lookup_stale_mongos.js130
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp37
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h20
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp8
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h10
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp15
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h4
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h4
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp512
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp526
-rw-r--r--src/mongo/s/commands/pipeline_s.h73
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h14
-rw-r--r--src/mongo/s/query/router_stage_internal_cursor.h56
27 files changed, 2772 insertions, 2114 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
index 8bd6bcff821..0466e72036a 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml
@@ -17,8 +17,7 @@ selector:
- jstests/aggregation/sources/addFields/weather.js
- jstests/aggregation/sources/collStats/shard_host_info.js
- jstests/aggregation/sources/facet/use_cases.js
- - jstests/aggregation/sources/graphLookup/sharded.js
- - jstests/aggregation/sources/lookup/lookup.js
+ - jstests/aggregation/sources/lookup/lookup_subpipeline.js
- jstests/aggregation/sources/replaceRoot/address.js
- jstests/aggregation/testshard1.js
# The following tests start their own ReplSetTest.
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 8fd0c4098ad..53fdb5f148b 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -55,13 +55,17 @@ selector:
- jstests/sharding/change_streams_shards_start_in_sync.js
- jstests/sharding/change_streams_unsharded_becomes_sharded.js
- jstests/sharding/change_streams_primary_shard_unaware.js
+ - jstests/sharding/collation_lookup.js
- jstests/sharding/enable_sharding_basic.js
- jstests/sharding/key_rotation.js
- jstests/sharding/kill_sessions.js
- jstests/sharding/logical_time_api.js
+ - jstests/sharding/lookup.js
- jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
- jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js
- jstests/sharding/lookup_change_stream_post_image_id_shard_key.js
+ - jstests/sharding/lookup_mongod_unaware.js
+ - jstests/sharding/lookup_stale_mongos.js
- jstests/sharding/mongos_does_not_gossip_logical_time_without_keys.js
- jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js
- jstests/sharding/move_chunk_insert_with_write_retryability.js
diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js
index 97108f59b2f..76108fdf602 100644
--- a/jstests/aggregation/mongos_merge.js
+++ b/jstests/aggregation/mongos_merge.js
@@ -281,6 +281,45 @@
allowDiskUse: allowDiskUse,
expectedCount: 400
});
+
+ // Test that $lookup is merged on the primary shard when the foreign collection is
+ // unsharded.
+ assertMergeOnMongoD({
+ testName: "agg_mongos_merge_lookup_unsharded_disk_use_" + allowDiskUse,
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {
+ $lookup: {
+ from: unshardedColl.getName(),
+ localField: "_id",
+ foreignField: "_id",
+ as: "lookupField"
+ }
+ }
+ ],
+ mergeType: "primaryShard",
+ allowDiskUse: allowDiskUse,
+ expectedCount: 400
+ });
+
+ // Test that $lookup is merged on mongoS when the foreign collection is sharded.
+ assertMergeOnMongoS({
+ testName: "agg_mongos_merge_lookup_sharded_disk_use_" + allowDiskUse,
+ pipeline: [
+ {$match: {_id: {$gte: -200, $lte: 200}}},
+ {
+ $lookup: {
+ from: mongosColl.getName(),
+ localField: "_id",
+ foreignField: "_id",
+ as: "lookupField"
+ }
+ }
+ ],
+ mergeType: "mongos",
+ allowDiskUse: allowDiskUse,
+ expectedCount: 400
+ });
}
/**
diff --git a/jstests/aggregation/sources/facet/use_cases.js b/jstests/aggregation/sources/facet/use_cases.js
index 45791cab5d3..fed4c0b0c5c 100644
--- a/jstests/aggregation/sources/facet/use_cases.js
+++ b/jstests/aggregation/sources/facet/use_cases.js
@@ -114,8 +114,7 @@
populateData(st.s0, nDocs);
doExecutionTest(st.s0);
- // Test that $facet stage propagates information about involved collections, preventing users
- // from doing things like $lookup from a sharded collection.
+ // Test that $facet stage propagates information about involved collections.
const shardedDBName = "sharded";
const shardedCollName = "collection";
const shardedColl = st.getDB(shardedDBName).getCollection(shardedCollName);
@@ -125,21 +124,8 @@
assert.commandWorked(
st.admin.runCommand({shardCollection: shardedColl.getFullName(), key: {_id: 1}}));
- // Test that trying to perform a $lookup on a sharded collection returns an error.
- let res = assert.commandFailed(unshardedColl.runCommand({
- aggregate: unshardedColl.getName(),
- pipeline: [{
- $lookup:
- {from: shardedCollName, localField: "_id", foreignField: "_id", as: "results"}
- }],
- cursor: {}
- }));
- assert.eq(
- 28769, res.code, "Expected aggregation to fail due to $lookup on a sharded collection");
-
- // Test that trying to perform a $lookup on a sharded collection inside a $facet stage still
- // returns an error.
- res = assert.commandFailed(unshardedColl.runCommand({
+ // Test $lookup inside a $facet stage on a sharded collection.
+ assert.commandWorked(unshardedColl.runCommand({
aggregate: unshardedColl.getName(),
pipeline: [{
$facet: {
@@ -155,9 +141,6 @@
}],
cursor: {}
}));
- assert.eq(
- 28769, res.code, "Expected aggregation to fail due to $lookup on a sharded collection");
-
// Then run the assertions against a sharded collection.
assert.commandWorked(st.admin.runCommand({enableSharding: dbName}));
assert.commandWorked(st.admin.runCommand({shardCollection: testNs, key: {_id: 1}}));
diff --git a/jstests/aggregation/sources/graphLookup/sharded.js b/jstests/aggregation/sources/graphLookup/sharded.js
deleted file mode 100644
index d54e2be01c8..00000000000
--- a/jstests/aggregation/sources/graphLookup/sharded.js
+++ /dev/null
@@ -1,54 +0,0 @@
-// In SERVER-23725, $graphLookup was introduced. In this file, we test that the expression behaves
-// correctly on a sharded collection.
-load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
-
-(function() {
- "use strict";
-
- var st = new ShardingTest({name: "aggregation_graph_lookup", shards: 2, mongos: 1});
-
- st.adminCommand({enableSharding: "graphLookup"});
- st.ensurePrimaryShard("graphLookup", "shard0001");
- st.adminCommand({shardCollection: "graphLookup.local", key: {_id: 1}});
-
- var foreign = st.getDB("graphLookup").foreign;
- var local = st.getDB("graphLookup").local;
-
- var bulk = foreign.initializeUnorderedBulkOp();
-
- for (var i = 0; i < 100; i++) {
- bulk.insert({_id: i, next: i + 1});
- }
- assert.writeOK(bulk.execute());
-
- assert.writeOK(local.insert({}));
-
- var res = st.s.getDB("graphLookup")
- .local
- .aggregate({
- $graphLookup: {
- from: "foreign",
- startWith: {$literal: 0},
- connectToField: "_id",
- connectFromField: "next",
- as: "number_line"
- }
- })
- .toArray();
-
- assert.eq(res.length, 1);
- assert.eq(res[0].number_line.length, 100);
-
- // Cannot perform a $graphLookup where the "from" collection is sharded.
- var pipeline = {
- $graphLookup: {
- from: "local",
- startWith: {$literal: 0},
- connectToField: "_id",
- connectFromField: "_id",
- as: "out"
- }
- };
-
- assertErrorCode(foreign, pipeline, 28769);
-}());
diff --git a/jstests/aggregation/sources/lookup/collation_lookup.js b/jstests/aggregation/sources/lookup/collation_lookup.js
deleted file mode 100644
index 80d173138a6..00000000000
--- a/jstests/aggregation/sources/lookup/collation_lookup.js
+++ /dev/null
@@ -1,368 +0,0 @@
-// Cannot implicitly shard accessed collections because of collection existing when none expected.
-// @tags: [assumes_no_implicit_collection_creation_after_drop]
-
-/**
- * Tests that the $lookup stage respects the collation.
- *
- * The comparison of string values between the 'localField' and 'foreignField' should use the
- * collation either explicitly set on the aggregation operation, or the collation inherited from the
- * collection the "aggregate" command was performed on.
- */
-(function() {
- "use strict";
-
- load("jstests/aggregation/extras/utils.js"); // for arrayEq
-
- const caseInsensitive = {collation: {locale: "en_US", strength: 2}};
-
- const withDefaultCollationColl = db.collation_lookup_with_default;
- const withoutDefaultCollationColl = db.collation_lookup_without_default;
-
- withDefaultCollationColl.drop();
- withoutDefaultCollationColl.drop();
-
- assert.commandWorked(db.createCollection(withDefaultCollationColl.getName(), caseInsensitive));
- assert.writeOK(withDefaultCollationColl.insert({_id: "lowercase", str: "abc"}));
-
- assert.writeOK(withoutDefaultCollationColl.insert({_id: "lowercase", str: "abc"}));
- assert.writeOK(withoutDefaultCollationColl.insert({_id: "uppercase", str: "ABC"}));
- assert.writeOK(withoutDefaultCollationColl.insert({_id: "unmatched", str: "def"}));
-
- // Test that the $lookup stage respects the inherited collation.
- let res = withDefaultCollationColl
- .aggregate([{
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- }])
- .toArray();
- assert.eq(1, res.length, tojson(res));
-
- let expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}];
- assert(
- arrayEq(expected, res[0].matched),
- "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + " up to ordering");
-
- res = withDefaultCollationColl
- .aggregate([{
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- }
- ],
- as: "matched1",
- },
- }])
- .toArray();
- assert.eq(1, res.length, tojson(res));
-
- expected = [
- {
- "_id": "lowercase",
- "str": "abc",
- "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
- },
- {
- "_id": "uppercase",
- "str": "ABC",
- "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
- }
- ];
- assert(arrayEq(expected, res[0].matched1),
- "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) +
- " up to ordering. " + tojson(res));
-
- // Test that the $lookup stage respects the inherited collation when it optimizes with an
- // $unwind stage.
- res = withDefaultCollationColl
- .aggregate([
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- },
- {$unwind: "$matched"},
- ])
- .toArray();
- assert.eq(2, res.length, tojson(res));
-
- expected = [
- {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}},
- {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}}
- ];
- assert(arrayEq(expected, res),
- "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
-
- res = withDefaultCollationColl
- .aggregate([
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- },
- {$unwind: "$matched2"},
- ],
- as: "matched1",
- },
- },
- {$unwind: "$matched1"},
- ])
- .toArray();
- assert.eq(4, res.length, tojson(res));
-
- expected = [
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "lowercase", "str": "abc", "matched2": {"_id": "lowercase", "str": "abc"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "lowercase", "str": "abc", "matched2": {"_id": "uppercase", "str": "ABC"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "lowercase", "str": "abc"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "uppercase", "str": "ABC"}}
- }
- ];
- assert(arrayEq(expected, res),
- "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
-
- // Test that the $lookup stage respects an explicit collation on the aggregation operation.
- res = withoutDefaultCollationColl
- .aggregate(
- [
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- },
- ],
- caseInsensitive)
- .toArray();
- assert.eq(1, res.length, tojson(res));
-
- expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}];
- assert(
- arrayEq(expected, res[0].matched),
- "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + " up to ordering");
-
- res = withoutDefaultCollationColl
- .aggregate(
- [
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- }
- ],
- as: "matched1",
- },
- }
- ],
- caseInsensitive)
- .toArray();
- assert.eq(1, res.length, tojson(res));
-
- expected = [
- {
- "_id": "lowercase",
- "str": "abc",
- "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
- },
- {
- "_id": "uppercase",
- "str": "ABC",
- "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
- }
- ];
- assert(arrayEq(expected, res[0].matched1),
- "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) +
- " up to ordering");
-
- // Test that the $lookup stage respects an explicit collation on the aggregation operation when
- // it optimizes with an $unwind stage.
- res = withoutDefaultCollationColl
- .aggregate(
- [
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- },
- {$unwind: "$matched"},
- ],
- caseInsensitive)
- .toArray();
- assert.eq(2, res.length, tojson(res));
-
- expected = [
- {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}},
- {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}}
- ];
- assert(arrayEq(expected, res),
- "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
-
- res = withoutDefaultCollationColl
- .aggregate(
- [
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- },
- {$unwind: "$matched2"},
- ],
- as: "matched1",
- },
- },
- {$unwind: "$matched1"},
- ],
- caseInsensitive)
- .toArray();
- assert.eq(4, res.length, tojson(res));
-
- expected = [
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "lowercase", "str": "abc", "matched2": {"_id": "lowercase", "str": "abc"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "lowercase", "str": "abc", "matched2": {"_id": "uppercase", "str": "ABC"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "lowercase", "str": "abc"}}
- },
- {
- "_id": "lowercase",
- "str": "abc",
- "matched1":
- {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "uppercase", "str": "ABC"}}
- }
- ];
- assert(arrayEq(expected, res),
- "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
-
- // Test that the $lookup stage uses the "simple" collation if a collation isn't set on the
- // collection or the aggregation operation.
- res = withoutDefaultCollationColl
- .aggregate([
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withDefaultCollationColl.getName(),
- localField: "str",
- foreignField: "str",
- as: "matched",
- },
- },
- ])
- .toArray();
- assert.eq([{_id: "lowercase", str: "abc", matched: [{_id: "lowercase", str: "abc"}]}], res);
-
- res = withoutDefaultCollationColl
- .aggregate([
- {$match: {_id: "lowercase"}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str1: "$str"},
- pipeline: [
- {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
- {
- $lookup: {
- from: withoutDefaultCollationColl.getName(),
- let : {str2: "$str"},
- pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
- as: "matched2"
- }
- },
- {$unwind: "$matched2"},
- ],
- as: "matched1",
- },
- },
- ])
- .toArray();
- assert.eq([{
- "_id": "lowercase",
- "str": "abc",
- "matched1": [{
- "_id": "lowercase",
- "str": "abc",
- "matched2": {"_id": "lowercase", "str": "abc"}
- }]
- }],
- res);
-})();
diff --git a/jstests/aggregation/sources/lookup/lookup.js b/jstests/aggregation/sources/lookup/lookup.js
deleted file mode 100644
index 90b4669d886..00000000000
--- a/jstests/aggregation/sources/lookup/lookup.js
+++ /dev/null
@@ -1,1125 +0,0 @@
-// Basic $lookup regression tests.
-
-load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
-
-(function() {
- "use strict";
-
- // Used by testPipeline to sort result documents. All _ids must be primitives.
- function compareId(a, b) {
- if (a._id < b._id) {
- return -1;
- }
- if (a._id > b._id) {
- return 1;
- }
- return 0;
- }
-
- function generateNestedPipeline(foreignCollName, numLevels) {
- let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}];
-
- for (let level = 1; level < numLevels; level++) {
- pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}];
- }
-
- return pipeline;
- }
-
- // Helper for testing that pipeline returns correct set of results.
- function testPipeline(pipeline, expectedResult, collection) {
- assert.eq(collection.aggregate(pipeline).toArray().sort(compareId),
- expectedResult.sort(compareId));
- }
-
- function runTest(coll, from, thirdColl, fourthColl) {
- var db = null; // Using the db variable is banned in this function.
-
- assert.writeOK(coll.insert({_id: 0, a: 1}));
- assert.writeOK(coll.insert({_id: 1, a: null}));
- assert.writeOK(coll.insert({_id: 2}));
-
- assert.writeOK(from.insert({_id: 0, b: 1}));
- assert.writeOK(from.insert({_id: 1, b: null}));
- assert.writeOK(from.insert({_id: 2}));
-
- //
- // Basic functionality.
- //
-
- // "from" document added to "as" field if a == b, where nonexistent fields are treated as
- // null.
- var expectedResults = [
- {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
- {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}],
- expectedResults,
- coll);
-
- // If localField is nonexistent, it is treated as if it is null.
- expectedResults = [
- {_id: 0, a: 1, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline(
- [{$lookup: {localField: "nonexistent", foreignField: "b", from: "from", as: "same"}}],
- expectedResults,
- coll);
-
- // If foreignField is nonexistent, it is treated as if it is null.
- expectedResults = [
- {_id: 0, a: 1, "same": []},
- {_id: 1, a: null, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline(
- [{$lookup: {localField: "a", foreignField: "nonexistent", from: "from", as: "same"}}],
- expectedResults,
- coll);
-
- // If there are no matches or the from coll doesn't exist, the result is an empty array.
- expectedResults =
- [{_id: 0, a: 1, "same": []}, {_id: 1, a: null, "same": []}, {_id: 2, "same": []}];
- testPipeline(
- [{$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}],
- expectedResults,
- coll);
- testPipeline(
- [{$lookup: {localField: "a", foreignField: "b", from: "nonexistent", as: "same"}}],
- expectedResults,
- coll);
-
- // If field name specified by "as" already exists, it is overwritten.
- expectedResults = [
- {_id: 0, "a": [{_id: 0, b: 1}]},
- {_id: 1, "a": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "a": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "a"}}],
- expectedResults,
- coll);
-
- // Running multiple $lookups in the same pipeline is allowed.
- expectedResults = [
- {_id: 0, a: 1, "c": [{_id: 0, b: 1}], "d": [{_id: 0, b: 1}]},
- {
- _id: 1,
- a: null, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]
- },
- {_id: 2, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline(
- [
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "c"}},
- {$project: {"a": 1, "c": 1}},
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "d"}}
- ],
- expectedResults,
- coll);
-
- //
- // Coalescing with $unwind.
- //
-
- // A normal $unwind with on the "as" field.
- expectedResults = [
- {_id: 0, a: 1, same: {_id: 0, b: 1}},
- {_id: 1, a: null, same: {_id: 1, b: null}},
- {_id: 1, a: null, same: {_id: 2}},
- {_id: 2, same: {_id: 1, b: null}},
- {_id: 2, same: {_id: 2}}
- ];
- testPipeline(
- [
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
- {$unwind: {path: "$same"}}
- ],
- expectedResults,
- coll);
-
- // An $unwind on the "as" field, with includeArrayIndex.
- expectedResults = [
- {_id: 0, a: 1, same: {_id: 0, b: 1}, index: NumberLong(0)},
- {_id: 1, a: null, same: {_id: 1, b: null}, index: NumberLong(0)},
- {_id: 1, a: null, same: {_id: 2}, index: NumberLong(1)},
- {_id: 2, same: {_id: 1, b: null}, index: NumberLong(0)},
- {_id: 2, same: {_id: 2}, index: NumberLong(1)},
- ];
- testPipeline(
- [
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
- {$unwind: {path: "$same", includeArrayIndex: "index"}}
- ],
- expectedResults,
- coll);
-
- // Normal $unwind with no matching documents.
- expectedResults = [];
- testPipeline(
- [
- {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
- {$unwind: {path: "$same"}}
- ],
- expectedResults,
- coll);
-
- // $unwind with preserveNullAndEmptyArray with no matching documents.
- expectedResults = [
- {_id: 0, a: 1},
- {_id: 1, a: null},
- {_id: 2},
- ];
- testPipeline(
- [
- {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
- {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
- ],
- expectedResults,
- coll);
-
- // $unwind with preserveNullAndEmptyArray, some with matching documents, some without.
- expectedResults = [
- {_id: 0, a: 1},
- {_id: 1, a: null, same: {_id: 0, b: 1}},
- {_id: 2},
- ];
- testPipeline(
- [
- {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
- {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
- ],
- expectedResults,
- coll);
-
- // $unwind with preserveNullAndEmptyArray and includeArrayIndex, some with matching
- // documents, some without.
- expectedResults = [
- {_id: 0, a: 1, index: null},
- {_id: 1, a: null, same: {_id: 0, b: 1}, index: NumberLong(0)},
- {_id: 2, index: null},
- ];
- testPipeline(
- [
- {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
- {
- $unwind:
- {path: "$same", preserveNullAndEmptyArrays: true, includeArrayIndex: "index"}
- }
- ],
- expectedResults,
- coll);
-
- //
- // Dependencies.
- //
-
- // If $lookup didn't add "localField" to its dependencies, this test would fail as the
- // value of the "a" field would be lost and treated as null.
- expectedResults = [
- {_id: 0, "same": [{_id: 0, b: 1}]},
- {_id: 1, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
- ];
- testPipeline(
- [
- {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
- {$project: {"same": 1}}
- ],
- expectedResults,
- coll);
-
- // If $lookup didn't add fields referenced by "let" variables to its dependencies, this test
- // would fail as the value of the "a" field would be lost and treated as null.
- expectedResults = [
- {"_id": 0, "same": [{"_id": 0, "x": 1}, {"_id": 1, "x": 1}, {"_id": 2, "x": 1}]},
- {
- "_id": 1,
- "same": [{"_id": 0, "x": null}, {"_id": 1, "x": null}, {"_id": 2, "x": null}]
- },
- {"_id": 2, "same": [{"_id": 0}, {"_id": 1}, {"_id": 2}]}
- ];
- testPipeline(
- [
- {
- $lookup: {
- let : {var1: "$a"},
- pipeline: [{$project: {x: "$$var1"}}],
- from: "from",
- as: "same"
- }
- },
- {$project: {"same": 1}}
- ],
- expectedResults,
- coll);
-
- //
- // Dotted field paths.
- //
-
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: 1}));
- assert.writeOK(coll.insert({_id: 1, a: null}));
- assert.writeOK(coll.insert({_id: 2}));
- assert.writeOK(coll.insert({_id: 3, a: {c: 1}}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0, b: 1}));
- assert.writeOK(from.insert({_id: 1, b: null}));
- assert.writeOK(from.insert({_id: 2}));
- assert.writeOK(from.insert({_id: 3, b: {c: 1}}));
- assert.writeOK(from.insert({_id: 4, b: {c: 2}}));
-
- // Once without a dotted field.
- var pipeline = [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}];
- expectedResults = [
- {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
- {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]},
- {_id: 3, a: {c: 1}, "same": [{_id: 3, b: {c: 1}}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Look up a dotted field.
- pipeline = [{$lookup: {localField: "a.c", foreignField: "b.c", from: "from", as: "same"}}];
- // All but the last document in 'coll' have a nullish value for 'a.c'.
- expectedResults = [
- {_id: 0, a: 1, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
- {_id: 1, a: null, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
- {_id: 2, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
- {_id: 3, a: {c: 1}, same: [{_id: 3, b: {c: 1}}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // With an $unwind stage.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: {b: 1}}));
- assert.writeOK(coll.insert({_id: 1}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0, target: 1}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a.b",
- foreignField: "target",
- from: "from",
- as: "same.documents",
- }
- },
- {
- // Expected input to $unwind:
- // {_id: 0, a: {b: 1}, same: {documents: [{_id: 0, target: 1}]}}
- // {_id: 1, same: {documents: []}}
- $unwind: {
- path: "$same.documents",
- preserveNullAndEmptyArrays: true,
- includeArrayIndex: "c.d.e",
- }
- }
- ];
- expectedResults = [
- {_id: 0, a: {b: 1}, same: {documents: {_id: 0, target: 1}}, c: {d: {e: NumberLong(0)}}},
- {_id: 1, same: {}, c: {d: {e: null}}},
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // Query-like local fields (SERVER-21287)
- //
-
- // This must only do an equality match rather than treating the value as a regex.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: /a regex/}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0, b: /a regex/}));
- assert.writeOK(from.insert({_id: 1, b: "string that matches /a regex/"}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a",
- foreignField: "b",
- from: "from",
- as: "b",
- }
- },
- ];
- expectedResults = [{_id: 0, a: /a regex/, b: [{_id: 0, b: /a regex/}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // A local value of an array.
- //
-
- // Basic array corresponding to multiple documents.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: [0, 1, 2]}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0}));
- assert.writeOK(from.insert({_id: 1}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a",
- foreignField: "_id",
- from: "from",
- as: "b",
- }
- },
- ];
- expectedResults = [{_id: 0, a: [0, 1, 2], b: [{_id: 0}, {_id: 1}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- // Basic array corresponding to a single document.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: [1]}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0}));
- assert.writeOK(from.insert({_id: 1}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a",
- foreignField: "_id",
- from: "from",
- as: "b",
- }
- },
- ];
- expectedResults = [{_id: 0, a: [1], b: [{_id: 1}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- // Array containing regular expressions.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: [/a regex/, /^x/]}));
- assert.writeOK(coll.insert({_id: 1, a: [/^x/]}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0, b: "should not match a regex"}));
- assert.writeOK(from.insert({_id: 1, b: "xxxx"}));
- assert.writeOK(from.insert({_id: 2, b: /a regex/}));
- assert.writeOK(from.insert({_id: 3, b: /^x/}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a",
- foreignField: "b",
- from: "from",
- as: "b",
- }
- },
- ];
- expectedResults = [
- {_id: 0, a: [/a regex/, /^x/], b: [{_id: 2, b: /a regex/}, {_id: 3, b: /^x/}]},
- {_id: 1, a: [/^x/], b: [{_id: 3, b: /^x/}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'localField' references a field within an array of sub-objects.
- coll.drop();
- assert.writeOK(coll.insert({_id: 0, a: [{b: 1}, {b: 2}]}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 0}));
- assert.writeOK(from.insert({_id: 1}));
- assert.writeOK(from.insert({_id: 2}));
- assert.writeOK(from.insert({_id: 3}));
-
- pipeline = [
- {
- $lookup: {
- localField: "a.b",
- foreignField: "_id",
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // Pipeline syntax using 'let' variables.
- //
- coll.drop();
- assert.writeOK(coll.insert({_id: 1, x: 1}));
- assert.writeOK(coll.insert({_id: 2, x: 2}));
- assert.writeOK(coll.insert({_id: 3, x: 3}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 1}));
- assert.writeOK(from.insert({_id: 2}));
- assert.writeOK(from.insert({_id: 3}));
-
- // Basic non-equi theta join via $project.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$_id"},
- pipeline: [
- {$project: {isMatch: {$gt: ["$$var1", "$_id"]}}},
- {$match: {isMatch: true}},
- {$project: {isMatch: 0}}
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {"_id": 1, x: 1, "c": []},
- {"_id": 2, x: 2, "c": [{"_id": 1}]},
- {
- "_id": 3,
- x: 3,
- "c": [
- {"_id": 1},
- {
- "_id": 2,
- }
- ]
- }
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Basic non-equi theta join via $match.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$_id"},
- pipeline: [
- {$match: {$expr: {$lt: ["$_id", "$$var1"]}}},
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {"_id": 1, x: 1, "c": []},
- {"_id": 2, x: 2, "c": [{"_id": 1}]},
- {
- "_id": 3,
- x: 3,
- "c": [
- {"_id": 1},
- {
- "_id": 2,
- }
- ]
- }
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Multi-level join using $match.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$_id"},
- pipeline: [
- {$match: {$expr: {$eq: ["$_id", "$$var1"]}}},
- {
- $lookup: {
- let : {var2: "$_id"},
- pipeline: [
- {$match: {$expr: {$gt: ["$_id", "$$var2"]}}},
- ],
- from: "from",
- as: "d"
- }
- },
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {"_id": 1, "x": 1, "c": [{"_id": 1, "d": [{"_id": 2}, {"_id": 3}]}]},
- {"_id": 2, "x": 2, "c": [{"_id": 2, "d": [{"_id": 3}]}]},
- {"_id": 3, "x": 3, "c": [{"_id": 3, "d": []}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Equijoin with $match that can't be delegated to the query subsystem.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$x"},
- pipeline: [
- {$addFields: {newField: 2}},
- {$match: {$expr: {$eq: ["$newField", "$$var1"]}}},
- {$project: {newField: 0}}
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {"_id": 1, "x": 1, "c": []},
- {"_id": 2, "x": 2, "c": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
- {"_id": 3, "x": 3, "c": []}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Multiple variables.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$_id", var2: "$x"},
- pipeline: [
- {
- $project: {
- isMatch: {$gt: ["$$var1", "$_id"]},
- var2Times2: {$multiply: [2, "$$var2"]}
- }
- },
- {$match: {isMatch: true}},
- {$project: {isMatch: 0}}
- ],
- from: "from",
- as: "c",
- },
- },
- {$project: {x: 1, c: 1}}
- ];
-
- expectedResults = [
- {"_id": 1, x: 1, "c": []},
- {"_id": 2, x: 2, "c": [{"_id": 1, var2Times2: 4}]},
- {"_id": 3, x: 3, "c": [{"_id": 1, var2Times2: 6}, {"_id": 2, var2Times2: 6}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Let var as complex expression object.
- pipeline = [
- {
- $lookup: {
- let : {var1: {$mod: ["$x", 3]}},
- pipeline: [
- {$project: {var1Mod3TimesForeignId: {$multiply: ["$$var1", "$_id"]}}},
- ],
- from: "from",
- as: "c",
- }
- },
- ];
-
- expectedResults = [
- {
- "_id": 1,
- x: 1,
- "c": [
- {_id: 1, var1Mod3TimesForeignId: 1},
- {_id: 2, var1Mod3TimesForeignId: 2},
- {_id: 3, var1Mod3TimesForeignId: 3}
- ]
- },
- {
- "_id": 2,
- x: 2,
- "c": [
- {_id: 1, var1Mod3TimesForeignId: 2},
- {_id: 2, var1Mod3TimesForeignId: 4},
- {_id: 3, var1Mod3TimesForeignId: 6}
- ]
- },
- {
- "_id": 3,
- x: 3,
- "c": [
- {_id: 1, var1Mod3TimesForeignId: 0},
- {_id: 2, var1Mod3TimesForeignId: 0},
- {_id: 3, var1Mod3TimesForeignId: 0}
- ]
- }
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'let' defined variables are available to all nested sub-pipelines.
- pipeline = [
- {$match: {_id: 1}},
- {
- $lookup: {
- let : {var1: "ABC", var2: "123"},
- pipeline: [
- {$match: {_id: 1}},
- {
- $lookup: {
- pipeline: [
- {$match: {_id: 2}},
- {$addFields: {letVar1: "$$var1"}},
- {
- $lookup: {
- let : {var3: "XYZ"},
- pipeline: [{
- $addFields: {
- mergedLetVars:
- {$concat: ["$$var1", "$$var2", "$$var3"]}
- }
- }],
- from: "from",
- as: "join3"
- }
- },
- ],
- from: "from",
- as: "join2"
- }
- },
- ],
- from: "from",
- as: "join1",
- }
- }
- ];
-
- expectedResults = [{
- "_id": 1,
- "x": 1,
- "join1": [{
- "_id": 1,
- "join2": [{
- "_id": 2,
- "letVar1": "ABC",
- "join3": [
- {"_id": 1, "mergedLetVars": "ABC123XYZ"},
- {"_id": 2, "mergedLetVars": "ABC123XYZ"},
- {"_id": 3, "mergedLetVars": "ABC123XYZ"}
- ]
- }]
- }]
- }];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'let' variable shadowed by foreign pipeline variable.
- pipeline = [
- {$match: {_id: 2}},
- {
- $lookup: {
- let : {var1: "$_id"},
- pipeline: [
- {
- $project: {
- shadowedVar: {$let: {vars: {var1: "abc"}, in : "$$var1"}},
- originalVar: "$$var1"
- }
- },
- {
- $lookup: {
- pipeline: [{
- $project: {
- shadowedVar: {$let: {vars: {var1: "xyz"}, in : "$$var1"}},
- originalVar: "$$var1"
- }
- }],
- from: "from",
- as: "d"
- }
- }
- ],
- from: "from",
- as: "c",
- }
- }
- ];
-
- expectedResults = [{
- "_id": 2,
- "x": 2,
- "c": [
- {
- "_id": 1,
- "shadowedVar": "abc",
- "originalVar": 2,
- "d": [
- {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
- ]
- },
- {
- "_id": 2,
- "shadowedVar": "abc",
- "originalVar": 2,
- "d": [
- {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
- ]
- },
- {
- "_id": 3,
- "shadowedVar": "abc",
- "originalVar": 2,
- "d": [
- {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
- {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
- ]
- }
- ]
- }];
- testPipeline(pipeline, expectedResults, coll);
-
- // Use of undefined variable fails.
- assertErrorCode(coll,
- [{
- $lookup: {
- from: "from",
- as: "as",
- let : {var1: "$x"},
- pipeline: [{$project: {myVar: "$$nonExistent"}}]
- }
- }],
- 17276);
-
- // The dotted path offset of a non-object variable is equivalent referencing an undefined
- // field.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$x"},
- pipeline: [
- {
- $match: {
- $expr: {
- $eq: [
- "FIELD-IS-NULL",
- {$ifNull: ["$$var1.y.z", "FIELD-IS-NULL"]}
- ]
- }
- }
- },
- ],
- from: "from",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [
- {"x": 1, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
- {"x": 2, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
- {"x": 3, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}
- ];
- testPipeline(pipeline, expectedResults, coll);
-
- // Comparison where a 'let' variable references an array.
- coll.drop();
- assert.writeOK(coll.insert({x: [1, 2, 3]}));
-
- pipeline = [
- {
- $lookup: {
- let : {var1: "$x"},
- pipeline: [
- {$match: {$expr: {$eq: ["$$var1", [1, 2, 3]]}}},
- ],
- from: "from",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [{"x": [1, 2, 3], "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // Pipeline syntax with nested object.
- //
- coll.drop();
- assert.writeOK(coll.insert({x: {y: {z: 10}}}));
-
- // Subfields of 'let' variables can be referenced via dotted path.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$x"},
- pipeline: [
- {$project: {z: "$$var1.y.z"}},
- ],
- from: "from",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [{
- "x": {"y": {"z": 10}},
- "as": [{"_id": 1, "z": 10}, {"_id": 2, "z": 10}, {"_id": 3, "z": 10}]
- }];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'let' variable with dotted field path off of $$ROOT.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$$ROOT.x.y.z"},
- pipeline:
- [{$match: {$expr: {$eq: ["$$var1", "$$ROOT.x.y.z"]}}}, {$project: {_id: 0}}],
- from: "lookUp",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- // 'let' variable with dotted field path off of $$CURRENT.
- pipeline = [
- {
- $lookup: {
- let : {var1: "$$CURRENT.x.y.z"},
- pipeline: [
- {$match: {$expr: {$eq: ["$$var1", "$$CURRENT.x.y.z"]}}},
- {$project: {_id: 0}}
- ],
- from: "lookUp",
- as: "as",
- }
- },
- {$project: {_id: 0}}
- ];
-
- expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}];
- testPipeline(pipeline, expectedResults, coll);
-
- //
- // Pipeline syntax with nested $lookup.
- //
- coll.drop();
- assert.writeOK(coll.insert({_id: 1, w: 1}));
- assert.writeOK(coll.insert({_id: 2, w: 2}));
- assert.writeOK(coll.insert({_id: 3, w: 3}));
-
- from.drop();
- assert.writeOK(from.insert({_id: 1, x: 1}));
- assert.writeOK(from.insert({_id: 2, x: 2}));
- assert.writeOK(from.insert({_id: 3, x: 3}));
-
- thirdColl.drop();
- assert.writeOK(thirdColl.insert({_id: 1, y: 1}));
- assert.writeOK(thirdColl.insert({_id: 2, y: 2}));
- assert.writeOK(thirdColl.insert({_id: 3, y: 3}));
-
- fourthColl.drop();
- assert.writeOK(fourthColl.insert({_id: 1, z: 1}));
- assert.writeOK(fourthColl.insert({_id: 2, z: 2}));
- assert.writeOK(fourthColl.insert({_id: 3, z: 3}));
-
- // Nested $lookup pipeline.
- pipeline = [
- {$match: {_id: 1}},
- {
- $lookup: {
- pipeline: [
- {$match: {_id: 2}},
- {
- $lookup: {
- pipeline: [
- {$match: {_id: 3}},
- {
- $lookup: {
- pipeline: [
- {$match: {_id: 1}},
- ],
- from: "fourthColl",
- as: "thirdLookup"
- }
- },
- ],
- from: "thirdColl",
- as: "secondLookup"
- }
- },
- ],
- from: "from",
- as: "firstLookup",
- }
- }
- ];
-
- expectedResults = [{
- "_id": 1,
- "w": 1,
- "firstLookup": [{
- "_id": 2,
- x: 2, "secondLookup": [{"_id": 3, y: 3, "thirdLookup": [{_id: 1, z: 1}]}]
- }]
- }];
- testPipeline(pipeline, expectedResults, coll);
-
- // Deeply nested $lookup pipeline. Confirm that we can execute an aggregation with nested
- // $lookup sub-pipelines up to the maximum depth, but not beyond.
- let nestedPipeline = generateNestedPipeline("lookup", 20);
- assert.commandWorked(coll.getDB().runCommand(
- {aggregate: coll.getName(), pipeline: nestedPipeline, cursor: {}}));
-
- nestedPipeline = generateNestedPipeline("lookup", 21);
- assertErrorCode(coll, nestedPipeline, ErrorCodes.MaxSubPipelineDepthExceeded);
-
- // Confirm that maximum $lookup sub-pipeline depth is respected when aggregating views whose
- // combined nesting depth exceeds the limit.
- nestedPipeline = generateNestedPipeline("lookup", 10);
- coll.getDB().view1.drop();
- assert.commandWorked(
- coll.getDB().runCommand({create: "view1", viewOn: "lookup", pipeline: nestedPipeline}));
-
- nestedPipeline = generateNestedPipeline("view1", 10);
- coll.getDB().view2.drop();
- assert.commandWorked(
- coll.getDB().runCommand({create: "view2", viewOn: "view1", pipeline: nestedPipeline}));
-
- // Confirm that a composite sub-pipeline depth of 20 is allowed.
- assert.commandWorked(
- coll.getDB().runCommand({aggregate: "view2", pipeline: [], cursor: {}}));
-
- const pipelineWhichExceedsNestingLimit = generateNestedPipeline("view2", 1);
- coll.getDB().view3.drop();
- assert.commandWorked(coll.getDB().runCommand(
- {create: "view3", viewOn: "view2", pipeline: pipelineWhichExceedsNestingLimit}));
-
- // Confirm that a composite sub-pipeline depth greater than 20 fails.
- assertErrorCode(coll.getDB().view3, [], ErrorCodes.MaxSubPipelineDepthExceeded);
-
- //
- // Error cases.
- //
-
- // 'from', 'as', 'localField' and 'foreignField' must all be specified when run with
- // localField/foreignField syntax.
- assertErrorCode(coll,
- [{$lookup: {foreignField: "b", from: "from", as: "same"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", from: "from", as: "same"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", as: "same"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", from: "from"}}],
- ErrorCodes.FailedToParse);
-
- // localField/foreignField and pipeline/let syntax must not be mixed.
- assertErrorCode(coll,
- [{$lookup: {pipeline: [], foreignField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {pipeline: [], localField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(
- coll,
- [{$lookup: {pipeline: [], localField: "b", foreignField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {let : {a: "$b"}, foreignField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {let : {a: "$b"}, localField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(
- coll,
- [{
- $lookup:
- {let : {a: "$b"}, localField: "b", foreignField: "b", from: "from", as: "as"}
- }],
- ErrorCodes.FailedToParse);
-
- // 'from', 'as', 'localField' and 'foreignField' must all be of type string.
- assertErrorCode(coll,
- [{$lookup: {localField: 1, foreignField: "b", from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: 1, from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", from: 1, as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", from: "from", as: 1}}],
- ErrorCodes.FailedToParse);
-
- // 'pipeline' and 'let' must be of expected type.
- assertErrorCode(
- coll, [{$lookup: {pipeline: 1, from: "from", as: "as"}}], ErrorCodes.TypeMismatch);
- assertErrorCode(
- coll, [{$lookup: {pipeline: {}, from: "from", as: "as"}}], ErrorCodes.TypeMismatch);
- assertErrorCode(coll,
- [{$lookup: {let : 1, pipeline: [], from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
- assertErrorCode(coll,
- [{$lookup: {let : [], pipeline: [], from: "from", as: "as"}}],
- ErrorCodes.FailedToParse);
-
- // The foreign collection must be a valid namespace.
- assertErrorCode(coll,
- [{$lookup: {localField: "a", foreignField: "b", from: "", as: "as"}}],
- ErrorCodes.InvalidNamespace);
- // $lookup's field must be an object.
- assertErrorCode(coll, [{$lookup: "string"}], ErrorCodes.FailedToParse);
- }
-
- // Run tests on single node.
- db.lookUp.drop();
- db.from.drop();
- db.thirdColl.drop();
- db.fourthColl.drop();
- runTest(db.lookUp, db.from, db.thirdColl, db.fourthColl);
-
- // Run tests in a sharded environment.
- var sharded = new ShardingTest({shards: 2, mongos: 1});
- assert(sharded.adminCommand({enableSharding: "test"}));
- sharded.getDB('test').lookUp.drop();
- sharded.getDB('test').from.drop();
- sharded.getDB('test').thirdColl.drop();
- sharded.getDB('test').fourthColl.drop();
- assert(sharded.adminCommand({shardCollection: "test.lookUp", key: {_id: 'hashed'}}));
- runTest(sharded.getDB('test').lookUp,
- sharded.getDB('test').from,
- sharded.getDB('test').thirdColl,
- sharded.getDB('test').fourthColl);
-
- // An error is thrown if the from collection is sharded.
- assert(sharded.adminCommand({shardCollection: "test.from", key: {_id: 1}}));
- assertErrorCode(sharded.getDB('test').lookUp,
- [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}],
- 28769);
-
- // An error is thrown if nested $lookup from collection is sharded.
- assert(sharded.adminCommand({shardCollection: "test.fourthColl", key: {_id: 1}}));
- assertErrorCode(sharded.getDB('test').lookUp,
- [{
- $lookup: {
- pipeline: [{$lookup: {pipeline: [], from: "fourthColl", as: "same"}}],
- from: "thirdColl",
- as: "same"
- }
- }],
- 28769);
-
- sharded.stop();
-}());
diff --git a/jstests/aggregation/sources/lookup/lookup_subpipeline.js b/jstests/aggregation/sources/lookup/lookup_subpipeline.js
new file mode 100644
index 00000000000..abffadf4c0b
--- /dev/null
+++ b/jstests/aggregation/sources/lookup/lookup_subpipeline.js
@@ -0,0 +1,604 @@
+// Tests for the $lookup stage with a sub-pipeline.
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+
+ const testName = "lookup_subpipeline";
+
+ const coll = db.lookUp;
+ const from = db.from;
+ const thirdColl = db.thirdColl;
+ const fourthColl = db.fourthColl;
+
+ // Used by testPipeline to sort result documents. All _ids must be primitives.
+ function compareId(a, b) {
+ if (a._id < b._id) {
+ return -1;
+ }
+ if (a._id > b._id) {
+ return 1;
+ }
+ return 0;
+ }
+
+ function generateNestedPipeline(foreignCollName, numLevels) {
+ let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}];
+
+ for (let level = 1; level < numLevels; level++) {
+ pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}];
+ }
+
+ return pipeline;
+ }
+
+ // Helper for testing that pipeline returns correct set of results.
+ function testPipeline(pipeline, expectedResult, collection) {
+ assert.eq(collection.aggregate(pipeline).toArray().sort(compareId),
+ expectedResult.sort(compareId));
+ }
+
+ //
+ // Pipeline syntax using 'let' variables.
+ //
+ coll.drop();
+ assert.writeOK(coll.insert({_id: 1, x: 1}));
+ assert.writeOK(coll.insert({_id: 2, x: 2}));
+ assert.writeOK(coll.insert({_id: 3, x: 3}));
+
+ from.drop();
+ assert.writeOK(from.insert({_id: 1}));
+ assert.writeOK(from.insert({_id: 2}));
+ assert.writeOK(from.insert({_id: 3}));
+
+ // Basic non-equi theta join via $project.
+ let pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$_id"},
+ pipeline: [
+ {$project: {isMatch: {$gt: ["$$var1", "$_id"]}}},
+ {$match: {isMatch: true}},
+ {$project: {isMatch: 0}}
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ let expectedResults = [
+ {"_id": 1, x: 1, "c": []},
+ {"_id": 2, x: 2, "c": [{"_id": 1}]},
+ {
+ "_id": 3,
+ x: 3,
+ "c": [
+ {"_id": 1},
+ {
+ "_id": 2,
+ }
+ ]
+ }
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+ // Basic non-equi theta join via $match.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$_id"},
+ pipeline: [
+ {$match: {$expr: {$lt: ["$_id", "$$var1"]}}},
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [
+ {"_id": 1, x: 1, "c": []},
+ {"_id": 2, x: 2, "c": [{"_id": 1}]},
+ {
+ "_id": 3,
+ x: 3,
+ "c": [
+ {"_id": 1},
+ {
+ "_id": 2,
+ }
+ ]
+ }
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Multi-level join using $match.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$_id"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$_id", "$$var1"]}}},
+ {
+ $lookup: {
+ let : {var2: "$_id"},
+ pipeline: [
+ {$match: {$expr: {$gt: ["$_id", "$$var2"]}}},
+ ],
+ from: "from",
+ as: "d"
+ }
+ },
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [
+ {"_id": 1, "x": 1, "c": [{"_id": 1, "d": [{"_id": 2}, {"_id": 3}]}]},
+ {"_id": 2, "x": 2, "c": [{"_id": 2, "d": [{"_id": 3}]}]},
+ {"_id": 3, "x": 3, "c": [{"_id": 3, "d": []}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Equijoin with $match that can't be delegated to the query subsystem.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$x"},
+ pipeline: [
+ {$addFields: {newField: 2}},
+ {$match: {$expr: {$eq: ["$newField", "$$var1"]}}},
+ {$project: {newField: 0}}
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [
+ {"_id": 1, "x": 1, "c": []},
+ {"_id": 2, "x": 2, "c": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
+ {"_id": 3, "x": 3, "c": []}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Multiple variables.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$_id", var2: "$x"},
+ pipeline: [
+ {
+ $project: {
+ isMatch: {$gt: ["$$var1", "$_id"]},
+ var2Times2: {$multiply: [2, "$$var2"]}
+ }
+ },
+ {$match: {isMatch: true}},
+ {$project: {isMatch: 0}}
+ ],
+ from: "from",
+ as: "c",
+ },
+ },
+ {$project: {x: 1, c: 1}}
+ ];
+
+ expectedResults = [
+ {"_id": 1, x: 1, "c": []},
+ {"_id": 2, x: 2, "c": [{"_id": 1, var2Times2: 4}]},
+ {"_id": 3, x: 3, "c": [{"_id": 1, var2Times2: 6}, {"_id": 2, var2Times2: 6}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Let var as complex expression object.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: {$mod: ["$x", 3]}},
+ pipeline: [
+ {$project: {var1Mod3TimesForeignId: {$multiply: ["$$var1", "$_id"]}}},
+ ],
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [
+ {
+ "_id": 1,
+ x: 1,
+ "c": [
+ {_id: 1, var1Mod3TimesForeignId: 1},
+ {_id: 2, var1Mod3TimesForeignId: 2},
+ {_id: 3, var1Mod3TimesForeignId: 3}
+ ]
+ },
+ {
+ "_id": 2,
+ x: 2,
+ "c": [
+ {_id: 1, var1Mod3TimesForeignId: 2},
+ {_id: 2, var1Mod3TimesForeignId: 4},
+ {_id: 3, var1Mod3TimesForeignId: 6}
+ ]
+ },
+ {
+ "_id": 3,
+ x: 3,
+ "c": [
+ {_id: 1, var1Mod3TimesForeignId: 0},
+ {_id: 2, var1Mod3TimesForeignId: 0},
+ {_id: 3, var1Mod3TimesForeignId: 0}
+ ]
+ }
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'let' defined variables are available to all nested sub-pipelines.
+ pipeline = [
+ {$match: {_id: 1}},
+ {
+ $lookup: {
+ let : {var1: "ABC", var2: "123"},
+ pipeline: [
+ {$match: {_id: 1}},
+ {
+ $lookup: {
+ pipeline: [
+ {$match: {_id: 2}},
+ {$addFields: {letVar1: "$$var1"}},
+ {
+ $lookup: {
+ let : {var3: "XYZ"},
+ pipeline: [{
+ $addFields: {
+ mergedLetVars:
+ {$concat: ["$$var1", "$$var2", "$$var3"]}
+ }
+ }],
+ from: "from",
+ as: "join3"
+ }
+ },
+ ],
+ from: "from",
+ as: "join2"
+ }
+ },
+ ],
+ from: "from",
+ as: "join1",
+ }
+ }
+ ];
+
+ expectedResults = [{
+ "_id": 1,
+ "x": 1,
+ "join1": [{
+ "_id": 1,
+ "join2": [{
+ "_id": 2,
+ "letVar1": "ABC",
+ "join3": [
+ {"_id": 1, "mergedLetVars": "ABC123XYZ"},
+ {"_id": 2, "mergedLetVars": "ABC123XYZ"},
+ {"_id": 3, "mergedLetVars": "ABC123XYZ"}
+ ]
+ }]
+ }]
+ }];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'let' variable shadowed by foreign pipeline variable.
+ pipeline = [
+ {$match: {_id: 2}},
+ {
+ $lookup: {
+ let : {var1: "$_id"},
+ pipeline: [
+ {
+ $project: {
+ shadowedVar: {$let: {vars: {var1: "abc"}, in : "$$var1"}},
+ originalVar: "$$var1"
+ }
+ },
+ {
+ $lookup: {
+ pipeline: [{
+ $project: {
+ shadowedVar: {$let: {vars: {var1: "xyz"}, in : "$$var1"}},
+ originalVar: "$$var1"
+ }
+ }],
+ from: "from",
+ as: "d"
+ }
+ }
+ ],
+ from: "from",
+ as: "c",
+ }
+ }
+ ];
+
+ expectedResults = [{
+ "_id": 2,
+ "x": 2,
+ "c": [
+ {
+ "_id": 1,
+ "shadowedVar": "abc",
+ "originalVar": 2,
+ "d": [
+ {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
+ ]
+ },
+ {
+ "_id": 2,
+ "shadowedVar": "abc",
+ "originalVar": 2,
+ "d": [
+ {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
+ ]
+ },
+ {
+ "_id": 3,
+ "shadowedVar": "abc",
+ "originalVar": 2,
+ "d": [
+ {"_id": 1, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 2, "shadowedVar": "xyz", "originalVar": 2},
+ {"_id": 3, "shadowedVar": "xyz", "originalVar": 2}
+ ]
+ }
+ ]
+ }];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Use of undefined variable fails.
+ assertErrorCode(coll,
+ [{
+ $lookup: {
+ from: "from",
+ as: "as",
+ let : {var1: "$x"},
+ pipeline: [{$project: {myVar: "$$nonExistent"}}]
+ }
+ }],
+ 17276);
+
 + // The dotted path offset of a non-object variable is equivalent to referencing an undefined
+ // field.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$x"},
+ pipeline: [
+ {
+ $match: {
+ $expr:
+ {$eq: ["FIELD-IS-NULL", {$ifNull: ["$$var1.y.z", "FIELD-IS-NULL"]}]}
+ }
+ },
+ ],
+ from: "from",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}},
+ {$sort: {x: 1}}
+ ];
+
+ expectedResults = [
+ {"x": 1, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
+ {"x": 2, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]},
+ {"x": 3, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Comparison where a 'let' variable references an array.
+ coll.drop();
+ assert.writeOK(coll.insert({x: [1, 2, 3]}));
+
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$x"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$$var1", [1, 2, 3]]}}},
+ ],
+ from: "from",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}}
+ ];
+
+ expectedResults = [{"x": [1, 2, 3], "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // Pipeline syntax with nested object.
+ //
+ coll.drop();
+ assert.writeOK(coll.insert({x: {y: {z: 10}}}));
+
+ // Subfields of 'let' variables can be referenced via dotted path.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$x"},
+ pipeline: [
+ {$project: {z: "$$var1.y.z"}},
+ ],
+ from: "from",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}}
+ ];
+
+ expectedResults = [{
+ "x": {"y": {"z": 10}},
+ "as": [{"_id": 1, "z": 10}, {"_id": 2, "z": 10}, {"_id": 3, "z": 10}]
+ }];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'let' variable with dotted field path off of $$ROOT.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$$ROOT.x.y.z"},
+ pipeline:
+ [{$match: {$expr: {$eq: ["$$var1", "$$ROOT.x.y.z"]}}}, {$project: {_id: 0}}],
+ from: "lookUp",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}}
+ ];
+
+ expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'let' variable with dotted field path off of $$CURRENT.
+ pipeline = [
+ {
+ $lookup: {
+ let : {var1: "$$CURRENT.x.y.z"},
+ pipeline:
+ [{$match: {$expr: {$eq: ["$$var1", "$$CURRENT.x.y.z"]}}}, {$project: {_id: 0}}],
+ from: "lookUp",
+ as: "as",
+ }
+ },
+ {$project: {_id: 0}}
+ ];
+
+ expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // Pipeline syntax with nested $lookup.
+ //
+ coll.drop();
+ assert.writeOK(coll.insert({_id: 1, w: 1}));
+ assert.writeOK(coll.insert({_id: 2, w: 2}));
+ assert.writeOK(coll.insert({_id: 3, w: 3}));
+
+ from.drop();
+ assert.writeOK(from.insert({_id: 1, x: 1}));
+ assert.writeOK(from.insert({_id: 2, x: 2}));
+ assert.writeOK(from.insert({_id: 3, x: 3}));
+
+ thirdColl.drop();
+ assert.writeOK(thirdColl.insert({_id: 1, y: 1}));
+ assert.writeOK(thirdColl.insert({_id: 2, y: 2}));
+ assert.writeOK(thirdColl.insert({_id: 3, y: 3}));
+
+ fourthColl.drop();
+ assert.writeOK(fourthColl.insert({_id: 1, z: 1}));
+ assert.writeOK(fourthColl.insert({_id: 2, z: 2}));
+ assert.writeOK(fourthColl.insert({_id: 3, z: 3}));
+
+ // Nested $lookup pipeline.
+ pipeline = [
+ {$match: {_id: 1}},
+ {
+ $lookup: {
+ pipeline: [
+ {$match: {_id: 2}},
+ {
+ $lookup: {
+ pipeline: [
+ {$match: {_id: 3}},
+ {
+ $lookup: {
+ pipeline: [
+ {$match: {_id: 1}},
+ ],
+ from: "fourthColl",
+ as: "thirdLookup"
+ }
+ },
+ ],
+ from: "thirdColl",
+ as: "secondLookup"
+ }
+ },
+ ],
+ from: "from",
+ as: "firstLookup",
+ }
+ }
+ ];
+
+ expectedResults = [{
+ "_id": 1,
+ "w": 1,
+ "firstLookup": [{
+ "_id": 2,
+ x: 2, "secondLookup": [{"_id": 3, y: 3, "thirdLookup": [{_id: 1, z: 1}]}]
+ }]
+ }];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Deeply nested $lookup pipeline. Confirm that we can execute an aggregation with nested
+ // $lookup sub-pipelines up to the maximum depth, but not beyond.
+ let nestedPipeline = generateNestedPipeline("lookup", 20);
+ assert.commandWorked(
+ coll.getDB().runCommand({aggregate: coll.getName(), pipeline: nestedPipeline, cursor: {}}));
+
+ nestedPipeline = generateNestedPipeline("lookup", 21);
+ assertErrorCode(coll, nestedPipeline, ErrorCodes.MaxSubPipelineDepthExceeded);
+
+ // Confirm that maximum $lookup sub-pipeline depth is respected when aggregating views whose
+ // combined nesting depth exceeds the limit.
+ nestedPipeline = generateNestedPipeline("lookup", 10);
+ coll.getDB().view1.drop();
+ assert.commandWorked(
+ coll.getDB().runCommand({create: "view1", viewOn: "lookup", pipeline: nestedPipeline}));
+
+ nestedPipeline = generateNestedPipeline("view1", 10);
+ coll.getDB().view2.drop();
+ assert.commandWorked(
+ coll.getDB().runCommand({create: "view2", viewOn: "view1", pipeline: nestedPipeline}));
+
+ // Confirm that a composite sub-pipeline depth of 20 is allowed.
+ assert.commandWorked(coll.getDB().runCommand({aggregate: "view2", pipeline: [], cursor: {}}));
+
+ const pipelineWhichExceedsNestingLimit = generateNestedPipeline("view2", 1);
+ coll.getDB().view3.drop();
+ assert.commandWorked(coll.getDB().runCommand(
+ {create: "view3", viewOn: "view2", pipeline: pipelineWhichExceedsNestingLimit}));
+
+ //
+ // Error cases.
+ //
+
+ // Confirm that a composite sub-pipeline depth greater than 20 fails.
+ assertErrorCode(coll.getDB().view3, [], ErrorCodes.MaxSubPipelineDepthExceeded);
+
+ // 'pipeline' and 'let' must be of expected type.
+ assertErrorCode(
+ coll, [{$lookup: {pipeline: 1, from: "from", as: "as"}}], ErrorCodes.TypeMismatch);
+ assertErrorCode(
+ coll, [{$lookup: {pipeline: {}, from: "from", as: "as"}}], ErrorCodes.TypeMismatch);
+ assertErrorCode(coll,
+ [{$lookup: {let : 1, pipeline: [], from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {let : [], pipeline: [], from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+}());
diff --git a/jstests/sharding/collation_lookup.js b/jstests/sharding/collation_lookup.js
new file mode 100644
index 00000000000..f06e92ab3fc
--- /dev/null
+++ b/jstests/sharding/collation_lookup.js
@@ -0,0 +1,454 @@
+/**
+ * Tests that the $lookup stage respects the collation when the local and/or foreign collections
+ * are sharded.
+ *
+ * The comparison of string values between the 'localField' and 'foreignField' should use the
+ * collation either explicitly set on the aggregation operation, or the collation inherited from the
+ * collection the "aggregate" command was performed on.
+ */
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // for arrayEq
+
+ function runTests(withDefaultCollationColl, withoutDefaultCollationColl, collation) {
+ // Test that the $lookup stage respects the inherited collation.
+ let res = withDefaultCollationColl
+ .aggregate([{
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ }])
+ .toArray();
+ assert.eq(1, res.length, tojson(res));
+
+ let expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}];
+ assert(arrayEq(expected, res[0].matched),
+ "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) +
+ " up to ordering");
+
+ res = withDefaultCollationColl
+ .aggregate([{
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ }
+ ],
+ as: "matched1",
+ },
+ }])
+ .toArray();
+ assert.eq(1, res.length, tojson(res));
+
+ expected = [
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
+ },
+ {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2":
+ [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
+ }
+ ];
+ assert(arrayEq(expected, res[0].matched1),
+ "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) +
+ " up to ordering. " + tojson(res));
+
+ // Test that the $lookup stage respects the inherited collation when it optimizes with an
+ // $unwind stage.
+ res = withDefaultCollationColl
+ .aggregate([
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ },
+ {$unwind: "$matched"},
+ ])
+ .toArray();
+ assert.eq(2, res.length, tojson(res));
+
+ expected = [
+ {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}},
+ {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}}
+ ];
+ assert(arrayEq(expected, res),
+ "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
+
+ res = withDefaultCollationColl
+ .aggregate([
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ },
+ {$unwind: "$matched2"},
+ ],
+ as: "matched1",
+ },
+ },
+ {$unwind: "$matched1"},
+ ])
+ .toArray();
+ assert.eq(4, res.length, tojson(res));
+
+ expected = [
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "uppercase", "str": "ABC"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2": {"_id": "uppercase", "str": "ABC"}
+ }
+ }
+ ];
+ assert(arrayEq(expected, res),
+ "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
+
+ // Test that the $lookup stage respects an explicit collation on the aggregation operation.
+ res = withoutDefaultCollationColl
+ .aggregate(
+ [
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ },
+ ],
+ collation)
+ .toArray();
+ assert.eq(1, res.length, tojson(res));
+
+ expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}];
+ assert(arrayEq(expected, res[0].matched),
+ "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) +
+ " up to ordering");
+
+ res = withoutDefaultCollationColl
+ .aggregate(
+ [
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ }
+ ],
+ as: "matched1",
+ },
+ }
+ ],
+ collation)
+ .toArray();
+ assert.eq(1, res.length, tojson(res));
+
+ expected = [
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
+ },
+ {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2":
+ [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}]
+ }
+ ];
+ assert(arrayEq(expected, res[0].matched1),
+ "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) +
+ " up to ordering");
+
+        // Test that the $lookup stage respects an explicit collation on the aggregation
+        // operation when it optimizes with an $unwind stage, i.e. when the $lookup is
+        // immediately followed by an $unwind on the "as" field.
+ res = withoutDefaultCollationColl
+ .aggregate(
+ [
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ },
+ {$unwind: "$matched"},
+ ],
+ collation)
+ .toArray();
+ assert.eq(2, res.length, tojson(res));
+
+ expected = [
+ {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}},
+ {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}}
+ ];
+ assert(arrayEq(expected, res),
+ "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
+
+ res = withoutDefaultCollationColl
+ .aggregate(
+ [
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ },
+ {$unwind: "$matched2"},
+ ],
+ as: "matched1",
+ },
+ },
+ {$unwind: "$matched1"},
+ ],
+ collation)
+ .toArray();
+ assert.eq(4, res.length, tojson(res));
+
+ expected = [
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "uppercase", "str": "ABC"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }
+ },
+ {
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": {
+ "_id": "uppercase",
+ "str": "ABC",
+ "matched2": {"_id": "uppercase", "str": "ABC"}
+ }
+ }
+ ];
+ assert(arrayEq(expected, res),
+ "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering");
+
+ // Test that the $lookup stage uses the "simple" collation if a collation isn't set on the
+ // collection or the aggregation operation.
+ res = withoutDefaultCollationColl
+ .aggregate([
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withDefaultCollationColl.getName(),
+ localField: "str",
+ foreignField: "str",
+ as: "matched",
+ },
+ },
+ ])
+ .toArray();
+ assert.eq([{_id: "lowercase", str: "abc", matched: [{_id: "lowercase", str: "abc"}]}], res);
+
+ res = withoutDefaultCollationColl
+ .aggregate([
+ {$match: {_id: "lowercase"}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str1: "$str"},
+ pipeline: [
+ {$match: {$expr: {$eq: ["$str", "$$str1"]}}},
+ {
+ $lookup: {
+ from: withoutDefaultCollationColl.getName(),
+ let : {str2: "$str"},
+ pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}],
+ as: "matched2"
+ }
+ },
+ {$unwind: "$matched2"},
+ ],
+ as: "matched1",
+ },
+ },
+ ])
+ .toArray();
+ assert.eq([{
+ "_id": "lowercase",
+ "str": "abc",
+ "matched1": [{
+ "_id": "lowercase",
+ "str": "abc",
+ "matched2": {"_id": "lowercase", "str": "abc"}
+ }]
+ }],
+ res);
+ }
+
+ const st = new ShardingTest({shards: 2, config: 1});
+ const testName = "collation_lookup";
+ const caseInsensitive = {collation: {locale: "en_US", strength: 2}};
+
+ const mongosDB = st.s0.getDB(testName);
+ const withDefaultCollationColl = mongosDB[testName + "_with_default"];
+ const withoutDefaultCollationColl = mongosDB[testName + "_without_default"];
+
+ assert.commandWorked(
+ mongosDB.createCollection(withDefaultCollationColl.getName(), caseInsensitive));
+ assert.writeOK(withDefaultCollationColl.insert({_id: "lowercase", str: "abc"}));
+
+ assert.writeOK(withoutDefaultCollationColl.insert({_id: "lowercase", str: "abc"}));
+ assert.writeOK(withoutDefaultCollationColl.insert({_id: "uppercase", str: "ABC"}));
+ assert.writeOK(withoutDefaultCollationColl.insert({_id: "unmatched", str: "def"}));
+
+ //
+ // Sharded collection with default collation and unsharded collection without a default
+ // collation.
+ //
+ assert.commandWorked(
+ withDefaultCollationColl.createIndex({str: 1}, {collation: {locale: "simple"}}));
+
+    // Enable sharding on the test DB and ensure its primary is shard0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+ // Shard the collection with a default collation.
+ assert.commandWorked(mongosDB.adminCommand({
+ shardCollection: withDefaultCollationColl.getFullName(),
+ key: {str: 1},
+ collation: {locale: "simple"}
+ }));
+
+ // Split the collection into 2 chunks.
+ assert.commandWorked(mongosDB.adminCommand(
+ {split: withDefaultCollationColl.getFullName(), middle: {str: "abc"}}));
+
+ // Move the chunk containing {str: "abc"} to shard0001.
+ assert.commandWorked(mongosDB.adminCommand({
+ moveChunk: withDefaultCollationColl.getFullName(),
+ find: {str: "abc"},
+ to: st.shard1.shardName
+ }));
+
+ runTests(withDefaultCollationColl, withoutDefaultCollationColl, caseInsensitive);
+
+ // TODO: Enable the following tests once SERVER-32536 is fixed.
+ //
+ // Sharded collection with default collation and sharded collection without a default
+ // collation.
+ //
+
+ // Shard the collection without a default collation.
+ // assert.commandWorked(mongosDB.adminCommand({
+ // shardCollection: withoutDefaultCollationColl.getFullName(),
+ // key: {_id: 1},
+ // }));
+
+ // // Split the collection into 2 chunks.
+ // assert.commandWorked(mongosDB.adminCommand(
+ // {split: withoutDefaultCollationColl.getFullName(), middle: {_id: "unmatched"}}));
+
+ // // Move the chunk containing {_id: "lowercase"} to shard0001.
+ // assert.commandWorked(mongosDB.adminCommand({
+ // moveChunk: withoutDefaultCollationColl.getFullName(),
+ // find: {_id: "lowercase"},
+ // to: st.shard1.shardName
+ // }));
+
+ // runTests(withDefaultCollationColl, withoutDefaultCollationColl, caseInsensitive);
+
+ st.stop();
+})();
diff --git a/jstests/sharding/lookup.js b/jstests/sharding/lookup.js
new file mode 100644
index 00000000000..cc41cbff319
--- /dev/null
+++ b/jstests/sharding/lookup.js
@@ -0,0 +1,609 @@
+// Basic $lookup regression tests.
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+
+ const st = new ShardingTest({shards: 2, config: 1, mongos: 1});
+ const testName = "lookup_sharded";
+
+ const mongosDB = st.s0.getDB(testName);
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Used by testPipeline to sort result documents. All _ids must be primitives.
+ function compareId(a, b) {
+ if (a._id < b._id) {
+ return -1;
+ }
+ if (a._id > b._id) {
+ return 1;
+ }
+ return 0;
+ }
+
+ // Helper for testing that pipeline returns correct set of results.
+ function testPipeline(pipeline, expectedResult, collection) {
+ assert.eq(collection.aggregate(pipeline).toArray().sort(compareId),
+ expectedResult.sort(compareId));
+ }
+
+ // Shards and splits the collection 'coll' on _id.
+ function shardAndSplit(db, coll) {
+ // Shard the collection on _id.
+ assert.commandWorked(db.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
+ assert.commandWorked(db.adminCommand({split: coll.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey) chunk to shard0001.
+ assert.commandWorked(db.adminCommand({
+ moveChunk: coll.getFullName(),
+ find: {_id: 1},
+ to: st.shard1.shardName,
+ }));
+ }
+
+ function runTest(coll, from, thirdColl, fourthColl) {
+ let db = null; // Using the db variable is banned in this function.
+
+ coll.remove({});
+ from.remove({});
+ thirdColl.remove({});
+ fourthColl.remove({});
+
+ assert.writeOK(coll.insert({_id: 0, a: 1}));
+ assert.writeOK(coll.insert({_id: 1, a: null}));
+ assert.writeOK(coll.insert({_id: 2}));
+
+ assert.writeOK(from.insert({_id: 0, b: 1}));
+ assert.writeOK(from.insert({_id: 1, b: null}));
+ assert.writeOK(from.insert({_id: 2}));
+ //
+ // Basic functionality.
+ //
+
+ // "from" document added to "as" field if a == b, where nonexistent fields are treated as
+ // null.
+ let expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}],
+ expectedResults,
+ coll);
+
+ // If localField is nonexistent, it is treated as if it is null.
+ expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline(
+ [{$lookup: {localField: "nonexistent", foreignField: "b", from: "from", as: "same"}}],
+ expectedResults,
+ coll);
+
+ // If foreignField is nonexistent, it is treated as if it is null.
+ expectedResults = [
+ {_id: 0, a: 1, "same": []},
+ {_id: 1, a: null, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline(
+ [{$lookup: {localField: "a", foreignField: "nonexistent", from: "from", as: "same"}}],
+ expectedResults,
+ coll);
+
+ // If there are no matches or the from coll doesn't exist, the result is an empty array.
+ expectedResults =
+ [{_id: 0, a: 1, "same": []}, {_id: 1, a: null, "same": []}, {_id: 2, "same": []}];
+ testPipeline(
+ [{$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}],
+ expectedResults,
+ coll);
+ testPipeline(
+ [{$lookup: {localField: "a", foreignField: "b", from: "nonexistent", as: "same"}}],
+ expectedResults,
+ coll);
+
+ // If field name specified by "as" already exists, it is overwritten.
+ expectedResults = [
+ {_id: 0, "a": [{_id: 0, b: 1}]},
+ {_id: 1, "a": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "a": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "a"}}],
+ expectedResults,
+ coll);
+
+ // Running multiple $lookups in the same pipeline is allowed.
+ expectedResults = [
+ {_id: 0, a: 1, "c": [{_id: 0, b: 1}], "d": [{_id: 0, b: 1}]},
+ {
+ _id: 1,
+ a: null, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]
+ },
+ {_id: 2, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "c"}},
+ {$project: {"a": 1, "c": 1}},
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "d"}}
+ ],
+ expectedResults,
+ coll);
+
+ //
+ // Coalescing with $unwind.
+ //
+
+        // A normal $unwind on the "as" field.
+ expectedResults = [
+ {_id: 0, a: 1, same: {_id: 0, b: 1}},
+ {_id: 1, a: null, same: {_id: 1, b: null}},
+ {_id: 1, a: null, same: {_id: 2}},
+ {_id: 2, same: {_id: 1, b: null}},
+ {_id: 2, same: {_id: 2}}
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
+ {$unwind: {path: "$same"}}
+ ],
+ expectedResults,
+ coll);
+
+ // An $unwind on the "as" field, with includeArrayIndex.
+ expectedResults = [
+ {_id: 0, a: 1, same: {_id: 0, b: 1}, index: NumberLong(0)},
+ {_id: 1, a: null, same: {_id: 1, b: null}, index: NumberLong(0)},
+ {_id: 1, a: null, same: {_id: 2}, index: NumberLong(1)},
+ {_id: 2, same: {_id: 1, b: null}, index: NumberLong(0)},
+ {_id: 2, same: {_id: 2}, index: NumberLong(1)},
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
+ {$unwind: {path: "$same", includeArrayIndex: "index"}}
+ ],
+ expectedResults,
+ coll);
+
+ // Normal $unwind with no matching documents.
+ expectedResults = [];
+ testPipeline(
+ [
+ {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
+ {$unwind: {path: "$same"}}
+ ],
+ expectedResults,
+ coll);
+
+ // $unwind with preserveNullAndEmptyArray with no matching documents.
+ expectedResults = [
+ {_id: 0, a: 1},
+ {_id: 1, a: null},
+ {_id: 2},
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
+ {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
+ ],
+ expectedResults,
+ coll);
+
+ // $unwind with preserveNullAndEmptyArray, some with matching documents, some without.
+ expectedResults = [
+ {_id: 0, a: 1},
+ {_id: 1, a: null, same: {_id: 0, b: 1}},
+ {_id: 2},
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
+ {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
+ ],
+ expectedResults,
+ coll);
+
+ // $unwind with preserveNullAndEmptyArray and includeArrayIndex, some with matching
+ // documents, some without.
+ expectedResults = [
+ {_id: 0, a: 1, index: null},
+ {_id: 1, a: null, same: {_id: 0, b: 1}, index: NumberLong(0)},
+ {_id: 2, index: null},
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
+ {
+ $unwind:
+ {path: "$same", preserveNullAndEmptyArrays: true, includeArrayIndex: "index"}
+ }
+ ],
+ expectedResults,
+ coll);
+
+ //
+ // Dependencies.
+ //
+
+ // If $lookup didn't add "localField" to its dependencies, this test would fail as the
+ // value of the "a" field would be lost and treated as null.
+ expectedResults = [
+ {_id: 0, "same": [{_id: 0, b: 1}]},
+ {_id: 1, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+ testPipeline(
+ [
+ {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
+ {$project: {"same": 1}}
+ ],
+ expectedResults,
+ coll);
+
+ // If $lookup didn't add fields referenced by "let" variables to its dependencies, this test
+ // would fail as the value of the "a" field would be lost and treated as null.
+ expectedResults = [
+ {"_id": 0, "same": [{"_id": 0, "x": 1}, {"_id": 1, "x": 1}, {"_id": 2, "x": 1}]},
+ {
+ "_id": 1,
+ "same": [{"_id": 0, "x": null}, {"_id": 1, "x": null}, {"_id": 2, "x": null}]
+ },
+ {"_id": 2, "same": [{"_id": 0}, {"_id": 1}, {"_id": 2}]}
+ ];
+ testPipeline(
+ [
+ {
+ $lookup: {
+ let : {var1: "$a"},
+ pipeline: [{$project: {x: "$$var1"}}],
+ from: "from",
+ as: "same"
+ }
+ },
+ {$project: {"same": 1}}
+ ],
+ expectedResults,
+ coll);
+
+ //
+ // Dotted field paths.
+ //
+
+ coll.remove({});
+ assert.writeOK(coll.insert({_id: 0, a: 1}));
+ assert.writeOK(coll.insert({_id: 1, a: null}));
+ assert.writeOK(coll.insert({_id: 2}));
+ assert.writeOK(coll.insert({_id: 3, a: {c: 1}}));
+
+ from.remove({});
+ assert.writeOK(from.insert({_id: 0, b: 1}));
+ assert.writeOK(from.insert({_id: 1, b: null}));
+ assert.writeOK(from.insert({_id: 2}));
+ assert.writeOK(from.insert({_id: 3, b: {c: 1}}));
+ assert.writeOK(from.insert({_id: 4, b: {c: 2}}));
+
+ // Once without a dotted field.
+ let pipeline = [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}];
+ expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 3, a: {c: 1}, "same": [{_id: 3, b: {c: 1}}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Look up a dotted field.
+ pipeline = [{$lookup: {localField: "a.c", foreignField: "b.c", from: "from", as: "same"}}];
+ // All but the last document in 'coll' have a nullish value for 'a.c'.
+ expectedResults = [
+ {_id: 0, a: 1, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
+ {_id: 1, a: null, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]},
+ {_id: 3, a: {c: 1}, same: [{_id: 3, b: {c: 1}}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // With an $unwind stage.
+ coll.remove({});
+ assert.writeOK(coll.insert({_id: 0, a: {b: 1}}));
+ assert.writeOK(coll.insert({_id: 1}));
+
+ from.remove({});
+ assert.writeOK(from.insert({_id: 0, target: 1}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a.b",
+ foreignField: "target",
+ from: "from",
+ as: "same.documents",
+ }
+ },
+ {
+ // Expected input to $unwind:
+ // {_id: 0, a: {b: 1}, same: {documents: [{_id: 0, target: 1}]}}
+ // {_id: 1, same: {documents: []}}
+ $unwind: {
+ path: "$same.documents",
+ preserveNullAndEmptyArrays: true,
+ includeArrayIndex: "c.d.e",
+ }
+ }
+ ];
+ expectedResults = [
+ {_id: 0, a: {b: 1}, same: {documents: {_id: 0, target: 1}}, c: {d: {e: NumberLong(0)}}},
+ {_id: 1, same: {}, c: {d: {e: null}}},
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // Query-like local fields (SERVER-21287)
+ //
+
+ // This must only do an equality match rather than treating the value as a regex.
+ coll.remove({});
+ assert.writeOK(coll.insert({_id: 0, a: /a regex/}));
+
+ from.remove({});
+ assert.writeOK(from.insert({_id: 0, b: /a regex/}));
+ assert.writeOK(from.insert({_id: 1, b: "string that matches /a regex/"}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a",
+ foreignField: "b",
+ from: "from",
+ as: "b",
+ }
+ },
+ ];
+ expectedResults = [{_id: 0, a: /a regex/, b: [{_id: 0, b: /a regex/}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // A local value of an array.
+ //
+
+ // Basic array corresponding to multiple documents.
+ coll.remove({});
+ assert.writeOK(coll.insert({_id: 0, a: [0, 1, 2]}));
+
+ from.remove({});
+ assert.writeOK(from.insert({_id: 0}));
+ assert.writeOK(from.insert({_id: 1}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a",
+ foreignField: "_id",
+ from: "from",
+ as: "b",
+ }
+ },
+ ];
+ expectedResults = [{_id: 0, a: [0, 1, 2], b: [{_id: 0}, {_id: 1}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Basic array corresponding to a single document.
+ coll.remove({});
+ assert.writeOK(coll.insert({_id: 0, a: [1]}));
+
+ from.remove({});
+ assert.writeOK(from.insert({_id: 0}));
+ assert.writeOK(from.insert({_id: 1}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a",
+ foreignField: "_id",
+ from: "from",
+ as: "b",
+ }
+ },
+ ];
+ expectedResults = [{_id: 0, a: [1], b: [{_id: 1}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // Array containing regular expressions.
+ coll.remove({});
+ assert.writeOK(coll.insert({_id: 0, a: [/a regex/, /^x/]}));
+ assert.writeOK(coll.insert({_id: 1, a: [/^x/]}));
+
+ from.remove({});
+ assert.writeOK(from.insert({_id: 0, b: "should not match a regex"}));
+ assert.writeOK(from.insert({_id: 1, b: "xxxx"}));
+ assert.writeOK(from.insert({_id: 2, b: /a regex/}));
+ assert.writeOK(from.insert({_id: 3, b: /^x/}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a",
+ foreignField: "b",
+ from: "from",
+ as: "b",
+ }
+ },
+ ];
+ expectedResults = [
+ {_id: 0, a: [/a regex/, /^x/], b: [{_id: 2, b: /a regex/}, {_id: 3, b: /^x/}]},
+ {_id: 1, a: [/^x/], b: [{_id: 3, b: /^x/}]}
+ ];
+ testPipeline(pipeline, expectedResults, coll);
+
+ // 'localField' references a field within an array of sub-objects.
+ coll.remove({});
+ assert.writeOK(coll.insert({_id: 0, a: [{b: 1}, {b: 2}]}));
+
+ from.remove({});
+ assert.writeOK(from.insert({_id: 0}));
+ assert.writeOK(from.insert({_id: 1}));
+ assert.writeOK(from.insert({_id: 2}));
+ assert.writeOK(from.insert({_id: 3}));
+
+ pipeline = [
+ {
+ $lookup: {
+ localField: "a.b",
+ foreignField: "_id",
+ from: "from",
+ as: "c",
+ }
+ },
+ ];
+
+ expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}];
+ testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // Test $lookup when the foreign collection is a view.
+ //
+ // TODO: Enable this test as part of SERVER-32548, fails whenever the foreign collection is
+ // sharded.
+ // coll.getDB().fromView.drop();
+ // assert.commandWorked(
+ // coll.getDB().runCommand({create: "fromView", viewOn: "from", pipeline: []}));
+
+ // pipeline = [
+ // {
+ // $lookup: {
+ // localField: "a.b",
+ // foreignField: "_id",
+ // from: "fromView",
+ // as: "c",
+ // }
+ // },
+ // ];
+
+ // expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}];
+ // testPipeline(pipeline, expectedResults, coll);
+
+ //
+ // Error cases.
+ //
+
+ // 'from', 'as', 'localField' and 'foreignField' must all be specified when run with
+ // localField/foreignField syntax.
+ assertErrorCode(coll,
+ [{$lookup: {foreignField: "b", from: "from", as: "same"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", from: "from", as: "same"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", as: "same"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", from: "from"}}],
+ ErrorCodes.FailedToParse);
+
+ // localField/foreignField and pipeline/let syntax must not be mixed.
+ assertErrorCode(coll,
+ [{$lookup: {pipeline: [], foreignField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {pipeline: [], localField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(
+ coll,
+ [{$lookup: {pipeline: [], localField: "b", foreignField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {let : {a: "$b"}, foreignField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {let : {a: "$b"}, localField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(
+ coll,
+ [{
+ $lookup:
+ {let : {a: "$b"}, localField: "b", foreignField: "b", from: "from", as: "as"}
+ }],
+ ErrorCodes.FailedToParse);
+
+ // 'from', 'as', 'localField' and 'foreignField' must all be of type string.
+ assertErrorCode(coll,
+ [{$lookup: {localField: 1, foreignField: "b", from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: 1, from: "from", as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", from: 1, as: "as"}}],
+ ErrorCodes.FailedToParse);
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", from: "from", as: 1}}],
+ ErrorCodes.FailedToParse);
+
+ // The foreign collection must be a valid namespace.
+ assertErrorCode(coll,
+ [{$lookup: {localField: "a", foreignField: "b", from: "", as: "as"}}],
+ ErrorCodes.InvalidNamespace);
+ // $lookup's field must be an object.
+ assertErrorCode(coll, [{$lookup: "string"}], ErrorCodes.FailedToParse);
+ }
+
+ //
+ // Test unsharded local collection and unsharded foreign collection.
+ //
+ mongosDB.lookUp.drop();
+ mongosDB.from.drop();
+ mongosDB.thirdColl.drop();
+ mongosDB.fourthColl.drop();
+
+ runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl);
+
+ // Verify that the command is sent only to the primary shard when both the local and foreign
+ // collections are unsharded.
+ assert(!assert
+ .commandWorked(mongosDB.lookup.explain().aggregate([{
+ $lookup: {
+ from: mongosDB.from.getName(),
+ localField: "a",
+ foreignField: "b",
+ as: "results"
+ }
+ }]))
+ .hasOwnProperty("shards"));
+    // Enable sharding on the test DB and ensure its primary is shard0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+ //
+ // Test unsharded local collection and sharded foreign collection.
+ //
+
+ // Shard the foreign collection on _id.
+ shardAndSplit(mongosDB, mongosDB.from);
+ runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl);
+
+ //
+ // Test sharded local collection and unsharded foreign collection.
+ //
+ mongosDB.from.drop();
+
+ // Shard the local collection on _id.
+ shardAndSplit(mongosDB, mongosDB.lookup);
+ runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl);
+
+ //
+ // Test sharded local and foreign collections.
+ //
+
+ // Shard the foreign collection on _id.
+ shardAndSplit(mongosDB, mongosDB.from);
+ runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl);
+
+ st.stop();
+}());
diff --git a/jstests/sharding/lookup_mongod_unaware.js b/jstests/sharding/lookup_mongod_unaware.js
new file mode 100644
index 00000000000..6333eec15de
--- /dev/null
+++ b/jstests/sharding/lookup_mongod_unaware.js
@@ -0,0 +1,168 @@
+// Tests the behavior of a $lookup when a shard contains incorrect routing information for the
+// local and/or foreign collections. This includes when the shard thinks the collection is sharded
+// when it's not, and likewise when it thinks the collection is unsharded but is actually sharded.
+//
+// We restart a mongod to cause it to forget that a collection was sharded. When restarted, we
+// expect it to still have all the previous data.
+// @tags: [requires_persistence]
+(function() {
+ "use strict";
+
+ // Restarts the primary shard and ensures that it believes both collections are unsharded.
+ function restartPrimaryShard(rs, localColl, foreignColl) {
+ // Returns true if the shard is aware that the collection is sharded.
+ function hasRoutingInfoForNs(shardConn, coll) {
+ const res = shardConn.adminCommand({getShardVersion: coll, fullMetadata: true});
+ assert.commandWorked(res);
+ return res.metadata.collVersion != undefined;
+ }
+
+ rs.restart(0);
+ rs.awaitSecondaryNodes();
+ assert(!hasRoutingInfoForNs(rs.getPrimary(), localColl.getFullName()));
+ assert(!hasRoutingInfoForNs(rs.getPrimary(), foreignColl.getFullName()));
+ }
+
+ const testName = "lookup_stale_mongod";
+ const st = new ShardingTest({
+ shards: 2,
+ mongos: 2,
+ rs: {nodes: 1},
+ });
+
+ const mongos0DB = st.s0.getDB(testName);
+ const mongos0LocalColl = mongos0DB[testName + "_local"];
+ const mongos0ForeignColl = mongos0DB[testName + "_foreign"];
+
+ const mongos1DB = st.s1.getDB(testName);
+ const mongos1LocalColl = mongos1DB[testName + "_local"];
+ const mongos1ForeignColl = mongos1DB[testName + "_foreign"];
+
+ // Ensure that shard0 is the primary shard.
+ assert.commandWorked(mongos0DB.adminCommand({enableSharding: mongos0DB.getName()}));
+ st.ensurePrimaryShard(mongos0DB.getName(), st.shard0.shardName);
+
+ assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1}));
+ assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null}));
+
+ assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null}));
+
+ // Send writes through mongos1 such that it's aware of the collections and believes they are
+ // unsharded.
+ assert.writeOK(mongos1LocalColl.insert({_id: 2}));
+ assert.writeOK(mongos1ForeignColl.insert({_id: 2}));
+
+ const pipeline = [
+ {
+ $lookup:
+ {localField: "a", foreignField: "b", from: mongos0ForeignColl.getName(), as: "same"}
+ },
+ {$sort: {_id: 1}}
+ ];
+
+ // The results are expected to be correct if the $lookup stage is executed on the mongos which
+ // is aware that the collection is sharded.
+ const expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+
+ //
+ // Test unsharded local and sharded foreign collections, with the primary shard unaware that
+ // the foreign collection is sharded.
+ //
+
+ // Shard the foreign collection.
+ assert.commandWorked(
+ mongos0DB.adminCommand({shardCollection: mongos0ForeignColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey).
+ assert.commandWorked(
+ mongos0DB.adminCommand({split: mongos0ForeignColl.getFullName(), middle: {_id: 1}}));
+
+ // Move the [minKey, 1) chunk to shard1.
+ assert.commandWorked(mongos0DB.adminCommand({
+ moveChunk: mongos0ForeignColl.getFullName(),
+ find: {_id: 0},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Verify $lookup results through the fresh mongos.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+    // Verify $lookup results through mongos1, which is not aware that the foreign
+    // collection is sharded. The results are expected to be incorrect when both the mongos and
+ // primary shard incorrectly believe that a collection is unsharded.
+ // TODO: This should be fixed by SERVER-32629, likewise for the other aggregates in this file
+ // sent to the stale mongos.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [
+ {_id: 0, a: 1, "same": []},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ]);
+
+ //
+ // Test sharded local and sharded foreign collections, with the primary shard unaware that
+ // either collection is sharded.
+ //
+
+ // Shard the local collection.
+ assert.commandWorked(
+ mongos0DB.adminCommand({shardCollection: mongos0LocalColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey).
+ assert.commandWorked(
+ mongos0DB.adminCommand({split: mongos0LocalColl.getFullName(), middle: {_id: 1}}));
+
+ // Move the [minKey, 1) chunk to shard1.
+ assert.commandWorked(mongos0DB.adminCommand({
+ moveChunk: mongos0LocalColl.getFullName(),
+ find: {_id: 0},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Verify $lookup results through the fresh mongos.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ // Verify $lookup results through mongos1, which is not aware that the local
+ // collection is sharded. The results are expected to be incorrect when both the mongos and
+ // primary shard incorrectly believe that a collection is unsharded.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ]);
+
+ //
+ // Test sharded local and unsharded foreign collections, with the primary shard unaware that
+ // the local collection is sharded.
+ //
+
+ // Recreate the foreign collection as unsharded.
+ mongos0ForeignColl.drop();
+ assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 2}));
+
+ // Verify $lookup results through the fresh mongos.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ // Verify $lookup results through mongos1, which is not aware that the local
+ // collection is sharded. The results are expected to be incorrect when both the mongos and
+ // primary shard incorrectly believe that a collection is unsharded.
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl);
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ]);
+
+ st.stop();
+})();
diff --git a/jstests/sharding/lookup_stale_mongos.js b/jstests/sharding/lookup_stale_mongos.js
new file mode 100644
index 00000000000..3c713733b49
--- /dev/null
+++ b/jstests/sharding/lookup_stale_mongos.js
@@ -0,0 +1,130 @@
+// Tests the behavior of a $lookup when the mongos contains stale routing information for the
+// local and/or foreign collections. This includes when mongos thinks the collection is sharded
+// when it's not, and likewise when mongos thinks the collection is unsharded but is actually
+// sharded.
+(function() {
+ "use strict";
+
+ const testName = "lookup_stale_mongos";
+ const st = new ShardingTest({
+ shards: 2,
+ mongos: 2,
+ });
+
+ const mongos0DB = st.s0.getDB(testName);
+ assert.commandWorked(mongos0DB.dropDatabase());
+ const mongos0LocalColl = mongos0DB[testName + "_local"];
+ const mongos0ForeignColl = mongos0DB[testName + "_foreign"];
+
+ const mongos1DB = st.s1.getDB(testName);
+ const mongos1LocalColl = mongos1DB[testName + "_local"];
+ const mongos1ForeignColl = mongos1DB[testName + "_foreign"];
+
+ const pipeline = [
+ {
+ $lookup:
+ {localField: "a", foreignField: "b", from: mongos1ForeignColl.getName(), as: "same"}
+ },
+ {$sort: {_id: 1}}
+ ];
+ const expectedResults = [
+ {_id: 0, a: 1, "same": [{_id: 0, b: 1}]},
+ {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]},
+ {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}
+ ];
+
+ // Ensure that shard0 is the primary shard.
+ assert.commandWorked(mongos0DB.adminCommand({enableSharding: mongos0DB.getName()}));
+ st.ensurePrimaryShard(mongos0DB.getName(), st.shard0.shardName);
+
+ assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1}));
+ assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null}));
+
+ assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null}));
+
+ // Send writes through mongos1 such that it's aware of the collections and believes they are
+ // unsharded.
+ assert.writeOK(mongos1LocalColl.insert({_id: 2}));
+ assert.writeOK(mongos1ForeignColl.insert({_id: 2}));
+
+ //
+ // Test unsharded local and sharded foreign collections, with mongos unaware that the foreign
+ // collection is sharded.
+ //
+
+ // Shard the foreign collection through mongos0.
+ assert.commandWorked(
+ mongos0DB.adminCommand({shardCollection: mongos0ForeignColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey).
+ assert.commandWorked(
+ mongos0DB.adminCommand({split: mongos0ForeignColl.getFullName(), middle: {_id: 1}}));
+
+ // Move the [minKey, 1) chunk to shard1.
+ assert.commandWorked(mongos0DB.adminCommand({
+ moveChunk: mongos0ForeignColl.getFullName(),
+ find: {_id: 0},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Issue a $lookup through mongos1, which is unaware that the foreign collection is sharded.
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ //
+ // Test sharded local and sharded foreign collections, with mongos unaware that the local
+ // collection is sharded.
+ //
+
+ // Shard the local collection through mongos0.
+ assert.commandWorked(
+ mongos0DB.adminCommand({shardCollection: mongos0LocalColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey).
+ assert.commandWorked(
+ mongos0DB.adminCommand({split: mongos0LocalColl.getFullName(), middle: {_id: 1}}));
+
+ // Move the [minKey, 1) chunk to shard1.
+ assert.commandWorked(mongos0DB.adminCommand({
+ moveChunk: mongos0LocalColl.getFullName(),
+ find: {_id: 0},
+ to: st.shard1.shardName,
+ _waitForDelete: true
+ }));
+
+ // Issue a $lookup through mongos1, which is unaware that the local collection is sharded.
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ //
+ // Test sharded local and unsharded foreign collections, with mongos unaware that the foreign
+ // collection is unsharded.
+ //
+
+ // Recreate the foreign collection as unsharded through mongos0.
+ mongos0ForeignColl.drop();
+ assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null}));
+ assert.writeOK(mongos0ForeignColl.insert({_id: 2}));
+
+ // Issue a $lookup through mongos1, which is unaware that the foreign collection is now
+ // unsharded.
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ //
+ // Test unsharded local and foreign collections, with mongos unaware that the local
+ // collection is unsharded.
+ //
+
+    // Recreate the local collection as unsharded through mongos0.
+ mongos0LocalColl.drop();
+ assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1}));
+ assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null}));
+ assert.writeOK(mongos0LocalColl.insert({_id: 2}));
+
+ // Issue a $lookup through mongos1, which is unaware that the local collection is now
+ // unsharded.
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults);
+
+ st.stop();
+})();
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index cf1267eeb03..d975895dc4d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -218,7 +218,7 @@ public:
MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults)
: _mockResults(std::move(mockResults)) {}
- bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final {
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
return false;
}
@@ -236,16 +236,16 @@ public:
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index 05a62606bb6..6db48d43850 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -75,16 +75,16 @@ public:
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
pipeline->addInitialSource(DocumentSourceMock::create(_results));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index e4fd70920f6..ecc9d572bfe 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -283,8 +283,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
- uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- _fromExpCtx, pipeline.get()));
+ pipeline = uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ _fromExpCtx, pipeline.release()));
}
// If the cache has been abandoned, release it.
@@ -614,6 +614,39 @@ void DocumentSourceLookUp::initializeIntrospectionPipeline() {
sources.empty() || !sources.front()->constraints().isChangeStreamStage());
}
+DocumentSource::StageConstraints DocumentSourceLookUp::constraints(
+ Pipeline::SplitState pipeState) const {
+
+ const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
+ std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
+ _parsedIntrospectionPipeline->getSources().end(),
+ [](const auto& source) {
+ return source->constraints().diskRequirement ==
+ DiskUseRequirement::kWritesTmpData;
+ });
+
+ // If executing on mongos and the foreign collection is sharded, then this stage can run on
+ // mongos.
+ HostTypeRequirement hostReq;
+ if (pExpCtx->inMongos) {
+ hostReq = pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)
+ ? HostTypeRequirement::kMongoS
+ : HostTypeRequirement::kPrimaryShard;
+ } else {
+ hostReq = HostTypeRequirement::kPrimaryShard;
+ }
+
+ StageConstraints constraints(StreamType::kStreaming,
+ PositionRequirement::kNone,
+ hostReq,
+ mayUseDisk ? DiskUseRequirement::kWritesTmpData
+ : DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed);
+
+ constraints.canSwapWithMatch = true;
+ return constraints;
+}
+
void DocumentSourceLookUp::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
Document doc;
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 530c62f985c..7424b2ef97d 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -96,25 +96,7 @@ public:
*/
GetModPathsReturn getModifiedPaths() const final;
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- const bool mayUseDisk = wasConstructedWithPipelineSyntax() &&
- std::any_of(_parsedIntrospectionPipeline->getSources().begin(),
- _parsedIntrospectionPipeline->getSources().end(),
- [](const auto& source) {
- return source->constraints().diskRequirement ==
- DiskUseRequirement::kWritesTmpData;
- });
-
- StageConstraints constraints(StreamType::kStreaming,
- PositionRequirement::kNone,
- HostTypeRequirement::kPrimaryShard,
- mayUseDisk ? DiskUseRequirement::kWritesTmpData
- : DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed);
-
- constraints.canSwapWithMatch = true;
- return constraints;
- }
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final;
GetDepsReturn getDependencies(DepsTracker* deps) const final;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index 175806d5f7e..77feab2f825 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -101,16 +101,16 @@ public:
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
boost::optional<Document> lookupSingleDocument(
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index dfe4ece48ac..b77c549a855 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -511,14 +511,14 @@ public:
}
if (opts.attachCursorSource) {
- uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()));
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
}
return pipeline;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final {
while (_removeLeadingQueryStages && !pipeline->getSources().empty()) {
if (pipeline->popFrontWithCriteria("$match") ||
pipeline->popFrontWithCriteria("$sort") ||
@@ -529,7 +529,7 @@ public:
}
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults));
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
private:
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index d32e8276c63..7d78c880d1b 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -156,12 +156,16 @@ public:
const MakePipelineOptions opts = MakePipelineOptions{}) = 0;
/**
- * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This
- * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be
+ * Accepts a pipeline and returns a new one which will draw input from the underlying
+ * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be
* returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be
* the only case where NamespaceNotFound is returned.
+ *
+ * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr.
+ * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the
+ * compiler expects to find an implementation of PipelineDeleter.
*/
- virtual Status attachCursorSourceToPipeline(
+ virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0;
/**
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 7ce395f3716..16d6d9e325c 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -585,8 +585,8 @@ DBClientBase* PipelineD::MongoDInterface::directClient() {
}
bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- AutoGetCollectionForReadCommand autoColl(opCtx, nss);
- // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding
+ AutoGetCollectionForRead autoColl(opCtx, nss);
+ // TODO SERVER-32198: Use CollectionShardingState::collectionIsSharded() to confirm sharding
// state.
auto css = CollectionShardingState::get(opCtx, nss);
return bool(css->getMetadata());
@@ -689,16 +689,15 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterfac
pipeline.getValue()->optimizePipeline();
}
- Status cursorStatus = Status::OK();
-
if (opts.attachCursorSource) {
- cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
}
- return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
+ return pipeline;
}
-Status PipelineD::MongoDInterface::attachCursorSourceToPipeline(
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>>
+PipelineD::MongoDInterface::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
invariant(pipeline->getSources().empty() ||
!dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
@@ -729,7 +728,7 @@ Status PipelineD::MongoDInterface::attachCursorSourceToPipeline(
PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
- return Status::OK();
+ return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx));
}
std::vector<BSONObj> PipelineD::MongoDInterface::getCurrentOps(
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index afbb6b8f73f..f626b0f904b 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -95,8 +95,8 @@ public:
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{}) final;
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final;
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
CurrentOpConnectionsMode connMode,
CurrentOpUserMode userMode,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 0ec37b15eb4..488cf97c58e 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -108,8 +108,8 @@ public:
MONGO_UNREACHABLE;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) override {
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index c90346763d0..60fe5bfbb11 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -62,43 +62,13 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/establish_cursors.h"
-#include "mongo/s/query/router_stage_update_on_add_shard.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
-#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
namespace mongo {
-MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors);
-
namespace {
-// Given a document representing an aggregation command such as
-//
-// {aggregate: "myCollection", pipeline: [], ...},
-//
-// produces the corresponding explain command:
-//
-// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...}
-Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
- MutableDocument explainCommandBuilder;
- explainCommandBuilder["explain"] = Value(aggregateCommand);
- // Downstream host targeting code expects queryOptions at the top level of the command object.
- explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
- Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
-
- // readConcern needs to be promoted to the top-level of the request.
- explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
- Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
-
- // Add explain command options.
- for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
- explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
- }
-
- return explainCommandBuilder.freeze();
-}
-
Status appendExplainResults(
const std::vector<AsyncRequestsSender::Response>& shardResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
@@ -144,15 +114,6 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId,
return getStatusFromCommandResult(result->asTempObj());
}
-bool mustRunOnAllShards(const NamespaceString& nss,
- const CachedCollectionRoutingInfo& routingInfo,
- const LiteParsedPipeline& litePipe) {
- // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection
- // must run on all shards.
- const bool nsIsSharded = static_cast<bool>(routingInfo.cm());
- return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream());
-}
-
StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
const NamespaceString& execNss,
CatalogCache* catalogCache) {
@@ -174,65 +135,6 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte
return swRoutingInfo;
}
-std::set<ShardId> getTargetedShards(OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- const CachedCollectionRoutingInfo& routingInfo,
- const BSONObj shardQuery,
- const BSONObj collation) {
- if (mustRunOnAllShards(nss, routingInfo, litePipe)) {
- // The pipeline begins with a stage which must be run on all shards.
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
- return {shardIds.begin(), shardIds.end()};
- }
-
- if (routingInfo.cm()) {
- // The collection is sharded. Use the routing table to decide which shards to target
- // based on the query and collation.
- std::set<ShardId> shardIds;
- routingInfo.cm()->getShardIdsForQuery(opCtx, shardQuery, collation, &shardIds);
- return shardIds;
- }
-
- // The collection is unsharded. Target only the primary shard for the database.
- return {routingInfo.primaryId()};
-}
-
-BSONObj createCommandForTargetedShards(
- const AggregationRequest& request,
- const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) {
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
-
- // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough.
- if (pipelineForTargetedShards) {
- targetedCmd[AggregationRequest::kPipelineName] =
- Value(pipelineForTargetedShards->serialize());
-
- if (pipelineForTargetedShards->isSplitForShards()) {
- targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
- targetedCmd[AggregationRequest::kCursorName] =
- Value(DOC(AggregationRequest::kBatchSizeName << 0));
- }
- }
-
- // If this pipeline is not split, ensure that the write concern is propagated if present.
- if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) {
- targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
- }
-
- // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
- // explain command.
- if (auto explainVerbosity = request.getExplain()) {
- targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity));
- }
-
- return targetedCmd.freeze().toBson();
-}
-
BSONObj createCommandForMergingShard(
const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
@@ -255,247 +157,6 @@ BSONObj createCommandForMergingShard(
return mergeCmd.freeze().toBson();
}
-StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
- LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
-
- std::set<ShardId> shardIds =
- getTargetedShards(opCtx, nss, litePipe, *routingInfo, shardQuery, collation);
- std::vector<std::pair<ShardId, BSONObj>> requests;
-
- if (mustRunOnAllShards(nss, *routingInfo, litePipe)) {
- // The pipeline contains a stage which must be run on all shards. Skip versioning and
- // enqueue the raw command objects.
- for (auto&& shardId : shardIds) {
- requests.emplace_back(std::move(shardId), cmdObj);
- }
- } else if (routingInfo->cm()) {
- // The collection is sharded. Use the routing table to decide which shards to target
- // based on the query and collation, and build versioned requests for them.
- for (auto& shardId : shardIds) {
- auto versionedCmdObj =
- appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
- requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
- }
- } else {
- // The collection is unsharded. Target only the primary shard for the database.
- // Don't append shard version info when contacting the config servers.
- requests.emplace_back(routingInfo->primaryId(),
- !routingInfo->primary()->isConfig()
- ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
- : cmdObj);
- }
-
- if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
- "until fail point is disabled.";
- while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
- sleepsecs(1);
- }
- }
-
- // If we reach this point, we're either trying to establish cursors on a sharded execution
- // namespace, or handling the case where a sharded collection was dropped and recreated as
- // unsharded. Since views cannot be sharded, and because we will return an error rather than
- // attempting to continue in the event that a recreated namespace is a view, we set
- // viewDefinitionOut to nullptr.
- BSONObj* viewDefinitionOut = nullptr;
- auto swCursors = establishCursors(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- readPref,
- requests,
- false /* do not allow partial results */,
- viewDefinitionOut /* can't receive view definition */);
-
- // If any shard returned a stale shardVersion error, invalidate the routing table cache.
- // This will cause the cache to be refreshed the next time it is accessed.
- if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
- Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*routingInfo));
- }
-
- return swCursors;
-}
-
-struct DispatchShardPipelineResults {
- // True if this pipeline was split, and the second half of the pipeline needs to be run on the
- // primary shard for the database.
- bool needsPrimaryShardMerge;
-
- // Populated if this *is not* an explain, this vector represents the cursors on the remote
- // shards.
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
-
- // Populated if this *is* an explain, this vector represents the results from each shard.
- std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
-
- // The half of the pipeline that was sent to each shard, or the entire pipeline if there was
- // only one shard targeted.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards;
-
- // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
-
- // The command object to send to the targeted shards.
- BSONObj commandForTargetedShards;
-};
-
-/**
- * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
- * the pipeline that will need to be executed to merge the results from the remotes. If a stale
- * shard version is encountered, refreshes the routing table and tries again.
- */
-StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& executionNss,
- BSONObj originalCmdObj,
- const AggregationRequest& aggRequest,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
- // The process is as follows:
- // - First, determine whether we need to target more than one shard. If so, we split the
- // pipeline; if not, we retain the existing pipeline.
- // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
- // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
- // the refreshed routing table data.
- // - If the pipeline is not split and we now need to target multiple shards, split it. If the
- // pipeline is already split and we now only need to target a single shard, reassemble the
- // original pipeline.
- // - After exhausting 10 attempts to establish the cursors, we give up and throw.
- auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
- auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>();
- auto opCtx = expCtx->opCtx;
-
- const bool needsPrimaryShardMerge =
- (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
-
- const bool needsMongosMerge = pipeline->needsMongosMerger();
-
- const auto shardQuery = pipeline->getInitialQuery();
-
- auto pipelineForTargetedShards = std::move(pipeline);
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
- BSONObj targetedCommand;
-
- int numAttempts = 0;
-
- do {
- // We need to grab a new routing table at the start of each iteration, since a stale config
- // exception will invalidate the previous one.
- auto executionNsRoutingInfo = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss));
-
- // Determine whether we can run the entire aggregation on a single shard.
- std::set<ShardId> shardIds = getTargetedShards(opCtx,
- executionNss,
- liteParsedPipeline,
- executionNsRoutingInfo,
- shardQuery,
- aggRequest.getCollation());
-
- uassert(ErrorCodes::ShardNotFound,
- "No targets were found for this aggregation. All shards were removed from the "
- "cluster mid-operation",
- shardIds.size() > 0);
-
- // Don't need to split the pipeline if we are only targeting a single shard, unless:
- // - There is a stage that needs to be run on the primary shard and the single target shard
- // is not the primary.
- // - The pipeline contains one or more stages which must always merge on mongoS.
- const bool needsSplit =
- (shardIds.size() > 1u || needsMongosMerge ||
- (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId()));
-
- const bool isSplit = pipelineForTargetedShards->isSplitForShards();
-
- // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
- // can run on a single shard and the pipeline is already split, reassemble it.
- if (needsSplit && !isSplit) {
- pipelineForMerging = std::move(pipelineForTargetedShards);
- pipelineForTargetedShards = pipelineForMerging->splitForSharded();
- } else if (!needsSplit && isSplit) {
- pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
- }
-
- // Generate the command object for the targeted shards.
- targetedCommand =
- createCommandForTargetedShards(aggRequest, originalCmdObj, pipelineForTargetedShards);
-
- // Refresh the shard registry if we're targeting all shards. We need the shard registry
- // to be at least as current as the logical time used when creating the command for
- // $changeStream to work reliably, so we do a "hard" reload.
- if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
- shardRegistry->reload(opCtx);
- }
- }
-
- // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
- if (expCtx->explain) {
- if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
- // Some stages (such as $currentOp) need to be broadcast to all shards, and should
- // not participate in the shard version protocol.
- swShardResults =
- scatterGatherUnversionedTargetAllShards(opCtx,
- executionNss.db().toString(),
- executionNss,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- } else {
- // Aggregations on a real namespace should use the routing table to target shards,
- // and should participate in the shard version protocol.
- swShardResults =
- scatterGatherVersionedTargetByRoutingTable(opCtx,
- executionNss.db().toString(),
- executionNss,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- shardQuery,
- aggRequest.getCollation(),
- nullptr /* viewDefinition */);
- }
- } else {
- swCursors = establishShardCursors(opCtx,
- executionNss,
- liteParsedPipeline,
- &executionNsRoutingInfo,
- targetedCommand,
- ReadPreferenceSetting::get(opCtx),
- shardQuery,
- aggRequest.getCollation());
-
- if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
- LOG(1) << "got stale shardVersion error " << swCursors.getStatus()
- << " while dispatching " << redact(targetedCommand) << " after "
- << (numAttempts + 1) << " dispatch attempts";
- }
- }
- } while (++numAttempts < kMaxNumStaleVersionRetries &&
- (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK()));
-
- if (!swShardResults.isOK()) {
- return swShardResults.getStatus();
- }
- if (!swCursors.isOK()) {
- return swCursors.getStatus();
- }
- return DispatchShardPipelineResults{needsPrimaryShardMerge,
- std::move(swCursors.getValue()),
- std::move(swShardResults.getValue()),
- std::move(pipelineForTargetedShards),
- std::move(pipelineForMerging),
- targetedCommand};
-}
StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor(
OperationContext* opCtx,
@@ -520,104 +181,6 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCurs
return {{std::move(mergingShardId), std::move(shardCmdResponse)}};
}
-BSONObj establishMergingMongosCursor(OperationContext* opCtx,
- const AggregationRequest& request,
- const NamespaceString& requestedNss,
- BSONObj cmdToRunOnNewShards,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
-
- ClusterClientCursorParams params(
- requestedNss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- ReadPreferenceSetting::get(opCtx));
-
- params.tailableMode = pipelineForMerging->getContext()->tailableMode;
- params.mergePipeline = std::move(pipelineForMerging);
- params.remotes = std::move(cursors);
-
- // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
- // size we pass here is used for getMores, so do not specify a batch size if the initial request
- // had a batch size of 0.
- params.batchSize = request.getBatchSize() == 0
- ? boost::none
- : boost::optional<long long>(request.getBatchSize());
-
- if (liteParsedPipeline.hasChangeStream()) {
- // For change streams, we need to set up a custom stage to establish cursors on new shards
- // when they are added.
- params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
- return stdx::make_unique<RouterStageUpdateOnAddShard>(
- opCtx, executor, params, cmdToRunOnNewShards);
- };
- }
- auto ccc = ClusterClientCursorImpl::make(
- opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
-
- auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
- BSONObjBuilder cursorResponse;
-
- CursorResponseBuilder responseBuilder(true, &cursorResponse);
-
- for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
- ClusterQueryResult next;
- try {
- next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));
- } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
- // This exception is thrown when a $changeStream stage encounters an event
- // that invalidates the cursor. We should close the cursor and return without
- // error.
- cursorState = ClusterCursorManager::CursorState::Exhausted;
- break;
- }
-
- // Check whether we have exhausted the pipeline's results.
- if (next.isEOF()) {
- // We reached end-of-stream. If the cursor is not tailable, then we mark it as
- // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when
- // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no
- // hope of returning data and thus we need to close the mongos cursor as well.
- if (!ccc->isTailable() || ccc->remotesExhausted()) {
- cursorState = ClusterCursorManager::CursorState::Exhausted;
- }
- break;
- }
-
- // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor
- // to be returned on the next getMore.
- auto nextObj = *next.getResult();
-
- if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
- ccc->queueResult(nextObj);
- break;
- }
-
- responseBuilder.append(nextObj);
- }
-
- ccc->detachFromOperationContext();
-
- CursorId clusterCursorId = 0;
-
- if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
- clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
- opCtx,
- ccc.releaseCursor(),
- requestedNss,
- ClusterCursorManager::CursorType::MultiTarget,
- ClusterCursorManager::CursorLifetime::Mortal));
- }
-
- responseBuilder.done(clusterCursorId, requestedNss.ns());
-
- Command::appendCommandStatus(cursorResponse, Status::OK());
-
- return cursorResponse.obj();
-}
-
BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard,
const NamespaceString& nss) {
ScopedDbConnection conn(primaryShard->getConnString());
@@ -653,9 +216,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
BSONObj cmdObj,
BSONObjBuilder* result) {
const auto catalogCache = Grid::get(opCtx)->catalogCache();
+ auto executionNss = namespaces.executionNss;
auto executionNsRoutingInfoStatus =
- getExecutionNsRoutingInfo(opCtx, namespaces.executionNss, catalogCache);
+ getExecutionNsRoutingInfo(opCtx, executionNss, catalogCache);
LiteParsedPipeline liteParsedPipeline(request);
@@ -677,29 +241,33 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Determine the appropriate collation and 'resolve' involved namespaces to make the
// ExpressionContext.
- // We won't try to execute anything on a mongos, but we still have to populate this map so that
- // any $lookups, etc. will be able to have a resolved view definition. It's okay that this is
- // incorrect, we will repopulate the real resolved namespace map on the mongod. Note that we
- // need to check if any involved collections are sharded before forwarding an aggregation
- // command on an unsharded collection.
+ // We may not try to execute anything on mongos, but we still have to populate this map so that
+ // any $lookups, etc. will be able to have a resolved view definition when they are parsed.
+ // TODO: SERVER-32548 will add support for lookup against a sharded view, so this map needs to
+ // be correct to determine whether the aggregate should be passthrough or sent to all shards.
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ bool involvesShardedCollections = false;
for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =
uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
- uassert(
- 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm());
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
+ if (resolvedNsRoutingInfo.cm()) {
+ involvesShardedCollections = true;
+ }
}
- // If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, does
- // not need to run on all shards, and doesn't need transformation via
- // DocumentSource::serialize(), then go ahead and pass it through to the owning shard
- // unmodified.
+ // A pipeline is allowed to passthrough to the primary shard iff the following conditions are
+ // met:
+ //
+ // 1. The namespace of the aggregate and any other involved namespaces are unsharded.
+ // 2. Is allowed to be forwarded to shards.
+ // 3. Does not need to run on all shards.
+ // 4. Doesn't need transformation via DocumentSource::serialize().
if (!executionNsRoutingInfo.cm() &&
- !mustRunOnAllShards(namespaces.executionNss, executionNsRoutingInfo, liteParsedPipeline) &&
+ !PipelineS::mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline) &&
liteParsedPipeline.allowedToForwardFromMongos() &&
- liteParsedPipeline.allowedToPassthroughFromMongos()) {
+ liteParsedPipeline.allowedToPassthroughFromMongos() && !involvesShardedCollections) {
return aggPassthrough(opCtx,
namespaces,
executionNsRoutingInfo.primary()->getId(),
@@ -720,7 +288,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
} else {
// Unsharded collection. Get collection metadata from primary chunk.
auto collationObj = getDefaultCollationForUnshardedCollection(
- executionNsRoutingInfo.primary().get(), namespaces.executionNss);
+ executionNsRoutingInfo.primary().get(), executionNss);
if (!collationObj.isEmpty()) {
collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(collationObj));
@@ -747,23 +315,19 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
<< " is not capable of producing input",
!pipeline->getSources().front()->constraints().requiresInputDocSource);
- auto cursorResponse = establishMergingMongosCursor(opCtx,
- request,
- namespaces.requestedNss,
- cmdObj,
- liteParsedPipeline,
- std::move(pipeline),
- {});
+ auto cursorResponse = PipelineS::establishMergingMongosCursor(opCtx,
+ request,
+ namespaces.requestedNss,
+ cmdObj,
+ liteParsedPipeline,
+ std::move(pipeline),
+ {});
Command::filterCommandReplyForPassthrough(cursorResponse, result);
return getStatusFromCommandResult(result->asTempObj());
}
- auto dispatchResults = uassertStatusOK(dispatchShardPipeline(mergeCtx,
- namespaces.executionNss,
- cmdObj,
- request,
- liteParsedPipeline,
- std::move(pipeline)));
+ auto dispatchResults = uassertStatusOK(PipelineS::dispatchShardPipeline(
+ mergeCtx, executionNss, cmdObj, request, liteParsedPipeline, std::move(pipeline)));
if (mergeCtx->explain) {
// If we reach here, we've either succeeded in running the explain or exhausted all
@@ -808,13 +372,13 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
(!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) {
// Register the new mongoS cursor, and retrieve the initial batch of results.
auto cursorResponse =
- establishMergingMongosCursor(opCtx,
- request,
- namespaces.requestedNss,
- dispatchResults.commandForTargetedShards,
- liteParsedPipeline,
- std::move(mergingPipeline),
- std::move(dispatchResults.remoteCursors));
+ PipelineS::establishMergingMongosCursor(opCtx,
+ request,
+ namespaces.requestedNss,
+ dispatchResults.commandForTargetedShards,
+ liteParsedPipeline,
+ std::move(mergingPipeline),
+ std::move(dispatchResults.remoteCursors));
// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
@@ -829,7 +393,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto mergeResponse = uassertStatusOK(
establishMergingShardCursor(opCtx,
- namespaces.executionNss,
+ executionNss,
dispatchResults.remoteCursors,
mergeCmdObj,
boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge,
@@ -901,7 +465,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
cmdObj = Command::filterCommandRequestForPassthrough(
- createCommandForTargetedShards(aggRequest, cmdObj, nullptr));
+ PipelineS::createCommandForTargetedShards(aggRequest, cmdObj, nullptr));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index 26f8dad4ec2..3aef12ca6c2 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -26,20 +26,38 @@
* then also delete it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
#include "mongo/platform/basic.h"
#include "mongo/s/commands/pipeline_s.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/query/collation/collation_spec.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/task_executor_pool.h"
-#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
+#include "mongo/s/query/cluster_client_cursor_impl.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/s/query/cluster_query_knobs.h"
+#include "mongo/s/query/document_source_router_adapter.h"
+#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_exec_stage.h"
+#include "mongo/s/query/router_stage_internal_cursor.h"
+#include "mongo/s/query/router_stage_merge.h"
+#include "mongo/s/query/router_stage_update_on_add_shard.h"
+#include "mongo/s/query/store_possible_cursor.h"
+#include "mongo/s/stale_exception.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -48,6 +66,8 @@ using std::shared_ptr;
using std::string;
using std::unique_ptr;
+MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors);
+
namespace {
/**
* Determines the single shard to which the given query will be targeted, and its associated
@@ -87,8 +107,500 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(
return swRoutingInfo;
}
+/**
+ * Given a document representing an aggregation command such as
+ *
+ * {aggregate: "myCollection", pipeline: [], ...},
+ *
+ * produces the corresponding explain command:
+ *
+ * {explain: {aggregate: "myCollection", pipeline: [], ...}, $queryOptions: {...}, verbosity: ...}
+ */
+Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
+ MutableDocument explainCommandBuilder;
+ explainCommandBuilder["explain"] = Value(aggregateCommand);
+ // Downstream host targeting code expects queryOptions at the top level of the command object.
+ explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
+ Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
+
+ // readConcern needs to be promoted to the top-level of the request.
+ explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
+ Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
+
+ // Add explain command options.
+ for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
+ explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
+ }
+
+ return explainCommandBuilder.freeze();
+}
+
+std::set<ShardId> getTargetedShards(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const BSONObj shardQuery,
+ const BSONObj collation) {
+ if (PipelineS::mustRunOnAllShards(nss, routingInfo, litePipe)) {
+ // The pipeline begins with a stage which must be run on all shards.
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
+ return {shardIds.begin(), shardIds.end()};
+ }
+
+ if (routingInfo.cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation.
+ std::set<ShardId> shardIds;
+ routingInfo.cm()->getShardIdsForQuery(opCtx, shardQuery, collation, &shardIds);
+ return shardIds;
+ }
+
+ // The collection is unsharded. Target only the primary shard for the database.
+ return {routingInfo.primaryId()};
+}
+
+StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ CachedCollectionRoutingInfo* routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
+ LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
+
+ std::set<ShardId> shardIds =
+ getTargetedShards(opCtx, nss, litePipe, *routingInfo, shardQuery, collation);
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+
+ if (PipelineS::mustRunOnAllShards(nss, *routingInfo, litePipe)) {
+ // The pipeline contains a stage which must be run on all shards. Skip versioning and
+ // enqueue the raw command objects.
+ for (auto&& shardId : shardIds) {
+ requests.emplace_back(std::move(shardId), cmdObj);
+ }
+ } else if (routingInfo->cm()) {
+ // The collection is sharded. Use the routing table to decide which shards to target
+ // based on the query and collation, and build versioned requests for them.
+ for (auto& shardId : shardIds) {
+ auto versionedCmdObj =
+ appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
+ requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
+ }
+ } else {
+ // The collection is unsharded. Target only the primary shard for the database.
+ // Don't append shard version info when contacting the config servers.
+ requests.emplace_back(routingInfo->primaryId(),
+ !routingInfo->primary()->isConfig()
+ ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
+ : cmdObj);
+ }
+
+ if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
+ "until fail point is disabled.";
+ while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
+ sleepsecs(1);
+ }
+ }
+
+ // If we reach this point, we're either trying to establish cursors on a sharded execution
+ // namespace, or handling the case where a sharded collection was dropped and recreated as
+ // unsharded. Since views cannot be sharded, and because we will return an error rather than
+ // attempting to continue in the event that a recreated namespace is a view, we set
+ // viewDefinitionOut to nullptr.
+ BSONObj* viewDefinitionOut = nullptr;
+ auto swCursors = establishCursors(opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ readPref,
+ requests,
+ false /* do not allow partial results */,
+ viewDefinitionOut /* can't receive view definition */);
+
+ // If any shard returned a stale shardVersion error, invalidate the routing table cache.
+ // This will cause the cache to be refreshed the next time it is accessed.
+ if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
+ Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*routingInfo));
+ }
+
+ return swCursors;
+}
+
} // namespace
+bool PipelineS::mustRunOnAllShards(const NamespaceString& nss,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const LiteParsedPipeline& litePipe) {
+ // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection
+ // must run on all shards.
+ const bool nsIsSharded = static_cast<bool>(routingInfo.cm());
+ return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream());
+}
+
+BSONObj PipelineS::createCommandForTargetedShards(
+ const AggregationRequest& request,
+ BSONObj originalCmdObj,
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) {
+ // Create the command for the shards.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
+
+ // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough.
+ if (pipelineForTargetedShards) {
+ targetedCmd[AggregationRequest::kPipelineName] =
+ Value(pipelineForTargetedShards->serialize());
+
+ if (pipelineForTargetedShards->isSplitForShards()) {
+ targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+ targetedCmd[AggregationRequest::kCursorName] =
+ Value(DOC(AggregationRequest::kBatchSizeName << 0));
+ }
+ }
+
+ // If this pipeline is not split, ensure that the write concern is propagated if present.
+ if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) {
+ targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
+ }
+
+ // If this is a request for an aggregation explain, then we must wrap the aggregate inside an
+ // explain command.
+ if (auto explainVerbosity = request.getExplain()) {
+ targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity));
+ }
+
+ return targetedCmd.freeze().toBson();
+}
+
+BSONObj PipelineS::establishMergingMongosCursor(
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ const NamespaceString& requestedNss,
+ BSONObj cmdToRunOnNewShards,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
+
+ ClusterClientCursorParams params(
+ requestedNss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ ReadPreferenceSetting::get(opCtx));
+
+ params.tailableMode = pipelineForMerging->getContext()->tailableMode;
+ params.mergePipeline = std::move(pipelineForMerging);
+ params.remotes = std::move(cursors);
+
+ // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
+ // size we pass here is used for getMores, so do not specify a batch size if the initial request
+ // had a batch size of 0.
+ params.batchSize = request.getBatchSize() == 0
+ ? boost::none
+ : boost::optional<long long>(request.getBatchSize());
+
+ if (liteParsedPipeline.hasChangeStream()) {
+ // For change streams, we need to set up a custom stage to establish cursors on new shards
+ // when they are added.
+ params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
+ return stdx::make_unique<RouterStageUpdateOnAddShard>(
+ opCtx, executor, params, cmdToRunOnNewShards);
+ };
+ }
+ auto ccc = ClusterClientCursorImpl::make(
+ opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
+
+ auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
+ BSONObjBuilder cursorResponse;
+
+ CursorResponseBuilder responseBuilder(true, &cursorResponse);
+
+ for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
+ ClusterQueryResult next;
+ try {
+ next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));
+ } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
+ // This exception is thrown when a $changeStream stage encounters an event
+ // that invalidates the cursor. We should close the cursor and return without
+ // error.
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ break;
+ }
+
+ // Check whether we have exhausted the pipeline's results.
+ if (next.isEOF()) {
+ // We reached end-of-stream. If the cursor is not tailable, then we mark it as
+ // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when
+ // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no
+ // hope of returning data and thus we need to close the mongos cursor as well.
+ if (!ccc->isTailable() || ccc->remotesExhausted()) {
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ }
+ break;
+ }
+
+ // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor
+ // to be returned on the next getMore.
+ auto nextObj = *next.getResult();
+
+ if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
+ ccc->queueResult(nextObj);
+ break;
+ }
+
+ responseBuilder.append(nextObj);
+ }
+
+ ccc->detachFromOperationContext();
+
+ CursorId clusterCursorId = 0;
+
+ if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
+ clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
+ opCtx,
+ ccc.releaseCursor(),
+ requestedNss,
+ ClusterCursorManager::CursorType::MultiTarget,
+ ClusterCursorManager::CursorLifetime::Mortal));
+ }
+
+ responseBuilder.done(clusterCursorId, requestedNss.ns());
+
+ Command::appendCommandStatus(cursorResponse, Status::OK());
+
+ return cursorResponse.obj();
+}
+
+/**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+StatusWith<PipelineS::DispatchShardPipelineResults> PipelineS::dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& executionNss,
+ BSONObj originalCmdObj,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
+ // The process is as follows:
+ // - First, determine whether we need to target more than one shard. If so, we split the
+ // pipeline; if not, we retain the existing pipeline.
+ // - Call establishShardCursors to dispatch the aggregation to the targeted shards.
+ // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
+ // the refreshed routing table data.
+ // - If the pipeline is not split and we now need to target multiple shards, split it. If the
+ // pipeline is already split and we now only need to target a single shard, reassemble the
+ // original pipeline.
+ // - After exhausting 10 attempts to establish the cursors, we give up and throw.
+ auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
+ auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>();
+ auto opCtx = expCtx->opCtx;
+
+ const bool needsPrimaryShardMerge =
+ (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
+
+ const bool needsMongosMerge = pipeline->needsMongosMerger();
+
+ const auto shardQuery = pipeline->getInitialQuery();
+
+ auto pipelineForTargetedShards = std::move(pipeline);
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
+ BSONObj targetedCommand;
+
+ int numAttempts = 0;
+
+ do {
+ // We need to grab a new routing table at the start of each iteration, since a stale config
+ // exception will invalidate the previous one.
+ auto executionNsRoutingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss));
+
+ // Determine whether we can run the entire aggregation on a single shard.
+ std::set<ShardId> shardIds = getTargetedShards(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ executionNsRoutingInfo,
+ shardQuery,
+ aggRequest.getCollation());
+
+ uassert(ErrorCodes::ShardNotFound,
+ "No targets were found for this aggregation. All shards were removed from the "
+ "cluster mid-operation",
+ shardIds.size() > 0);
+
+ // Don't need to split the pipeline if we are only targeting a single shard, unless:
+ // - There is a stage that needs to be run on the primary shard and the single target shard
+ // is not the primary.
+ // - The pipeline contains one or more stages which must always merge on mongoS.
+ const bool needsSplit =
+ (shardIds.size() > 1u || needsMongosMerge ||
+ (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId()));
+
+ const bool isSplit = pipelineForTargetedShards->isSplitForShards();
+
+ // If we have to run on multiple shards and the pipeline is not yet split, split it. If we
+ // can run on a single shard and the pipeline is already split, reassemble it.
+ if (needsSplit && !isSplit) {
+ pipelineForMerging = std::move(pipelineForTargetedShards);
+ pipelineForTargetedShards = pipelineForMerging->splitForSharded();
+ } else if (!needsSplit && isSplit) {
+ pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
+ }
+
+ // Generate the command object for the targeted shards.
+ targetedCommand = PipelineS::createCommandForTargetedShards(
+ aggRequest, originalCmdObj, pipelineForTargetedShards);
+
+ // Refresh the shard registry if we're targeting all shards. We need the shard registry
+ // to be at least as current as the logical time used when creating the command for
+ // $changeStream to work reliably, so we do a "hard" reload.
+ if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
+ auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
+ if (!shardRegistry->reload(opCtx)) {
+ shardRegistry->reload(opCtx);
+ }
+ }
+
+ // Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
+ if (expCtx->explain) {
+ if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
+ // Some stages (such as $currentOp) need to be broadcast to all shards, and should
+ // not participate in the shard version protocol.
+ swShardResults =
+ scatterGatherUnversionedTargetAllShards(opCtx,
+ executionNss.db().toString(),
+ executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent);
+ } else {
+ // Aggregations on a real namespace should use the routing table to target shards,
+ // and should participate in the shard version protocol.
+ swShardResults =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ executionNss.db().toString(),
+ executionNss,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ shardQuery,
+ aggRequest.getCollation(),
+ nullptr /* viewDefinition */);
+ }
+ } else {
+ swCursors = establishShardCursors(opCtx,
+ executionNss,
+ liteParsedPipeline,
+ &executionNsRoutingInfo,
+ targetedCommand,
+ ReadPreferenceSetting::get(opCtx),
+ shardQuery,
+ aggRequest.getCollation());
+
+ if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) {
+ LOG(1) << "got stale shardVersion error " << swCursors.getStatus()
+ << " while dispatching " << redact(targetedCommand) << " after "
+ << (numAttempts + 1) << " dispatch attempts";
+ }
+ }
+ } while (++numAttempts < kMaxNumStaleVersionRetries &&
+ (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK()));
+
+ if (!swShardResults.isOK()) {
+ return swShardResults.getStatus();
+ }
+ if (!swCursors.isOK()) {
+ return swCursors.getStatus();
+ }
+ return DispatchShardPipelineResults{needsPrimaryShardMerge,
+ std::move(swCursors.getValue()),
+ std::move(swShardResults.getValue()),
+ std::move(pipelineForTargetedShards),
+ std::move(pipelineForMerging),
+ targetedCommand};
+}
+
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineS::MongoSInterface::makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions pipelineOptions) {
+ // Explain is not supported for auxiliary lookups.
+ invariant(!expCtx->explain);
+
+ // Temporarily remove any deadline from this operation, we don't want to timeout while doing
+ // this lookup.
+ OperationContext::DeadlineStash deadlineStash(expCtx->opCtx);
+
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+ if (pipelineOptions.optimize) {
+ pipeline.getValue()->optimizePipeline();
+ }
+ if (pipelineOptions.attachCursorSource) {
+ pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release());
+ }
+
+ return pipeline;
+}
+
+StatusWith<unique_ptr<Pipeline, PipelineDeleter>>
+PipelineS::MongoSInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceRouterAdapter*>(pipeline->getSources().front().get()));
+
+ // Generate the command object for the targeted shards.
+ auto serialization = pipeline->serialize();
+ std::vector<BSONObj> rawStages;
+ rawStages.reserve(serialization.size());
+ std::transform(serialization.begin(),
+ serialization.end(),
+ std::back_inserter(rawStages),
+ [](const Value& stageObj) {
+ invariant(stageObj.getType() == BSONType::Object);
+ return stageObj.getDocument().toBson();
+ });
+ AggregationRequest aggRequest(expCtx->ns, rawStages);
+ LiteParsedPipeline liteParsedPipeline(aggRequest);
+ auto dispatchStatus = PipelineS::dispatchShardPipeline(
+ expCtx,
+ expCtx->ns,
+ aggRequest.serializeToCommandObj().toBson(),
+ aggRequest,
+ liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)));
+
+ if (!dispatchStatus.isOK()) {
+ return dispatchStatus.getStatus();
+ }
+ auto targetingResults = std::move(dispatchStatus.getValue());
+
+ auto params = stdx::make_unique<ClusterClientCursorParams>(
+ expCtx->ns,
+ AuthorizationSession::get(expCtx->opCtx->getClient())->getAuthenticatedUserNames(),
+ ReadPreferenceSetting::get(expCtx->opCtx));
+ params->remotes = std::move(targetingResults.remoteCursors);
+ params->mergePipeline = std::move(targetingResults.pipelineForMerging);
+
+ // We will transfer ownership of the params to the RouterStageInternalCursor, but need a
+ // reference to them to construct the RouterStageMerge.
+ auto* unownedParams = params.get();
+ auto root = ClusterClientCursorImpl::buildMergerPlan(
+ expCtx->opCtx,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ unownedParams);
+ auto routerExecutionTree = stdx::make_unique<RouterStageInternalCursor>(
+ expCtx->opCtx, std::move(params), std::move(root));
+
+ return Pipeline::create(
+ {DocumentSourceRouterAdapter::create(expCtx, std::move(routerExecutionTree))}, expCtx);
+}
+
boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
@@ -192,4 +704,16 @@ std::vector<GenericCursor> PipelineS::MongoSInterface::getCursors(
return cursorManager->getAllCursors();
}
+bool PipelineS::MongoSInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
+ const auto catalogCache = Grid::get(opCtx)->catalogCache();
+
+ auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss);
+
+ if (!routingInfoStatus.isOK()) {
+        // Routing info is unavailable, e.g. because the database doesn't exist.
+ return false;
+ }
+ return static_cast<bool>(routingInfoStatus.getValue().cm());
+}
+
} // namespace mongo
diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h
index cdf72158e31..968e28f36aa 100644
--- a/src/mongo/s/commands/pipeline_s.h
+++ b/src/mongo/s/commands/pipeline_s.h
@@ -28,8 +28,12 @@
#pragma once
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/mongo_process_interface.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/query/cluster_client_cursor_params.h"
namespace mongo {
/**
@@ -55,9 +59,7 @@ public:
MONGO_UNREACHABLE;
}
- bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
- MONGO_UNREACHABLE;
- }
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
@@ -103,10 +105,8 @@ public:
MONGO_UNREACHABLE;
}
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
- MONGO_UNREACHABLE;
- }
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final;
std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
CurrentOpConnectionsMode connMode,
@@ -128,9 +128,7 @@ public:
StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) final {
- MONGO_UNREACHABLE;
- }
+ const MakePipelineOptions pipelineOptions) final;
boost::optional<Document> lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -143,6 +141,61 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
};
+ struct DispatchShardPipelineResults {
+    // True if this pipeline was split, and the second
+    // half of the pipeline needs to be run on the
+    // primary shard for the database.
+ bool needsPrimaryShardMerge;
+
+ // Populated if this *is not* an explain, this vector represents the cursors on the remote
+ // shards.
+ std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
+
+ // Populated if this *is* an explain, this vector represents the results from each shard.
+ std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
+
+ // The half of the pipeline that was sent to each shard, or the entire pipeline if there was
+ // only one shard targeted.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards;
+
+ // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
+
+ // The command object to send to the targeted shards.
+ BSONObj commandForTargetedShards;
+ };
+
+ static BSONObj createCommandForTargetedShards(
+ const AggregationRequest&,
+ const BSONObj originalCmdObj,
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards);
+
+ static BSONObj establishMergingMongosCursor(
+ OperationContext*,
+ const AggregationRequest&,
+ const NamespaceString& requestedNss,
+ BSONObj cmdToRunOnNewShards,
+ const LiteParsedPipeline&,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
+ std::vector<ClusterClientCursorParams::RemoteCursor>);
+
+ /**
+ * Targets shards for the pipeline and returns a struct with the remote cursors or results, and
+ * the pipeline that will need to be executed to merge the results from the remotes. If a stale
+ * shard version is encountered, refreshes the routing table and tries again.
+ */
+ static StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
+ const boost::intrusive_ptr<ExpressionContext>&,
+ const NamespaceString& executionNss,
+ const BSONObj originalCmdObj,
+ const AggregationRequest&,
+ const LiteParsedPipeline&,
+ std::unique_ptr<Pipeline, PipelineDeleter>);
+
+ static bool mustRunOnAllShards(const NamespaceString& nss,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const LiteParsedPipeline& litePipe);
+
private:
PipelineS() = delete; // Should never be instantiated.
};
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 67e334a7f47..e11174c062b 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -115,6 +115,13 @@ public:
boost::optional<ReadPreferenceSetting> getReadPreference() const final;
+ /**
+ * Constructs the pipeline of MergerPlanStages which will be used to answer the query.
+ */
+ static std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params);
+
public:
/** private for tests */
/**
@@ -133,13 +140,6 @@ public:
boost::optional<LogicalSessionId> lsid);
private:
- /**
- * Constructs the pipeline of MergerPlanStages which will be used to answer the query.
- */
- std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params);
-
ClusterClientCursorParams _params;
// Number of documents already returned by next().
diff --git a/src/mongo/s/query/router_stage_internal_cursor.h b/src/mongo/s/query/router_stage_internal_cursor.h
new file mode 100644
index 00000000000..95ca8831648
--- /dev/null
+++ b/src/mongo/s/query/router_stage_internal_cursor.h
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/s/query/router_exec_stage.h"
+
+namespace mongo {
+
+/**
+ * This is a special type of RouterExecStage that is used to iterate remote cursors that were
+ * created internally and do not represent a client cursor, such as those used in a $lookup.
+ *
+ * The purpose of this class is to provide ownership over a ClusterClientCursorParams struct without
+ * creating a ClusterClientCursor, which would show up in the server stats for this mongos.
+ */
+class RouterStageInternalCursor final : public RouterExecStage {
+public:
+ RouterStageInternalCursor(OperationContext* opCtx,
+ std::unique_ptr<ClusterClientCursorParams>&& params,
+ std::unique_ptr<RouterExecStage> child)
+ : RouterExecStage(opCtx, std::move(child)), _params(std::move(params)) {}
+
+ StatusWith<ClusterQueryResult> next(ExecContext execContext) {
+ return getChildStage()->next(execContext);
+ }
+
+private:
+ std::unique_ptr<ClusterClientCursorParams> _params;
+};
+} // namespace mongo