30 files changed, 2725 insertions(+), 2165 deletions(-)
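For orientation, the tests moved and added in this diff exercise both $lookup syntaxes: the classic localField/foreignField equality join, and the pipeline/let form, which per the TODO comments below does not yet support sharded foreign collections (SERVER-32309). A minimal sketch of the two forms follows; the orders/items collection names are illustrative and do not come from this diff:

    // Equality-match form: joins where orders.itemId equals items._id.
    db.orders.aggregate([
        {$lookup: {from: "items", localField: "itemId", foreignField: "_id", as: "item"}}
    ]);

    // Pipeline form: the sub-pipeline runs over "items" once per input document,
    // with $$oid bound to that document's itemId value.
    db.orders.aggregate([{
        $lookup: {
            from: "items",
            let: {oid: "$itemId"},
            pipeline: [{$match: {$expr: {$eq: ["$_id", "$$oid"]}}}],
            as: "item"
        }
    }]);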
diff --git a/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml b/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml index 81bfe24a992..089e566dcdc 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_one_shard_sharded_collections.yml @@ -12,6 +12,8 @@ selector: - jstests/aggregation/bugs/lookup_unwind_killcursor.js # TODO: Remove when SERVER-23229 is fixed. - jstests/aggregation/bugs/groupMissing.js + # TODO SERVER-32309: Enable once $lookup with pipeline supports sharded foreign collections. + - jstests/aggregation/sources/lookup/lookup_subpipeline.js exclude_with_any_tags: # Tests tagged with the following will fail because they assume collections are not sharded. - assumes_no_implicit_collection_creation_after_drop diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml index d01c66bac17..5f1410e240e 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml @@ -23,6 +23,8 @@ selector: - jstests/aggregation/use_query_sort.js # TODO: Remove when SERVER-23229 is fixed. - jstests/aggregation/bugs/groupMissing.js + # TODO SERVER-32309: Enable once $lookup with pipeline supports sharded foreign collections. + - jstests/aggregation/sources/lookup/lookup_subpipeline.js exclude_with_any_tags: # Tests tagged with the following will fail because they assume collections are not sharded. - assumes_no_implicit_collection_creation_after_drop diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml index be73c186383..3fa0acabb70 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml @@ -344,12 +344,16 @@ selector: - jstests/sharding/stale_mongos_updates_and_removes.js - jstests/sharding/geo_near_sharded.js # Enable when 4.2 becomes last-stable. + - jstests/sharding/collation_lookup.js - jstests/sharding/collation_targeting.js - jstests/sharding/collation_targeting_inherited.js - jstests/sharding/failcommand_failpoint_not_parallel.js - jstests/sharding/failcommand_ignores_internal.js - jstests/sharding/geo_near_random1.js - jstests/sharding/geo_near_random2.js + - jstests/sharding/lookup.js + - jstests/sharding/lookup_mongod_unaware.js + - jstests/sharding/lookup_stale_mongos.js - jstests/sharding/out_cannot_run_on_mongos.js - jstests/sharding/out_command_options.js - jstests/sharding/out_from_stale_mongos.js diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js index 13ab1b2431e..df22312ce13 100644 --- a/jstests/aggregation/mongos_merge.js +++ b/jstests/aggregation/mongos_merge.js @@ -286,6 +286,45 @@ allowDiskUse: allowDiskUse, expectedCount: 400 }); + + // Test that $lookup is merged on the primary shard when the foreign collection is + // unsharded. 
+ assertMergeOnMongoD({ + testName: "agg_mongos_merge_lookup_unsharded_disk_use_" + allowDiskUse, + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $lookup: { + from: unshardedColl.getName(), + localField: "_id", + foreignField: "_id", + as: "lookupField" + } + } + ], + mergeType: "primaryShard", + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); + + // Test that $lookup is merged on mongoS when the foreign collection is sharded. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_lookup_sharded_disk_use_" + allowDiskUse, + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $lookup: { + from: mongosColl.getName(), + localField: "_id", + foreignField: "_id", + as: "lookupField" + } + } + ], + mergeType: "mongos", + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); } /** diff --git a/jstests/aggregation/sources/facet/use_cases.js b/jstests/aggregation/sources/facet/use_cases.js index 1295f638910..6074e66d810 100644 --- a/jstests/aggregation/sources/facet/use_cases.js +++ b/jstests/aggregation/sources/facet/use_cases.js @@ -115,8 +115,6 @@ populateData(st.s0, nDocs); doExecutionTest(st.s0); - // Test that $facet stage propagates information about involved collections, preventing users - // from doing things like $lookup from a sharded collection. const shardedDBName = "sharded"; const shardedCollName = "collection"; const shardedColl = st.getDB(shardedDBName).getCollection(shardedCollName); @@ -126,21 +124,8 @@ assert.commandWorked( st.admin.runCommand({shardCollection: shardedColl.getFullName(), key: {_id: 1}})); - // Test that trying to perform a $lookup on a sharded collection returns an error. - let res = assert.commandFailed(unshardedColl.runCommand({ - aggregate: unshardedColl.getName(), - pipeline: [{ - $lookup: - {from: shardedCollName, localField: "_id", foreignField: "_id", as: "results"} - }], - cursor: {} - })); - assert.eq( - 28769, res.code, "Expected aggregation to fail due to $lookup on a sharded collection"); - - // Test that trying to perform a $lookup on a sharded collection inside a $facet stage still - // returns an error. - res = assert.commandFailed(unshardedColl.runCommand({ + // Test $lookup inside a $facet stage on a sharded collection. + assert.commandWorked(unshardedColl.runCommand({ aggregate: unshardedColl.getName(), pipeline: [{ $facet: { @@ -156,8 +141,6 @@ }], cursor: {} })); - assert.eq( - 28769, res.code, "Expected aggregation to fail due to $lookup on a sharded collection"); // Then run the assertions against a sharded collection. assert.commandWorked(st.admin.runCommand({enableSharding: dbName})); diff --git a/jstests/aggregation/sources/graphLookup/sharded.js b/jstests/aggregation/sources/graphLookup/sharded.js deleted file mode 100644 index b78649d5824..00000000000 --- a/jstests/aggregation/sources/graphLookup/sharded.js +++ /dev/null @@ -1,56 +0,0 @@ -// In SERVER-23725, $graphLookup was introduced. In this file, we test that the expression behaves -// correctly on a sharded collection. -// @tags: [requires_sharding] -load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. 
- -(function() { - "use strict"; - - var st = new ShardingTest({name: "aggregation_graph_lookup", shards: 2, mongos: 1}); - - st.adminCommand({enableSharding: "graphLookup"}); - st.ensurePrimaryShard("graphLookup", st.shard1.shardName); - st.adminCommand({shardCollection: "graphLookup.local", key: {_id: 1}}); - - var foreign = st.getDB("graphLookup").foreign; - var local = st.getDB("graphLookup").local; - - var bulk = foreign.initializeUnorderedBulkOp(); - - for (var i = 0; i < 100; i++) { - bulk.insert({_id: i, next: i + 1}); - } - assert.writeOK(bulk.execute()); - - assert.writeOK(local.insert({})); - - var res = st.s.getDB("graphLookup") - .local - .aggregate({ - $graphLookup: { - from: "foreign", - startWith: {$literal: 0}, - connectToField: "_id", - connectFromField: "next", - as: "number_line" - } - }) - .toArray(); - - assert.eq(res.length, 1); - assert.eq(res[0].number_line.length, 100); - - // Cannot perform a $graphLookup where the "from" collection is sharded. - var pipeline = { - $graphLookup: { - from: "local", - startWith: {$literal: 0}, - connectToField: "_id", - connectFromField: "_id", - as: "out" - } - }; - - assertErrorCode(foreign, pipeline, 28769); - st.stop(); -}()); diff --git a/jstests/aggregation/sources/lookup/collation_lookup.js b/jstests/aggregation/sources/lookup/collation_lookup.js deleted file mode 100644 index 80d173138a6..00000000000 --- a/jstests/aggregation/sources/lookup/collation_lookup.js +++ /dev/null @@ -1,368 +0,0 @@ -// Cannot implicitly shard accessed collections because of collection existing when none expected. -// @tags: [assumes_no_implicit_collection_creation_after_drop] - -/** - * Tests that the $lookup stage respects the collation. - * - * The comparison of string values between the 'localField' and 'foreignField' should use the - * collation either explicitly set on the aggregation operation, or the collation inherited from the - * collection the "aggregate" command was performed on. - */ -(function() { - "use strict"; - - load("jstests/aggregation/extras/utils.js"); // for arrayEq - - const caseInsensitive = {collation: {locale: "en_US", strength: 2}}; - - const withDefaultCollationColl = db.collation_lookup_with_default; - const withoutDefaultCollationColl = db.collation_lookup_without_default; - - withDefaultCollationColl.drop(); - withoutDefaultCollationColl.drop(); - - assert.commandWorked(db.createCollection(withDefaultCollationColl.getName(), caseInsensitive)); - assert.writeOK(withDefaultCollationColl.insert({_id: "lowercase", str: "abc"})); - - assert.writeOK(withoutDefaultCollationColl.insert({_id: "lowercase", str: "abc"})); - assert.writeOK(withoutDefaultCollationColl.insert({_id: "uppercase", str: "ABC"})); - assert.writeOK(withoutDefaultCollationColl.insert({_id: "unmatched", str: "def"})); - - // Test that the $lookup stage respects the inherited collation. 
- let res = withDefaultCollationColl - .aggregate([{ - $lookup: { - from: withoutDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }]) - .toArray(); - assert.eq(1, res.length, tojson(res)); - - let expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}]; - assert( - arrayEq(expected, res[0].matched), - "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + " up to ordering"); - - res = withDefaultCollationColl - .aggregate([{ - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - } - ], - as: "matched1", - }, - }]) - .toArray(); - assert.eq(1, res.length, tojson(res)); - - expected = [ - { - "_id": "lowercase", - "str": "abc", - "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] - }, - { - "_id": "uppercase", - "str": "ABC", - "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] - } - ]; - assert(arrayEq(expected, res[0].matched1), - "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) + - " up to ordering. " + tojson(res)); - - // Test that the $lookup stage respects the inherited collation when it optimizes with an - // $unwind stage. - res = withDefaultCollationColl - .aggregate([ - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }, - {$unwind: "$matched"}, - ]) - .toArray(); - assert.eq(2, res.length, tojson(res)); - - expected = [ - {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}}, - {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}} - ]; - assert(arrayEq(expected, res), - "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); - - res = withDefaultCollationColl - .aggregate([ - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - }, - {$unwind: "$matched2"}, - ], - as: "matched1", - }, - }, - {$unwind: "$matched1"}, - ]) - .toArray(); - assert.eq(4, res.length, tojson(res)); - - expected = [ - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "lowercase", "str": "abc", "matched2": {"_id": "lowercase", "str": "abc"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "lowercase", "str": "abc", "matched2": {"_id": "uppercase", "str": "ABC"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "lowercase", "str": "abc"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "uppercase", "str": "ABC"}} - } - ]; - assert(arrayEq(expected, res), - "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); - - // Test that the $lookup stage respects an explicit collation on the aggregation operation. 
- res = withoutDefaultCollationColl - .aggregate( - [ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }, - ], - caseInsensitive) - .toArray(); - assert.eq(1, res.length, tojson(res)); - - expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}]; - assert( - arrayEq(expected, res[0].matched), - "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + " up to ordering"); - - res = withoutDefaultCollationColl - .aggregate( - [ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - } - ], - as: "matched1", - }, - } - ], - caseInsensitive) - .toArray(); - assert.eq(1, res.length, tojson(res)); - - expected = [ - { - "_id": "lowercase", - "str": "abc", - "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] - }, - { - "_id": "uppercase", - "str": "ABC", - "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] - } - ]; - assert(arrayEq(expected, res[0].matched1), - "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) + - " up to ordering"); - - // Test that the $lookup stage respects an explicit collation on the aggregation operation when - // it optimizes with an $unwind stage. - res = withoutDefaultCollationColl - .aggregate( - [ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }, - {$unwind: "$matched"}, - ], - caseInsensitive) - .toArray(); - assert.eq(2, res.length, tojson(res)); - - expected = [ - {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}}, - {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}} - ]; - assert(arrayEq(expected, res), - "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); - - res = withoutDefaultCollationColl - .aggregate( - [ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - }, - {$unwind: "$matched2"}, - ], - as: "matched1", - }, - }, - {$unwind: "$matched1"}, - ], - caseInsensitive) - .toArray(); - assert.eq(4, res.length, tojson(res)); - - expected = [ - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "lowercase", "str": "abc", "matched2": {"_id": "lowercase", "str": "abc"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "lowercase", "str": "abc", "matched2": {"_id": "uppercase", "str": "ABC"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "lowercase", "str": "abc"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "uppercase", "str": "ABC"}} - } - ]; - assert(arrayEq(expected, res), - "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); - - // Test that the 
$lookup stage uses the "simple" collation if a collation isn't set on the - // collection or the aggregation operation. - res = withoutDefaultCollationColl - .aggregate([ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }, - ]) - .toArray(); - assert.eq([{_id: "lowercase", str: "abc", matched: [{_id: "lowercase", str: "abc"}]}], res); - - res = withoutDefaultCollationColl - .aggregate([ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - }, - {$unwind: "$matched2"}, - ], - as: "matched1", - }, - }, - ]) - .toArray(); - assert.eq([{ - "_id": "lowercase", - "str": "abc", - "matched1": [{ - "_id": "lowercase", - "str": "abc", - "matched2": {"_id": "lowercase", "str": "abc"} - }] - }], - res); -})(); diff --git a/jstests/aggregation/sources/lookup/lookup.js b/jstests/aggregation/sources/lookup/lookup.js deleted file mode 100644 index 0d29f4eb8a8..00000000000 --- a/jstests/aggregation/sources/lookup/lookup.js +++ /dev/null @@ -1,1148 +0,0 @@ -// Basic $lookup regression tests. -// @tags: [requires_sharding] - -load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. - -(function() { - "use strict"; - - // Used by testPipeline to sort result documents. All _ids must be primitives. - function compareId(a, b) { - if (a._id < b._id) { - return -1; - } - if (a._id > b._id) { - return 1; - } - return 0; - } - - function generateNestedPipeline(foreignCollName, numLevels) { - let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}]; - - for (let level = 1; level < numLevels; level++) { - pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}]; - } - - return pipeline; - } - - // Helper for testing that pipeline returns correct set of results. - function testPipeline(pipeline, expectedResult, collection) { - assert.eq(collection.aggregate(pipeline).toArray().sort(compareId), - expectedResult.sort(compareId)); - } - - function runTest(coll, from, thirdColl, fourthColl) { - var db = null; // Using the db variable is banned in this function. - - assert.writeOK(coll.insert({_id: 0, a: 1})); - assert.writeOK(coll.insert({_id: 1, a: null})); - assert.writeOK(coll.insert({_id: 2})); - - assert.writeOK(from.insert({_id: 0, b: 1})); - assert.writeOK(from.insert({_id: 1, b: null})); - assert.writeOK(from.insert({_id: 2})); - - // - // Basic functionality. - // - - // "from" document added to "as" field if a == b, where nonexistent fields are treated as - // null. - var expectedResults = [ - {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, - {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}], - expectedResults, - coll); - - // If localField is nonexistent, it is treated as if it is null. 
- expectedResults = [ - {_id: 0, a: 1, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline( - [{$lookup: {localField: "nonexistent", foreignField: "b", from: "from", as: "same"}}], - expectedResults, - coll); - - // If foreignField is nonexistent, it is treated as if it is null. - expectedResults = [ - {_id: 0, a: 1, "same": []}, - {_id: 1, a: null, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline( - [{$lookup: {localField: "a", foreignField: "nonexistent", from: "from", as: "same"}}], - expectedResults, - coll); - - // If there are no matches or the from coll doesn't exist, the result is an empty array. - expectedResults = - [{_id: 0, a: 1, "same": []}, {_id: 1, a: null, "same": []}, {_id: 2, "same": []}]; - testPipeline( - [{$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}], - expectedResults, - coll); - testPipeline( - [{$lookup: {localField: "a", foreignField: "b", from: "nonexistent", as: "same"}}], - expectedResults, - coll); - - // If field name specified by "as" already exists, it is overwritten. - expectedResults = [ - {_id: 0, "a": [{_id: 0, b: 1}]}, - {_id: 1, "a": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "a": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "a"}}], - expectedResults, - coll); - - // Running multiple $lookups in the same pipeline is allowed. - expectedResults = [ - {_id: 0, a: 1, "c": [{_id: 0, b: 1}], "d": [{_id: 0, b: 1}]}, - { - _id: 1, - a: null, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}] - }, - {_id: 2, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline( - [ - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "c"}}, - {$project: {"a": 1, "c": 1}}, - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "d"}} - ], - expectedResults, - coll); - - // - // Coalescing with $unwind. - // - - // A normal $unwind with on the "as" field. - expectedResults = [ - {_id: 0, a: 1, same: {_id: 0, b: 1}}, - {_id: 1, a: null, same: {_id: 1, b: null}}, - {_id: 1, a: null, same: {_id: 2}}, - {_id: 2, same: {_id: 1, b: null}}, - {_id: 2, same: {_id: 2}} - ]; - testPipeline( - [ - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, - {$unwind: {path: "$same"}} - ], - expectedResults, - coll); - - // An $unwind on the "as" field, with includeArrayIndex. - expectedResults = [ - {_id: 0, a: 1, same: {_id: 0, b: 1}, index: NumberLong(0)}, - {_id: 1, a: null, same: {_id: 1, b: null}, index: NumberLong(0)}, - {_id: 1, a: null, same: {_id: 2}, index: NumberLong(1)}, - {_id: 2, same: {_id: 1, b: null}, index: NumberLong(0)}, - {_id: 2, same: {_id: 2}, index: NumberLong(1)}, - ]; - testPipeline( - [ - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, - {$unwind: {path: "$same", includeArrayIndex: "index"}} - ], - expectedResults, - coll); - - // Normal $unwind with no matching documents. - expectedResults = []; - testPipeline( - [ - {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}, - {$unwind: {path: "$same"}} - ], - expectedResults, - coll); - - // $unwind with preserveNullAndEmptyArray with no matching documents. 
- expectedResults = [ - {_id: 0, a: 1}, - {_id: 1, a: null}, - {_id: 2}, - ]; - testPipeline( - [ - {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}, - {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}} - ], - expectedResults, - coll); - - // $unwind with preserveNullAndEmptyArray, some with matching documents, some without. - expectedResults = [ - {_id: 0, a: 1}, - {_id: 1, a: null, same: {_id: 0, b: 1}}, - {_id: 2}, - ]; - testPipeline( - [ - {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}}, - {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}} - ], - expectedResults, - coll); - - // $unwind with preserveNullAndEmptyArray and includeArrayIndex, some with matching - // documents, some without. - expectedResults = [ - {_id: 0, a: 1, index: null}, - {_id: 1, a: null, same: {_id: 0, b: 1}, index: NumberLong(0)}, - {_id: 2, index: null}, - ]; - testPipeline( - [ - {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}}, - { - $unwind: - {path: "$same", preserveNullAndEmptyArrays: true, includeArrayIndex: "index"} - } - ], - expectedResults, - coll); - - // - // Dependencies. - // - - // If $lookup didn't add "localField" to its dependencies, this test would fail as the - // value of the "a" field would be lost and treated as null. - expectedResults = [ - {_id: 0, "same": [{_id: 0, b: 1}]}, - {_id: 1, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline( - [ - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, - {$project: {"same": 1}} - ], - expectedResults, - coll); - - // If $lookup didn't add fields referenced by "let" variables to its dependencies, this test - // would fail as the value of the "a" field would be lost and treated as null. - expectedResults = [ - {"_id": 0, "same": [{"_id": 0, "x": 1}, {"_id": 1, "x": 1}, {"_id": 2, "x": 1}]}, - { - "_id": 1, - "same": [{"_id": 0, "x": null}, {"_id": 1, "x": null}, {"_id": 2, "x": null}] - }, - {"_id": 2, "same": [{"_id": 0}, {"_id": 1}, {"_id": 2}]} - ]; - testPipeline( - [ - { - $lookup: { - let : {var1: "$a"}, - pipeline: [{$project: {x: "$$var1"}}], - from: "from", - as: "same" - } - }, - {$project: {"same": 1}} - ], - expectedResults, - coll); - - // - // Dotted field paths. - // - - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: 1})); - assert.writeOK(coll.insert({_id: 1, a: null})); - assert.writeOK(coll.insert({_id: 2})); - assert.writeOK(coll.insert({_id: 3, a: {c: 1}})); - - from.drop(); - assert.writeOK(from.insert({_id: 0, b: 1})); - assert.writeOK(from.insert({_id: 1, b: null})); - assert.writeOK(from.insert({_id: 2})); - assert.writeOK(from.insert({_id: 3, b: {c: 1}})); - assert.writeOK(from.insert({_id: 4, b: {c: 2}})); - - // Once without a dotted field. - var pipeline = [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}]; - expectedResults = [ - {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, - {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 3, a: {c: 1}, "same": [{_id: 3, b: {c: 1}}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Look up a dotted field. - pipeline = [{$lookup: {localField: "a.c", foreignField: "b.c", from: "from", as: "same"}}]; - // All but the last document in 'coll' have a nullish value for 'a.c'. 
- expectedResults = [ - {_id: 0, a: 1, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, - {_id: 1, a: null, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, - {_id: 2, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, - {_id: 3, a: {c: 1}, same: [{_id: 3, b: {c: 1}}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // With an $unwind stage. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: {b: 1}})); - assert.writeOK(coll.insert({_id: 1})); - - from.drop(); - assert.writeOK(from.insert({_id: 0, target: 1})); - - pipeline = [ - { - $lookup: { - localField: "a.b", - foreignField: "target", - from: "from", - as: "same.documents", - } - }, - { - // Expected input to $unwind: - // {_id: 0, a: {b: 1}, same: {documents: [{_id: 0, target: 1}]}} - // {_id: 1, same: {documents: []}} - $unwind: { - path: "$same.documents", - preserveNullAndEmptyArrays: true, - includeArrayIndex: "c.d.e", - } - } - ]; - expectedResults = [ - {_id: 0, a: {b: 1}, same: {documents: {_id: 0, target: 1}}, c: {d: {e: NumberLong(0)}}}, - {_id: 1, same: {}, c: {d: {e: null}}}, - ]; - testPipeline(pipeline, expectedResults, coll); - - // - // Query-like local fields (SERVER-21287) - // - - // This must only do an equality match rather than treating the value as a regex. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: /a regex/})); - - from.drop(); - assert.writeOK(from.insert({_id: 0, b: /a regex/})); - assert.writeOK(from.insert({_id: 1, b: "string that matches /a regex/"})); - - pipeline = [ - { - $lookup: { - localField: "a", - foreignField: "b", - from: "from", - as: "b", - } - }, - ]; - expectedResults = [{_id: 0, a: /a regex/, b: [{_id: 0, b: /a regex/}]}]; - testPipeline(pipeline, expectedResults, coll); - - // - // A local value of an array. - // - - // Basic array corresponding to multiple documents. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: [0, 1, 2]})); - - from.drop(); - assert.writeOK(from.insert({_id: 0})); - assert.writeOK(from.insert({_id: 1})); - - pipeline = [ - { - $lookup: { - localField: "a", - foreignField: "_id", - from: "from", - as: "b", - } - }, - ]; - expectedResults = [{_id: 0, a: [0, 1, 2], b: [{_id: 0}, {_id: 1}]}]; - testPipeline(pipeline, expectedResults, coll); - - // Basic array corresponding to a single document. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: [1]})); - - from.drop(); - assert.writeOK(from.insert({_id: 0})); - assert.writeOK(from.insert({_id: 1})); - - pipeline = [ - { - $lookup: { - localField: "a", - foreignField: "_id", - from: "from", - as: "b", - } - }, - ]; - expectedResults = [{_id: 0, a: [1], b: [{_id: 1}]}]; - testPipeline(pipeline, expectedResults, coll); - - // Array containing regular expressions. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: [/a regex/, /^x/]})); - assert.writeOK(coll.insert({_id: 1, a: [/^x/]})); - - from.drop(); - assert.writeOK(from.insert({_id: 0, b: "should not match a regex"})); - assert.writeOK(from.insert({_id: 1, b: "xxxx"})); - assert.writeOK(from.insert({_id: 2, b: /a regex/})); - assert.writeOK(from.insert({_id: 3, b: /^x/})); - - pipeline = [ - { - $lookup: { - localField: "a", - foreignField: "b", - from: "from", - as: "b", - } - }, - ]; - expectedResults = [ - {_id: 0, a: [/a regex/, /^x/], b: [{_id: 2, b: /a regex/}, {_id: 3, b: /^x/}]}, - {_id: 1, a: [/^x/], b: [{_id: 3, b: /^x/}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // 'localField' references a field within an array of sub-objects. 
- coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: [{b: 1}, {b: 2}]})); - - from.drop(); - assert.writeOK(from.insert({_id: 0})); - assert.writeOK(from.insert({_id: 1})); - assert.writeOK(from.insert({_id: 2})); - assert.writeOK(from.insert({_id: 3})); - - pipeline = [ - { - $lookup: { - localField: "a.b", - foreignField: "_id", - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}]; - testPipeline(pipeline, expectedResults, coll); - - // - // Pipeline syntax using 'let' variables. - // - coll.drop(); - assert.writeOK(coll.insert({_id: 1, x: 1})); - assert.writeOK(coll.insert({_id: 2, x: 2})); - assert.writeOK(coll.insert({_id: 3, x: 3})); - - from.drop(); - assert.writeOK(from.insert({_id: 1})); - assert.writeOK(from.insert({_id: 2})); - assert.writeOK(from.insert({_id: 3})); - - // Basic non-equi theta join via $project. - pipeline = [ - { - $lookup: { - let : {var1: "$_id"}, - pipeline: [ - {$project: {isMatch: {$gt: ["$$var1", "$_id"]}}}, - {$match: {isMatch: true}}, - {$project: {isMatch: 0}} - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - {"_id": 1, x: 1, "c": []}, - {"_id": 2, x: 2, "c": [{"_id": 1}]}, - { - "_id": 3, - x: 3, - "c": [ - {"_id": 1}, - { - "_id": 2, - } - ] - } - ]; - testPipeline(pipeline, expectedResults, coll); - - // Basic non-equi theta join via $match. - pipeline = [ - { - $lookup: { - let : {var1: "$_id"}, - pipeline: [ - {$match: {$expr: {$lt: ["$_id", "$$var1"]}}}, - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - {"_id": 1, x: 1, "c": []}, - {"_id": 2, x: 2, "c": [{"_id": 1}]}, - { - "_id": 3, - x: 3, - "c": [ - {"_id": 1}, - { - "_id": 2, - } - ] - } - ]; - testPipeline(pipeline, expectedResults, coll); - - // Multi-level join using $match. - pipeline = [ - { - $lookup: { - let : {var1: "$_id"}, - pipeline: [ - {$match: {$expr: {$eq: ["$_id", "$$var1"]}}}, - { - $lookup: { - let : {var2: "$_id"}, - pipeline: [ - {$match: {$expr: {$gt: ["$_id", "$$var2"]}}}, - ], - from: "from", - as: "d" - } - }, - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - {"_id": 1, "x": 1, "c": [{"_id": 1, "d": [{"_id": 2}, {"_id": 3}]}]}, - {"_id": 2, "x": 2, "c": [{"_id": 2, "d": [{"_id": 3}]}]}, - {"_id": 3, "x": 3, "c": [{"_id": 3, "d": []}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Equijoin with $match that can't be delegated to the query subsystem. - pipeline = [ - { - $lookup: { - let : {var1: "$x"}, - pipeline: [ - {$addFields: {newField: 2}}, - {$match: {$expr: {$eq: ["$newField", "$$var1"]}}}, - {$project: {newField: 0}} - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - {"_id": 1, "x": 1, "c": []}, - {"_id": 2, "x": 2, "c": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, - {"_id": 3, "x": 3, "c": []} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Multiple variables. - pipeline = [ - { - $lookup: { - let : {var1: "$_id", var2: "$x"}, - pipeline: [ - { - $project: { - isMatch: {$gt: ["$$var1", "$_id"]}, - var2Times2: {$multiply: [2, "$$var2"]} - } - }, - {$match: {isMatch: true}}, - {$project: {isMatch: 0}} - ], - from: "from", - as: "c", - }, - }, - {$project: {x: 1, c: 1}} - ]; - - expectedResults = [ - {"_id": 1, x: 1, "c": []}, - {"_id": 2, x: 2, "c": [{"_id": 1, var2Times2: 4}]}, - {"_id": 3, x: 3, "c": [{"_id": 1, var2Times2: 6}, {"_id": 2, var2Times2: 6}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Let var as complex expression object. 
- pipeline = [ - { - $lookup: { - let : {var1: {$mod: ["$x", 3]}}, - pipeline: [ - {$project: {var1Mod3TimesForeignId: {$multiply: ["$$var1", "$_id"]}}}, - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - { - "_id": 1, - x: 1, - "c": [ - {_id: 1, var1Mod3TimesForeignId: 1}, - {_id: 2, var1Mod3TimesForeignId: 2}, - {_id: 3, var1Mod3TimesForeignId: 3} - ] - }, - { - "_id": 2, - x: 2, - "c": [ - {_id: 1, var1Mod3TimesForeignId: 2}, - {_id: 2, var1Mod3TimesForeignId: 4}, - {_id: 3, var1Mod3TimesForeignId: 6} - ] - }, - { - "_id": 3, - x: 3, - "c": [ - {_id: 1, var1Mod3TimesForeignId: 0}, - {_id: 2, var1Mod3TimesForeignId: 0}, - {_id: 3, var1Mod3TimesForeignId: 0} - ] - } - ]; - testPipeline(pipeline, expectedResults, coll); - - // 'let' defined variables are available to all nested sub-pipelines. - pipeline = [ - {$match: {_id: 1}}, - { - $lookup: { - let : {var1: "ABC", var2: "123"}, - pipeline: [ - {$match: {_id: 1}}, - { - $lookup: { - pipeline: [ - {$match: {_id: 2}}, - {$addFields: {letVar1: "$$var1"}}, - { - $lookup: { - let : {var3: "XYZ"}, - pipeline: [{ - $addFields: { - mergedLetVars: - {$concat: ["$$var1", "$$var2", "$$var3"]} - } - }], - from: "from", - as: "join3" - } - }, - ], - from: "from", - as: "join2" - } - }, - ], - from: "from", - as: "join1", - } - } - ]; - - expectedResults = [{ - "_id": 1, - "x": 1, - "join1": [{ - "_id": 1, - "join2": [{ - "_id": 2, - "letVar1": "ABC", - "join3": [ - {"_id": 1, "mergedLetVars": "ABC123XYZ"}, - {"_id": 2, "mergedLetVars": "ABC123XYZ"}, - {"_id": 3, "mergedLetVars": "ABC123XYZ"} - ] - }] - }] - }]; - testPipeline(pipeline, expectedResults, coll); - - // 'let' variable shadowed by foreign pipeline variable. - pipeline = [ - {$match: {_id: 2}}, - { - $lookup: { - let : {var1: "$_id"}, - pipeline: [ - { - $project: { - shadowedVar: {$let: {vars: {var1: "abc"}, in : "$$var1"}}, - originalVar: "$$var1" - } - }, - { - $lookup: { - pipeline: [{ - $project: { - shadowedVar: {$let: {vars: {var1: "xyz"}, in : "$$var1"}}, - originalVar: "$$var1" - } - }], - from: "from", - as: "d" - } - } - ], - from: "from", - as: "c", - } - } - ]; - - expectedResults = [{ - "_id": 2, - "x": 2, - "c": [ - { - "_id": 1, - "shadowedVar": "abc", - "originalVar": 2, - "d": [ - {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} - ] - }, - { - "_id": 2, - "shadowedVar": "abc", - "originalVar": 2, - "d": [ - {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} - ] - }, - { - "_id": 3, - "shadowedVar": "abc", - "originalVar": 2, - "d": [ - {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} - ] - } - ] - }]; - testPipeline(pipeline, expectedResults, coll); - - // Use of undefined variable fails. - assertErrorCode(coll, - [{ - $lookup: { - from: "from", - as: "as", - let : {var1: "$x"}, - pipeline: [{$project: {myVar: "$$nonExistent"}}] - } - }], - 17276); - - // The dotted path offset of a non-object variable is equivalent referencing an undefined - // field. 
- pipeline = [ - { - $lookup: { - let : {var1: "$x"}, - pipeline: [ - { - $match: { - $expr: { - $eq: [ - "FIELD-IS-NULL", - {$ifNull: ["$$var1.y.z", "FIELD-IS-NULL"]} - ] - } - } - }, - ], - from: "from", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [ - {"x": 1, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, - {"x": 2, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, - {"x": 3, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Comparison where a 'let' variable references an array. - coll.drop(); - assert.writeOK(coll.insert({x: [1, 2, 3]})); - - pipeline = [ - { - $lookup: { - let : {var1: "$x"}, - pipeline: [ - {$match: {$expr: {$eq: ["$$var1", [1, 2, 3]]}}}, - ], - from: "from", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [{"x": [1, 2, 3], "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}]; - testPipeline(pipeline, expectedResults, coll); - - // - // Pipeline syntax with nested object. - // - coll.drop(); - assert.writeOK(coll.insert({x: {y: {z: 10}}})); - - // Subfields of 'let' variables can be referenced via dotted path. - pipeline = [ - { - $lookup: { - let : {var1: "$x"}, - pipeline: [ - {$project: {z: "$$var1.y.z"}}, - ], - from: "from", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [{ - "x": {"y": {"z": 10}}, - "as": [{"_id": 1, "z": 10}, {"_id": 2, "z": 10}, {"_id": 3, "z": 10}] - }]; - testPipeline(pipeline, expectedResults, coll); - - // 'let' variable with dotted field path off of $$ROOT. - pipeline = [ - { - $lookup: { - let : {var1: "$$ROOT.x.y.z"}, - pipeline: - [{$match: {$expr: {$eq: ["$$var1", "$$ROOT.x.y.z"]}}}, {$project: {_id: 0}}], - from: "lookUp", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}]; - testPipeline(pipeline, expectedResults, coll); - - // 'let' variable with dotted field path off of $$CURRENT. - pipeline = [ - { - $lookup: { - let : {var1: "$$CURRENT.x.y.z"}, - pipeline: [ - {$match: {$expr: {$eq: ["$$var1", "$$CURRENT.x.y.z"]}}}, - {$project: {_id: 0}} - ], - from: "lookUp", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}]; - testPipeline(pipeline, expectedResults, coll); - - // - // Pipeline syntax with nested $lookup. - // - coll.drop(); - assert.writeOK(coll.insert({_id: 1, w: 1})); - assert.writeOK(coll.insert({_id: 2, w: 2})); - assert.writeOK(coll.insert({_id: 3, w: 3})); - - from.drop(); - assert.writeOK(from.insert({_id: 1, x: 1})); - assert.writeOK(from.insert({_id: 2, x: 2})); - assert.writeOK(from.insert({_id: 3, x: 3})); - - thirdColl.drop(); - assert.writeOK(thirdColl.insert({_id: 1, y: 1})); - assert.writeOK(thirdColl.insert({_id: 2, y: 2})); - assert.writeOK(thirdColl.insert({_id: 3, y: 3})); - - fourthColl.drop(); - assert.writeOK(fourthColl.insert({_id: 1, z: 1})); - assert.writeOK(fourthColl.insert({_id: 2, z: 2})); - assert.writeOK(fourthColl.insert({_id: 3, z: 3})); - - // Nested $lookup pipeline. 
- pipeline = [ - {$match: {_id: 1}}, - { - $lookup: { - pipeline: [ - {$match: {_id: 2}}, - { - $lookup: { - pipeline: [ - {$match: {_id: 3}}, - { - $lookup: { - pipeline: [ - {$match: {_id: 1}}, - ], - from: "fourthColl", - as: "thirdLookup" - } - }, - ], - from: "thirdColl", - as: "secondLookup" - } - }, - ], - from: "from", - as: "firstLookup", - } - } - ]; - - expectedResults = [{ - "_id": 1, - "w": 1, - "firstLookup": [{ - "_id": 2, - x: 2, "secondLookup": [{"_id": 3, y: 3, "thirdLookup": [{_id: 1, z: 1}]}] - }] - }]; - testPipeline(pipeline, expectedResults, coll); - - // Deeply nested $lookup pipeline. Confirm that we can execute an aggregation with nested - // $lookup sub-pipelines up to the maximum depth, but not beyond. - let nestedPipeline = generateNestedPipeline("lookup", 20); - assert.commandWorked(coll.getDB().runCommand( - {aggregate: coll.getName(), pipeline: nestedPipeline, cursor: {}})); - - nestedPipeline = generateNestedPipeline("lookup", 21); - assertErrorCode(coll, nestedPipeline, ErrorCodes.MaxSubPipelineDepthExceeded); - - // Confirm that maximum $lookup sub-pipeline depth is respected when aggregating views whose - // combined nesting depth exceeds the limit. - nestedPipeline = generateNestedPipeline("lookup", 10); - coll.getDB().view1.drop(); - assert.commandWorked( - coll.getDB().runCommand({create: "view1", viewOn: "lookup", pipeline: nestedPipeline})); - - nestedPipeline = generateNestedPipeline("view1", 10); - coll.getDB().view2.drop(); - assert.commandWorked( - coll.getDB().runCommand({create: "view2", viewOn: "view1", pipeline: nestedPipeline})); - - // Confirm that a composite sub-pipeline depth of 20 is allowed. - assert.commandWorked( - coll.getDB().runCommand({aggregate: "view2", pipeline: [], cursor: {}})); - - const pipelineWhichExceedsNestingLimit = generateNestedPipeline("view2", 1); - coll.getDB().view3.drop(); - assert.commandWorked(coll.getDB().runCommand( - {create: "view3", viewOn: "view2", pipeline: pipelineWhichExceedsNestingLimit})); - - // Confirm that a composite sub-pipeline depth greater than 20 fails. - assertErrorCode(coll.getDB().view3, [], ErrorCodes.MaxSubPipelineDepthExceeded); - - // - // Error cases. - // - - // 'from', 'as', 'localField' and 'foreignField' must all be specified when run with - // localField/foreignField syntax. - assertErrorCode(coll, - [{$lookup: {foreignField: "b", from: "from", as: "same"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", from: "from", as: "same"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", as: "same"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", from: "from"}}], - ErrorCodes.FailedToParse); - - // localField/foreignField and pipeline/let syntax must not be mixed. 
- assertErrorCode(coll, - [{$lookup: {pipeline: [], foreignField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {pipeline: [], localField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode( - coll, - [{$lookup: {pipeline: [], localField: "b", foreignField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {let : {a: "$b"}, foreignField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {let : {a: "$b"}, localField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode( - coll, - [{ - $lookup: - {let : {a: "$b"}, localField: "b", foreignField: "b", from: "from", as: "as"} - }], - ErrorCodes.FailedToParse); - - // 'from', 'as', 'localField' and 'foreignField' must all be of type string. - assertErrorCode(coll, - [{$lookup: {localField: 1, foreignField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: 1, from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", from: 1, as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", from: "from", as: 1}}], - ErrorCodes.FailedToParse); - - // 'pipeline' and 'let' must be of expected type. - assertErrorCode( - coll, [{$lookup: {pipeline: 1, from: "from", as: "as"}}], ErrorCodes.TypeMismatch); - assertErrorCode( - coll, [{$lookup: {pipeline: {}, from: "from", as: "as"}}], ErrorCodes.TypeMismatch); - assertErrorCode(coll, - [{$lookup: {let : 1, pipeline: [], from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {let : [], pipeline: [], from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - - // The foreign collection must be a valid namespace. - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", from: "", as: "as"}}], - ErrorCodes.InvalidNamespace); - // $lookup's field must be an object. - assertErrorCode(coll, [{$lookup: "string"}], ErrorCodes.FailedToParse); - } - - // Run tests on single node. - db.lookUp.drop(); - db.from.drop(); - db.thirdColl.drop(); - db.fourthColl.drop(); - runTest(db.lookUp, db.from, db.thirdColl, db.fourthColl); - - // Run tests in a sharded environment. - var sharded = new ShardingTest({shards: 2, mongos: 1}); - assert(sharded.adminCommand({enableSharding: "test"})); - sharded.getDB('test').lookUp.drop(); - sharded.getDB('test').from.drop(); - sharded.getDB('test').thirdColl.drop(); - sharded.getDB('test').fourthColl.drop(); - assert(sharded.adminCommand({shardCollection: "test.lookUp", key: {_id: 'hashed'}})); - runTest(sharded.getDB('test').lookUp, - sharded.getDB('test').from, - sharded.getDB('test').thirdColl, - sharded.getDB('test').fourthColl); - - // An error is thrown if the from collection is sharded. - assert(sharded.adminCommand({shardCollection: "test.from", key: {_id: 1}})); - assertErrorCode(sharded.getDB('test').lookUp, - [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}], - 28769); - - // An error is thrown if nested $lookup from collection is sharded. 
- assert(sharded.adminCommand({shardCollection: "test.fourthColl", key: {_id: 1}})); - assertErrorCode(sharded.getDB('test').lookUp, - [{ - $lookup: { - pipeline: [{$lookup: {pipeline: [], from: "fourthColl", as: "same"}}], - from: "thirdColl", - as: "same" - } - }], - 28769); - - // Test that a $lookup from an unsharded collection followed by an $out to a sharded collection - // is allowed. - const sourceColl = sharded.getDB("test").lookUp; - sourceColl.drop(); - assert(sharded.adminCommand({shardCollection: sourceColl.getFullName(), key: {_id: "hashed"}})); - assert.commandWorked(sourceColl.insert({_id: 0, a: 0})); - - const outColl = sharded.getDB("test").out; - outColl.drop(); - assert(sharded.adminCommand({shardCollection: outColl.getFullName(), key: {_id: "hashed"}})); - - const fromColl = sharded.getDB("test").from; - fromColl.drop(); - assert.commandWorked(fromColl.insert({_id: 0, b: 0})); - - sourceColl.aggregate([ - {$lookup: {localField: "a", foreignField: "b", from: fromColl.getName(), as: "same"}}, - {$out: {to: outColl.getName(), mode: "insertDocuments"}} - ]); - - assert.eq([{a: 0, same: [{_id: 0, b: 0}]}], outColl.find({}, {_id: 0}).toArray()); - - sharded.stop(); -}()); diff --git a/jstests/aggregation/sources/lookup/lookup_subpipeline.js b/jstests/aggregation/sources/lookup/lookup_subpipeline.js new file mode 100644 index 00000000000..abffadf4c0b --- /dev/null +++ b/jstests/aggregation/sources/lookup/lookup_subpipeline.js @@ -0,0 +1,604 @@ +// Tests for the $lookup stage with a sub-pipeline. +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + + const testName = "lookup_subpipeline"; + + const coll = db.lookUp; + const from = db.from; + const thirdColl = db.thirdColl; + const fourthColl = db.fourthColl; + + // Used by testPipeline to sort result documents. All _ids must be primitives. + function compareId(a, b) { + if (a._id < b._id) { + return -1; + } + if (a._id > b._id) { + return 1; + } + return 0; + } + + function generateNestedPipeline(foreignCollName, numLevels) { + let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}]; + + for (let level = 1; level < numLevels; level++) { + pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}]; + } + + return pipeline; + } + + // Helper for testing that pipeline returns correct set of results. + function testPipeline(pipeline, expectedResult, collection) { + assert.eq(collection.aggregate(pipeline).toArray().sort(compareId), + expectedResult.sort(compareId)); + } + + // + // Pipeline syntax using 'let' variables. + // + coll.drop(); + assert.writeOK(coll.insert({_id: 1, x: 1})); + assert.writeOK(coll.insert({_id: 2, x: 2})); + assert.writeOK(coll.insert({_id: 3, x: 3})); + + from.drop(); + assert.writeOK(from.insert({_id: 1})); + assert.writeOK(from.insert({_id: 2})); + assert.writeOK(from.insert({_id: 3})); + + // Basic non-equi theta join via $project. + let pipeline = [ + { + $lookup: { + let : {var1: "$_id"}, + pipeline: [ + {$project: {isMatch: {$gt: ["$$var1", "$_id"]}}}, + {$match: {isMatch: true}}, + {$project: {isMatch: 0}} + ], + from: "from", + as: "c", + } + }, + ]; + + let expectedResults = [ + {"_id": 1, x: 1, "c": []}, + {"_id": 2, x: 2, "c": [{"_id": 1}]}, + { + "_id": 3, + x: 3, + "c": [ + {"_id": 1}, + { + "_id": 2, + } + ] + } + ]; + testPipeline(pipeline, expectedResults, coll); + // Basic non-equi theta join via $match. 
+ pipeline = [ + { + $lookup: { + let : {var1: "$_id"}, + pipeline: [ + {$match: {$expr: {$lt: ["$_id", "$$var1"]}}}, + ], + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [ + {"_id": 1, x: 1, "c": []}, + {"_id": 2, x: 2, "c": [{"_id": 1}]}, + { + "_id": 3, + x: 3, + "c": [ + {"_id": 1}, + { + "_id": 2, + } + ] + } + ]; + testPipeline(pipeline, expectedResults, coll); + + // Multi-level join using $match. + pipeline = [ + { + $lookup: { + let : {var1: "$_id"}, + pipeline: [ + {$match: {$expr: {$eq: ["$_id", "$$var1"]}}}, + { + $lookup: { + let : {var2: "$_id"}, + pipeline: [ + {$match: {$expr: {$gt: ["$_id", "$$var2"]}}}, + ], + from: "from", + as: "d" + } + }, + ], + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [ + {"_id": 1, "x": 1, "c": [{"_id": 1, "d": [{"_id": 2}, {"_id": 3}]}]}, + {"_id": 2, "x": 2, "c": [{"_id": 2, "d": [{"_id": 3}]}]}, + {"_id": 3, "x": 3, "c": [{"_id": 3, "d": []}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Equijoin with $match that can't be delegated to the query subsystem. + pipeline = [ + { + $lookup: { + let : {var1: "$x"}, + pipeline: [ + {$addFields: {newField: 2}}, + {$match: {$expr: {$eq: ["$newField", "$$var1"]}}}, + {$project: {newField: 0}} + ], + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [ + {"_id": 1, "x": 1, "c": []}, + {"_id": 2, "x": 2, "c": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, + {"_id": 3, "x": 3, "c": []} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Multiple variables. + pipeline = [ + { + $lookup: { + let : {var1: "$_id", var2: "$x"}, + pipeline: [ + { + $project: { + isMatch: {$gt: ["$$var1", "$_id"]}, + var2Times2: {$multiply: [2, "$$var2"]} + } + }, + {$match: {isMatch: true}}, + {$project: {isMatch: 0}} + ], + from: "from", + as: "c", + }, + }, + {$project: {x: 1, c: 1}} + ]; + + expectedResults = [ + {"_id": 1, x: 1, "c": []}, + {"_id": 2, x: 2, "c": [{"_id": 1, var2Times2: 4}]}, + {"_id": 3, x: 3, "c": [{"_id": 1, var2Times2: 6}, {"_id": 2, var2Times2: 6}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Let var as complex expression object. + pipeline = [ + { + $lookup: { + let : {var1: {$mod: ["$x", 3]}}, + pipeline: [ + {$project: {var1Mod3TimesForeignId: {$multiply: ["$$var1", "$_id"]}}}, + ], + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [ + { + "_id": 1, + x: 1, + "c": [ + {_id: 1, var1Mod3TimesForeignId: 1}, + {_id: 2, var1Mod3TimesForeignId: 2}, + {_id: 3, var1Mod3TimesForeignId: 3} + ] + }, + { + "_id": 2, + x: 2, + "c": [ + {_id: 1, var1Mod3TimesForeignId: 2}, + {_id: 2, var1Mod3TimesForeignId: 4}, + {_id: 3, var1Mod3TimesForeignId: 6} + ] + }, + { + "_id": 3, + x: 3, + "c": [ + {_id: 1, var1Mod3TimesForeignId: 0}, + {_id: 2, var1Mod3TimesForeignId: 0}, + {_id: 3, var1Mod3TimesForeignId: 0} + ] + } + ]; + testPipeline(pipeline, expectedResults, coll); + + // 'let' defined variables are available to all nested sub-pipelines. 
+ pipeline = [ + {$match: {_id: 1}}, + { + $lookup: { + let : {var1: "ABC", var2: "123"}, + pipeline: [ + {$match: {_id: 1}}, + { + $lookup: { + pipeline: [ + {$match: {_id: 2}}, + {$addFields: {letVar1: "$$var1"}}, + { + $lookup: { + let : {var3: "XYZ"}, + pipeline: [{ + $addFields: { + mergedLetVars: + {$concat: ["$$var1", "$$var2", "$$var3"]} + } + }], + from: "from", + as: "join3" + } + }, + ], + from: "from", + as: "join2" + } + }, + ], + from: "from", + as: "join1", + } + } + ]; + + expectedResults = [{ + "_id": 1, + "x": 1, + "join1": [{ + "_id": 1, + "join2": [{ + "_id": 2, + "letVar1": "ABC", + "join3": [ + {"_id": 1, "mergedLetVars": "ABC123XYZ"}, + {"_id": 2, "mergedLetVars": "ABC123XYZ"}, + {"_id": 3, "mergedLetVars": "ABC123XYZ"} + ] + }] + }] + }]; + testPipeline(pipeline, expectedResults, coll); + + // 'let' variable shadowed by foreign pipeline variable. + pipeline = [ + {$match: {_id: 2}}, + { + $lookup: { + let : {var1: "$_id"}, + pipeline: [ + { + $project: { + shadowedVar: {$let: {vars: {var1: "abc"}, in : "$$var1"}}, + originalVar: "$$var1" + } + }, + { + $lookup: { + pipeline: [{ + $project: { + shadowedVar: {$let: {vars: {var1: "xyz"}, in : "$$var1"}}, + originalVar: "$$var1" + } + }], + from: "from", + as: "d" + } + } + ], + from: "from", + as: "c", + } + } + ]; + + expectedResults = [{ + "_id": 2, + "x": 2, + "c": [ + { + "_id": 1, + "shadowedVar": "abc", + "originalVar": 2, + "d": [ + {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} + ] + }, + { + "_id": 2, + "shadowedVar": "abc", + "originalVar": 2, + "d": [ + {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} + ] + }, + { + "_id": 3, + "shadowedVar": "abc", + "originalVar": 2, + "d": [ + {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} + ] + } + ] + }]; + testPipeline(pipeline, expectedResults, coll); + + // Use of undefined variable fails. + assertErrorCode(coll, + [{ + $lookup: { + from: "from", + as: "as", + let : {var1: "$x"}, + pipeline: [{$project: {myVar: "$$nonExistent"}}] + } + }], + 17276); + + // The dotted path offset of a non-object variable is equivalent referencing an undefined + // field. + pipeline = [ + { + $lookup: { + let : {var1: "$x"}, + pipeline: [ + { + $match: { + $expr: + {$eq: ["FIELD-IS-NULL", {$ifNull: ["$$var1.y.z", "FIELD-IS-NULL"]}]} + } + }, + ], + from: "from", + as: "as", + } + }, + {$project: {_id: 0}}, + {$sort: {x: 1}} + ]; + + expectedResults = [ + {"x": 1, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, + {"x": 2, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, + {"x": 3, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Comparison where a 'let' variable references an array. + coll.drop(); + assert.writeOK(coll.insert({x: [1, 2, 3]})); + + pipeline = [ + { + $lookup: { + let : {var1: "$x"}, + pipeline: [ + {$match: {$expr: {$eq: ["$$var1", [1, 2, 3]]}}}, + ], + from: "from", + as: "as", + } + }, + {$project: {_id: 0}} + ]; + + expectedResults = [{"x": [1, 2, 3], "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}]; + testPipeline(pipeline, expectedResults, coll); + + // + // Pipeline syntax with nested object. 
+ // + coll.drop(); + assert.writeOK(coll.insert({x: {y: {z: 10}}})); + + // Subfields of 'let' variables can be referenced via dotted path. + pipeline = [ + { + $lookup: { + let : {var1: "$x"}, + pipeline: [ + {$project: {z: "$$var1.y.z"}}, + ], + from: "from", + as: "as", + } + }, + {$project: {_id: 0}} + ]; + + expectedResults = [{ + "x": {"y": {"z": 10}}, + "as": [{"_id": 1, "z": 10}, {"_id": 2, "z": 10}, {"_id": 3, "z": 10}] + }]; + testPipeline(pipeline, expectedResults, coll); + + // 'let' variable with dotted field path off of $$ROOT. + pipeline = [ + { + $lookup: { + let : {var1: "$$ROOT.x.y.z"}, + pipeline: + [{$match: {$expr: {$eq: ["$$var1", "$$ROOT.x.y.z"]}}}, {$project: {_id: 0}}], + from: "lookUp", + as: "as", + } + }, + {$project: {_id: 0}} + ]; + + expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}]; + testPipeline(pipeline, expectedResults, coll); + + // 'let' variable with dotted field path off of $$CURRENT. + pipeline = [ + { + $lookup: { + let : {var1: "$$CURRENT.x.y.z"}, + pipeline: + [{$match: {$expr: {$eq: ["$$var1", "$$CURRENT.x.y.z"]}}}, {$project: {_id: 0}}], + from: "lookUp", + as: "as", + } + }, + {$project: {_id: 0}} + ]; + + expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}]; + testPipeline(pipeline, expectedResults, coll); + + // + // Pipeline syntax with nested $lookup. + // + coll.drop(); + assert.writeOK(coll.insert({_id: 1, w: 1})); + assert.writeOK(coll.insert({_id: 2, w: 2})); + assert.writeOK(coll.insert({_id: 3, w: 3})); + + from.drop(); + assert.writeOK(from.insert({_id: 1, x: 1})); + assert.writeOK(from.insert({_id: 2, x: 2})); + assert.writeOK(from.insert({_id: 3, x: 3})); + + thirdColl.drop(); + assert.writeOK(thirdColl.insert({_id: 1, y: 1})); + assert.writeOK(thirdColl.insert({_id: 2, y: 2})); + assert.writeOK(thirdColl.insert({_id: 3, y: 3})); + + fourthColl.drop(); + assert.writeOK(fourthColl.insert({_id: 1, z: 1})); + assert.writeOK(fourthColl.insert({_id: 2, z: 2})); + assert.writeOK(fourthColl.insert({_id: 3, z: 3})); + + // Nested $lookup pipeline. + pipeline = [ + {$match: {_id: 1}}, + { + $lookup: { + pipeline: [ + {$match: {_id: 2}}, + { + $lookup: { + pipeline: [ + {$match: {_id: 3}}, + { + $lookup: { + pipeline: [ + {$match: {_id: 1}}, + ], + from: "fourthColl", + as: "thirdLookup" + } + }, + ], + from: "thirdColl", + as: "secondLookup" + } + }, + ], + from: "from", + as: "firstLookup", + } + } + ]; + + expectedResults = [{ + "_id": 1, + "w": 1, + "firstLookup": [{ + "_id": 2, + x: 2, "secondLookup": [{"_id": 3, y: 3, "thirdLookup": [{_id: 1, z: 1}]}] + }] + }]; + testPipeline(pipeline, expectedResults, coll); + + // Deeply nested $lookup pipeline. Confirm that we can execute an aggregation with nested + // $lookup sub-pipelines up to the maximum depth, but not beyond. + let nestedPipeline = generateNestedPipeline("lookup", 20); + assert.commandWorked( + coll.getDB().runCommand({aggregate: coll.getName(), pipeline: nestedPipeline, cursor: {}})); + + nestedPipeline = generateNestedPipeline("lookup", 21); + assertErrorCode(coll, nestedPipeline, ErrorCodes.MaxSubPipelineDepthExceeded); + + // Confirm that maximum $lookup sub-pipeline depth is respected when aggregating views whose + // combined nesting depth exceeds the limit. 
+ nestedPipeline = generateNestedPipeline("lookup", 10); + coll.getDB().view1.drop(); + assert.commandWorked( + coll.getDB().runCommand({create: "view1", viewOn: "lookup", pipeline: nestedPipeline})); + + nestedPipeline = generateNestedPipeline("view1", 10); + coll.getDB().view2.drop(); + assert.commandWorked( + coll.getDB().runCommand({create: "view2", viewOn: "view1", pipeline: nestedPipeline})); + + // Confirm that a composite sub-pipeline depth of 20 is allowed. + assert.commandWorked(coll.getDB().runCommand({aggregate: "view2", pipeline: [], cursor: {}})); + + const pipelineWhichExceedsNestingLimit = generateNestedPipeline("view2", 1); + coll.getDB().view3.drop(); + assert.commandWorked(coll.getDB().runCommand( + {create: "view3", viewOn: "view2", pipeline: pipelineWhichExceedsNestingLimit})); + + // + // Error cases. + // + + // Confirm that a composite sub-pipeline depth greater than 20 fails. + assertErrorCode(coll.getDB().view3, [], ErrorCodes.MaxSubPipelineDepthExceeded); + + // 'pipeline' and 'let' must be of expected type. + assertErrorCode( + coll, [{$lookup: {pipeline: 1, from: "from", as: "as"}}], ErrorCodes.TypeMismatch); + assertErrorCode( + coll, [{$lookup: {pipeline: {}, from: "from", as: "as"}}], ErrorCodes.TypeMismatch); + assertErrorCode(coll, + [{$lookup: {let : 1, pipeline: [], from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {let : [], pipeline: [], from: "from", as: "as"}}], + ErrorCodes.FailedToParse); +}()); diff --git a/jstests/sharding/collation_lookup.js b/jstests/sharding/collation_lookup.js new file mode 100644 index 00000000000..f06e92ab3fc --- /dev/null +++ b/jstests/sharding/collation_lookup.js @@ -0,0 +1,454 @@ +/** + * Tests that the $lookup stage respects the collation when the local and/or foreign collections + * are sharded. + * + * The comparison of string values between the 'localField' and 'foreignField' should use the + * collation either explicitly set on the aggregation operation, or the collation inherited from the + * collection the "aggregate" command was performed on. + */ +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // for arrayEq + + function runTests(withDefaultCollationColl, withoutDefaultCollationColl, collation) { + // Test that the $lookup stage respects the inherited collation. 
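+        // The local collection has a case-insensitive default collation, so the local document
+        // {str: "abc"} is expected to match both "abc" and "ABC" in the foreign collection.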
+ let res = withDefaultCollationColl + .aggregate([{ + $lookup: { + from: withoutDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }]) + .toArray(); + assert.eq(1, res.length, tojson(res)); + + let expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}]; + assert(arrayEq(expected, res[0].matched), + "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + + " up to ordering"); + + res = withDefaultCollationColl + .aggregate([{ + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + } + ], + as: "matched1", + }, + }]) + .toArray(); + assert.eq(1, res.length, tojson(res)); + + expected = [ + { + "_id": "lowercase", + "str": "abc", + "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] + }, + { + "_id": "uppercase", + "str": "ABC", + "matched2": + [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] + } + ]; + assert(arrayEq(expected, res[0].matched1), + "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) + + " up to ordering. " + tojson(res)); + + // Test that the $lookup stage respects the inherited collation when it optimizes with an + // $unwind stage. + res = withDefaultCollationColl + .aggregate([ + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }, + {$unwind: "$matched"}, + ]) + .toArray(); + assert.eq(2, res.length, tojson(res)); + + expected = [ + {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}}, + {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}} + ]; + assert(arrayEq(expected, res), + "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); + + res = withDefaultCollationColl + .aggregate([ + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + }, + {$unwind: "$matched2"}, + ], + as: "matched1", + }, + }, + {$unwind: "$matched1"}, + ]) + .toArray(); + assert.eq(4, res.length, tojson(res)); + + expected = [ + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "lowercase", "str": "abc"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "uppercase", "str": "ABC"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "uppercase", + "str": "ABC", + "matched2": {"_id": "lowercase", "str": "abc"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "uppercase", + "str": "ABC", + "matched2": {"_id": "uppercase", "str": "ABC"} + } + } + ]; + assert(arrayEq(expected, res), + "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); + + // Test that the $lookup stage respects an explicit collation on the aggregation operation. 
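+        // Here the collection the aggregate runs against has no default collation, and the
+        // case-insensitive collation is instead passed explicitly as a command option.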
+ res = withoutDefaultCollationColl + .aggregate( + [ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }, + ], + collation) + .toArray(); + assert.eq(1, res.length, tojson(res)); + + expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}]; + assert(arrayEq(expected, res[0].matched), + "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + + " up to ordering"); + + res = withoutDefaultCollationColl + .aggregate( + [ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + } + ], + as: "matched1", + }, + } + ], + collation) + .toArray(); + assert.eq(1, res.length, tojson(res)); + + expected = [ + { + "_id": "lowercase", + "str": "abc", + "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] + }, + { + "_id": "uppercase", + "str": "ABC", + "matched2": + [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] + } + ]; + assert(arrayEq(expected, res[0].matched1), + "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) + + " up to ordering"); + + // Test that the $lookup stage respects an explicit collation on the aggregation operation + // when + // it optimizes with an $unwind stage. + res = withoutDefaultCollationColl + .aggregate( + [ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }, + {$unwind: "$matched"}, + ], + collation) + .toArray(); + assert.eq(2, res.length, tojson(res)); + + expected = [ + {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}}, + {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}} + ]; + assert(arrayEq(expected, res), + "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); + + res = withoutDefaultCollationColl + .aggregate( + [ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + }, + {$unwind: "$matched2"}, + ], + as: "matched1", + }, + }, + {$unwind: "$matched1"}, + ], + collation) + .toArray(); + assert.eq(4, res.length, tojson(res)); + + expected = [ + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "lowercase", "str": "abc"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "uppercase", "str": "ABC"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "uppercase", + "str": "ABC", + "matched2": {"_id": "lowercase", "str": "abc"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "uppercase", + "str": "ABC", + "matched2": {"_id": "uppercase", "str": "ABC"} + } + } + ]; + assert(arrayEq(expected, res), + "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); + + // Test 
that the $lookup stage uses the "simple" collation if a collation isn't set on the + // collection or the aggregation operation. + res = withoutDefaultCollationColl + .aggregate([ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }, + ]) + .toArray(); + assert.eq([{_id: "lowercase", str: "abc", matched: [{_id: "lowercase", str: "abc"}]}], res); + + res = withoutDefaultCollationColl + .aggregate([ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + }, + {$unwind: "$matched2"}, + ], + as: "matched1", + }, + }, + ]) + .toArray(); + assert.eq([{ + "_id": "lowercase", + "str": "abc", + "matched1": [{ + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "lowercase", "str": "abc"} + }] + }], + res); + } + + const st = new ShardingTest({shards: 2, config: 1}); + const testName = "collation_lookup"; + const caseInsensitive = {collation: {locale: "en_US", strength: 2}}; + + const mongosDB = st.s0.getDB(testName); + const withDefaultCollationColl = mongosDB[testName + "_with_default"]; + const withoutDefaultCollationColl = mongosDB[testName + "_without_default"]; + + assert.commandWorked( + mongosDB.createCollection(withDefaultCollationColl.getName(), caseInsensitive)); + assert.writeOK(withDefaultCollationColl.insert({_id: "lowercase", str: "abc"})); + + assert.writeOK(withoutDefaultCollationColl.insert({_id: "lowercase", str: "abc"})); + assert.writeOK(withoutDefaultCollationColl.insert({_id: "uppercase", str: "ABC"})); + assert.writeOK(withoutDefaultCollationColl.insert({_id: "unmatched", str: "def"})); + + // + // Sharded collection with default collation and unsharded collection without a default + // collation. + // + assert.commandWorked( + withDefaultCollationColl.createIndex({str: 1}, {collation: {locale: "simple"}})); + + // Enable sharding on the test DB and ensure its primary is shard0000. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName); + + // Shard the collection with a default collation. + assert.commandWorked(mongosDB.adminCommand({ + shardCollection: withDefaultCollationColl.getFullName(), + key: {str: 1}, + collation: {locale: "simple"} + })); + + // Split the collection into 2 chunks. + assert.commandWorked(mongosDB.adminCommand( + {split: withDefaultCollationColl.getFullName(), middle: {str: "abc"}})); + + // Move the chunk containing {str: "abc"} to shard0001. + assert.commandWorked(mongosDB.adminCommand({ + moveChunk: withDefaultCollationColl.getFullName(), + find: {str: "abc"}, + to: st.shard1.shardName + })); + + runTests(withDefaultCollationColl, withoutDefaultCollationColl, caseInsensitive); + + // TODO: Enable the following tests once SERVER-32536 is fixed. + // + // Sharded collection with default collation and sharded collection without a default + // collation. + // + + // Shard the collection without a default collation. + // assert.commandWorked(mongosDB.adminCommand({ + // shardCollection: withoutDefaultCollationColl.getFullName(), + // key: {_id: 1}, + // })); + + // // Split the collection into 2 chunks. 
+ // assert.commandWorked(mongosDB.adminCommand( + // {split: withoutDefaultCollationColl.getFullName(), middle: {_id: "unmatched"}})); + + // // Move the chunk containing {_id: "lowercase"} to shard0001. + // assert.commandWorked(mongosDB.adminCommand({ + // moveChunk: withoutDefaultCollationColl.getFullName(), + // find: {_id: "lowercase"}, + // to: st.shard1.shardName + // })); + + // runTests(withDefaultCollationColl, withoutDefaultCollationColl, caseInsensitive); + + st.stop(); +})(); diff --git a/jstests/sharding/lookup.js b/jstests/sharding/lookup.js new file mode 100644 index 00000000000..7bb1bca0be5 --- /dev/null +++ b/jstests/sharding/lookup.js @@ -0,0 +1,618 @@ +// Basic $lookup regression tests. + +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + load("jstests/libs/fixture_helpers.js"); // For isSharded. + + const st = new ShardingTest({shards: 2, config: 1, mongos: 1}); + const testName = "lookup_sharded"; + + const mongosDB = st.s0.getDB(testName); + assert.commandWorked(mongosDB.dropDatabase()); + + // Used by testPipeline to sort result documents. All _ids must be primitives. + function compareId(a, b) { + if (a._id < b._id) { + return -1; + } + if (a._id > b._id) { + return 1; + } + return 0; + } + + // Helper for testing that pipeline returns correct set of results. + function testPipeline(pipeline, expectedResult, collection) { + assert.eq(collection.aggregate(pipeline).toArray().sort(compareId), + expectedResult.sort(compareId)); + } + + function runTest(coll, from, thirdColl, fourthColl) { + let db = null; // Using the db variable is banned in this function. + + assert.commandWorked(coll.remove({})); + assert.commandWorked(from.remove({})); + assert.commandWorked(thirdColl.remove({})); + assert.commandWorked(fourthColl.remove({})); + + assert.writeOK(coll.insert({_id: 0, a: 1})); + assert.writeOK(coll.insert({_id: 1, a: null})); + assert.writeOK(coll.insert({_id: 2})); + + assert.writeOK(from.insert({_id: 0, b: 1})); + assert.writeOK(from.insert({_id: 1, b: null})); + assert.writeOK(from.insert({_id: 2})); + + // + // Basic functionality. + // + + // "from" document added to "as" field if a == b, where nonexistent fields are treated as + // null. + let expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}], + expectedResults, + coll); + + // If localField is nonexistent, it is treated as if it is null. + expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline( + [{$lookup: {localField: "nonexistent", foreignField: "b", from: "from", as: "same"}}], + expectedResults, + coll); + + // If foreignField is nonexistent, it is treated as if it is null. + expectedResults = [ + {_id: 0, a: 1, "same": []}, + {_id: 1, a: null, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline( + [{$lookup: {localField: "a", foreignField: "nonexistent", from: "from", as: "same"}}], + expectedResults, + coll); + + // If there are no matches or the from coll doesn't exist, the result is an empty array. 
+        expectedResults =
+            [{_id: 0, a: 1, "same": []}, {_id: 1, a: null, "same": []}, {_id: 2, "same": []}];
+        testPipeline(
+            [{$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}],
+            expectedResults,
+            coll);
+        testPipeline(
+            [{$lookup: {localField: "a", foreignField: "b", from: "nonexistent", as: "same"}}],
+            expectedResults,
+            coll);
+
+        // If field name specified by "as" already exists, it is overwritten.
+        expectedResults = [
+            {_id: 0, "a": [{_id: 0, b: 1}]},
+            {_id: 1, "a": [{_id: 1, b: null}, {_id: 2}]},
+            {_id: 2, "a": [{_id: 1, b: null}, {_id: 2}]}
+        ];
+        testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "a"}}],
+                     expectedResults,
+                     coll);
+
+        // Running multiple $lookups in the same pipeline is allowed.
+        expectedResults = [
+            {_id: 0, a: 1, "c": [{_id: 0, b: 1}], "d": [{_id: 0, b: 1}]},
+            {
+              _id: 1,
+              a: null, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]
+            },
+            {_id: 2, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]}
+        ];
+        testPipeline(
+            [
+              {$lookup: {localField: "a", foreignField: "b", from: "from", as: "c"}},
+              {$project: {"a": 1, "c": 1}},
+              {$lookup: {localField: "a", foreignField: "b", from: "from", as: "d"}}
+            ],
+            expectedResults,
+            coll);
+
+        //
+        // Coalescing with $unwind.
+        //
+
+        // A normal $unwind on the "as" field.
+        expectedResults = [
+            {_id: 0, a: 1, same: {_id: 0, b: 1}},
+            {_id: 1, a: null, same: {_id: 1, b: null}},
+            {_id: 1, a: null, same: {_id: 2}},
+            {_id: 2, same: {_id: 1, b: null}},
+            {_id: 2, same: {_id: 2}}
+        ];
+        testPipeline(
+            [
+              {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
+              {$unwind: {path: "$same"}}
+            ],
+            expectedResults,
+            coll);
+
+        // An $unwind on the "as" field, with includeArrayIndex.
+        expectedResults = [
+            {_id: 0, a: 1, same: {_id: 0, b: 1}, index: NumberLong(0)},
+            {_id: 1, a: null, same: {_id: 1, b: null}, index: NumberLong(0)},
+            {_id: 1, a: null, same: {_id: 2}, index: NumberLong(1)},
+            {_id: 2, same: {_id: 1, b: null}, index: NumberLong(0)},
+            {_id: 2, same: {_id: 2}, index: NumberLong(1)},
+        ];
+        testPipeline(
+            [
+              {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}},
+              {$unwind: {path: "$same", includeArrayIndex: "index"}}
+            ],
+            expectedResults,
+            coll);
+
+        // Normal $unwind with no matching documents.
+        expectedResults = [];
+        testPipeline(
+            [
+              {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
+              {$unwind: {path: "$same"}}
+            ],
+            expectedResults,
+            coll);
+
+        // $unwind with preserveNullAndEmptyArrays with no matching documents.
+        expectedResults = [
+            {_id: 0, a: 1},
+            {_id: 1, a: null},
+            {_id: 2},
+        ];
+        testPipeline(
+            [
+              {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}},
+              {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
+            ],
+            expectedResults,
+            coll);
+
+        // $unwind with preserveNullAndEmptyArrays, some with matching documents, some without.
+        expectedResults = [
+            {_id: 0, a: 1},
+            {_id: 1, a: null, same: {_id: 0, b: 1}},
+            {_id: 2},
+        ];
+        testPipeline(
+            [
+              {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}},
+              {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}}
+            ],
+            expectedResults,
+            coll);
+
+        // $unwind with preserveNullAndEmptyArrays and includeArrayIndex, some with matching
+        // documents, some without.
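+        // Documents whose "same" array is empty are preserved, and includeArrayIndex reports a
+        // null index for them.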
+ expectedResults = [ + {_id: 0, a: 1, index: null}, + {_id: 1, a: null, same: {_id: 0, b: 1}, index: NumberLong(0)}, + {_id: 2, index: null}, + ]; + testPipeline( + [ + {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}}, + { + $unwind: + {path: "$same", preserveNullAndEmptyArrays: true, includeArrayIndex: "index"} + } + ], + expectedResults, + coll); + + // + // Dependencies. + // + + // If $lookup didn't add "localField" to its dependencies, this test would fail as the + // value of the "a" field would be lost and treated as null. + expectedResults = [ + {_id: 0, "same": [{_id: 0, b: 1}]}, + {_id: 1, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline( + [ + {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, + {$project: {"same": 1}} + ], + expectedResults, + coll); + + // If $lookup didn't add fields referenced by "let" variables to its dependencies, this test + // would fail as the value of the "a" field would be lost and treated as null. + expectedResults = [ + {"_id": 0, "same": [{"_id": 0, "x": 1}, {"_id": 1, "x": 1}, {"_id": 2, "x": 1}]}, + { + "_id": 1, + "same": [{"_id": 0, "x": null}, {"_id": 1, "x": null}, {"_id": 2, "x": null}] + }, + {"_id": 2, "same": [{"_id": 0}, {"_id": 1}, {"_id": 2}]} + ]; + testPipeline( + [ + { + $lookup: { + let : {var1: "$a"}, + pipeline: [{$project: {x: "$$var1"}}], + from: "from", + as: "same" + } + }, + {$project: {"same": 1}} + ], + expectedResults, + coll); + + // + // Dotted field paths. + // + + assert.commandWorked(coll.remove({})); + assert.writeOK(coll.insert({_id: 0, a: 1})); + assert.writeOK(coll.insert({_id: 1, a: null})); + assert.writeOK(coll.insert({_id: 2})); + assert.writeOK(coll.insert({_id: 3, a: {c: 1}})); + + assert.commandWorked(from.remove({})); + assert.writeOK(from.insert({_id: 0, b: 1})); + assert.writeOK(from.insert({_id: 1, b: null})); + assert.writeOK(from.insert({_id: 2})); + assert.writeOK(from.insert({_id: 3, b: {c: 1}})); + assert.writeOK(from.insert({_id: 4, b: {c: 2}})); + + // Once without a dotted field. + let pipeline = [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}]; + expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 3, a: {c: 1}, "same": [{_id: 3, b: {c: 1}}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Look up a dotted field. + pipeline = [{$lookup: {localField: "a.c", foreignField: "b.c", from: "from", as: "same"}}]; + // All but the last document in 'coll' have a nullish value for 'a.c'. + expectedResults = [ + {_id: 0, a: 1, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, + {_id: 1, a: null, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, + {_id: 2, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, + {_id: 3, a: {c: 1}, same: [{_id: 3, b: {c: 1}}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // With an $unwind stage. 
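+        // Both the "as" field and the "includeArrayIndex" target below are dotted paths, so the
+        // coalesced $lookup/$unwind must create the intermediate subdocuments.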
+ assert.commandWorked(coll.remove({})); + assert.writeOK(coll.insert({_id: 0, a: {b: 1}})); + assert.writeOK(coll.insert({_id: 1})); + + assert.commandWorked(from.remove({})); + assert.writeOK(from.insert({_id: 0, target: 1})); + + pipeline = [ + { + $lookup: { + localField: "a.b", + foreignField: "target", + from: "from", + as: "same.documents", + } + }, + { + // Expected input to $unwind: + // {_id: 0, a: {b: 1}, same: {documents: [{_id: 0, target: 1}]}} + // {_id: 1, same: {documents: []}} + $unwind: { + path: "$same.documents", + preserveNullAndEmptyArrays: true, + includeArrayIndex: "c.d.e", + } + } + ]; + expectedResults = [ + {_id: 0, a: {b: 1}, same: {documents: {_id: 0, target: 1}}, c: {d: {e: NumberLong(0)}}}, + {_id: 1, same: {}, c: {d: {e: null}}}, + ]; + testPipeline(pipeline, expectedResults, coll); + + // + // Query-like local fields (SERVER-21287) + // + + // This must only do an equality match rather than treating the value as a regex. + assert.commandWorked(coll.remove({})); + assert.writeOK(coll.insert({_id: 0, a: /a regex/})); + + assert.commandWorked(from.remove({})); + assert.writeOK(from.insert({_id: 0, b: /a regex/})); + assert.writeOK(from.insert({_id: 1, b: "string that matches /a regex/"})); + + pipeline = [ + { + $lookup: { + localField: "a", + foreignField: "b", + from: "from", + as: "b", + } + }, + ]; + expectedResults = [{_id: 0, a: /a regex/, b: [{_id: 0, b: /a regex/}]}]; + testPipeline(pipeline, expectedResults, coll); + + // + // A local value of an array. + // + + // Basic array corresponding to multiple documents. + assert.commandWorked(coll.remove({})); + assert.writeOK(coll.insert({_id: 0, a: [0, 1, 2]})); + + assert.commandWorked(from.remove({})); + assert.writeOK(from.insert({_id: 0})); + assert.writeOK(from.insert({_id: 1})); + + pipeline = [ + { + $lookup: { + localField: "a", + foreignField: "_id", + from: "from", + as: "b", + } + }, + ]; + expectedResults = [{_id: 0, a: [0, 1, 2], b: [{_id: 0}, {_id: 1}]}]; + testPipeline(pipeline, expectedResults, coll); + + // Basic array corresponding to a single document. + assert.commandWorked(coll.remove({})); + assert.writeOK(coll.insert({_id: 0, a: [1]})); + + assert.commandWorked(from.remove({})); + assert.writeOK(from.insert({_id: 0})); + assert.writeOK(from.insert({_id: 1})); + + pipeline = [ + { + $lookup: { + localField: "a", + foreignField: "_id", + from: "from", + as: "b", + } + }, + ]; + expectedResults = [{_id: 0, a: [1], b: [{_id: 1}]}]; + testPipeline(pipeline, expectedResults, coll); + + // Array containing regular expressions. + assert.commandWorked(coll.remove({})); + assert.writeOK(coll.insert({_id: 0, a: [/a regex/, /^x/]})); + assert.writeOK(coll.insert({_id: 1, a: [/^x/]})); + + assert.commandWorked(from.remove({})); + assert.writeOK(from.insert({_id: 0, b: "should not match a regex"})); + assert.writeOK(from.insert({_id: 1, b: "xxxx"})); + assert.writeOK(from.insert({_id: 2, b: /a regex/})); + assert.writeOK(from.insert({_id: 3, b: /^x/})); + + pipeline = [ + { + $lookup: { + localField: "a", + foreignField: "b", + from: "from", + as: "b", + } + }, + ]; + expectedResults = [ + {_id: 0, a: [/a regex/, /^x/], b: [{_id: 2, b: /a regex/}, {_id: 3, b: /^x/}]}, + {_id: 1, a: [/^x/], b: [{_id: 3, b: /^x/}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // 'localField' references a field within an array of sub-objects. 
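+        // The path "a.b" resolves to the values {1, 2} here, so the foreign documents with
+        // _id 1 and _id 2 are expected to match.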
+ assert.commandWorked(coll.remove({})); + assert.writeOK(coll.insert({_id: 0, a: [{b: 1}, {b: 2}]})); + + assert.commandWorked(from.remove({})); + assert.writeOK(from.insert({_id: 0})); + assert.writeOK(from.insert({_id: 1})); + assert.writeOK(from.insert({_id: 2})); + assert.writeOK(from.insert({_id: 3})); + + pipeline = [ + { + $lookup: { + localField: "a.b", + foreignField: "_id", + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}]; + testPipeline(pipeline, expectedResults, coll); + + // + // Test $lookup when the foreign collection is a view. + // + // TODO SERVER-32548: Allow this test to run when the foreign collection is sharded. + if (!FixtureHelpers.isSharded(from)) { + assert.commandWorked( + coll.getDB().runCommand({create: "fromView", viewOn: "from", pipeline: []})); + pipeline = [ + { + $lookup: { + localField: "a.b", + foreignField: "_id", + from: "fromView", + as: "c", + } + }, + ]; + + expectedResults = + [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}]; + testPipeline(pipeline, expectedResults, coll); + } + + // + // Error cases. + // + + // 'from', 'as', 'localField' and 'foreignField' must all be specified when run with + // localField/foreignField syntax. + assertErrorCode(coll, + [{$lookup: {foreignField: "b", from: "from", as: "same"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", from: "from", as: "same"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", as: "same"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", from: "from"}}], + ErrorCodes.FailedToParse); + + // localField/foreignField and pipeline/let syntax must not be mixed. + assertErrorCode(coll, + [{$lookup: {pipeline: [], foreignField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {pipeline: [], localField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode( + coll, + [{$lookup: {pipeline: [], localField: "b", foreignField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {let : {a: "$b"}, foreignField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {let : {a: "$b"}, localField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode( + coll, + [{ + $lookup: + {let : {a: "$b"}, localField: "b", foreignField: "b", from: "from", as: "as"} + }], + ErrorCodes.FailedToParse); + + // 'from', 'as', 'localField' and 'foreignField' must all be of type string. + assertErrorCode(coll, + [{$lookup: {localField: 1, foreignField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: 1, from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", from: 1, as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", from: "from", as: 1}}], + ErrorCodes.FailedToParse); + + // The foreign collection must be a valid namespace. + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", from: "", as: "as"}}], + ErrorCodes.InvalidNamespace); + // $lookup's field must be an object. 
+ assertErrorCode(coll, [{$lookup: "string"}], ErrorCodes.FailedToParse); + } + + // + // Test unsharded local collection and unsharded foreign collection. + // + mongosDB.lookUp.drop(); + mongosDB.from.drop(); + mongosDB.thirdColl.drop(); + mongosDB.fourthColl.drop(); + + runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl); + + // Verify that the command is sent only to the primary shard when both the local and foreign + // collections are unsharded. + assert(!assert + .commandWorked(mongosDB.lookup.explain().aggregate([{ + $lookup: { + from: mongosDB.from.getName(), + localField: "a", + foreignField: "b", + as: "results" + } + }])) + .hasOwnProperty("shards")); + // Enable sharding on the test DB and ensure its primary is shard0000. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName); + + // + // Test unsharded local collection and sharded foreign collection. + // + + // Shard the foreign collection on _id. + st.shardColl(mongosDB.from, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName()); + runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl); + + // + // Test sharded local collection and unsharded foreign collection. + // + assert(mongosDB.from.drop()); + + // Shard the local collection on _id. + st.shardColl(mongosDB.lookup, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName()); + runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl); + + // + // Test sharded local and foreign collections. + // + + // Shard the foreign collection on _id. + st.shardColl(mongosDB.from, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName()); + runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl); + + // Test that a $lookup from an unsharded collection followed by an $out to a sharded collection + // is allowed. + const sourceColl = st.getDB(testName).lookUp; + assert(sourceColl.drop()); + assert(st.adminCommand({shardCollection: sourceColl.getFullName(), key: {_id: "hashed"}})); + assert.commandWorked(sourceColl.insert({_id: 0, a: 0})); + + const outColl = st.getDB(testName).out; + assert(outColl.drop()); + assert(st.adminCommand({shardCollection: outColl.getFullName(), key: {_id: "hashed"}})); + + const fromColl = st.getDB(testName).from; + assert(fromColl.drop()); + assert.commandWorked(fromColl.insert({_id: 0, b: 0})); + + sourceColl.aggregate([ + {$lookup: {localField: "a", foreignField: "b", from: fromColl.getName(), as: "same"}}, + {$out: {to: outColl.getName(), mode: "insertDocuments"}} + ]); + + assert.eq([{a: 0, same: [{_id: 0, b: 0}]}], outColl.find({}, {_id: 0}).toArray()); + + st.stop(); +}()); diff --git a/jstests/sharding/lookup_mongod_unaware.js b/jstests/sharding/lookup_mongod_unaware.js new file mode 100644 index 00000000000..0c6072f8095 --- /dev/null +++ b/jstests/sharding/lookup_mongod_unaware.js @@ -0,0 +1,168 @@ +// Tests the behavior of a $lookup when a shard contains incorrect routing information for the +// local and/or foreign collections. This includes when the shard thinks the collection is sharded +// when it's not, and likewise when it thinks the collection is unsharded but is actually sharded. +// +// We restart a mongod to cause it to forget that a collection was sharded. When restarted, we +// expect it to still have all the previous data. 
+// @tags: [requires_persistence] +(function() { + "use strict"; + + // Restarts the primary shard and ensures that it believes both collections are unsharded. + function restartPrimaryShard(rs, localColl, foreignColl) { + // Returns true if the shard is aware that the collection is sharded. + function hasRoutingInfoForNs(shardConn, coll) { + const res = shardConn.adminCommand({getShardVersion: coll, fullMetadata: true}); + assert.commandWorked(res); + return res.metadata.collVersion != undefined; + } + + rs.restart(0); + rs.awaitSecondaryNodes(); + assert(!hasRoutingInfoForNs(rs.getPrimary(), localColl.getFullName())); + assert(!hasRoutingInfoForNs(rs.getPrimary(), foreignColl.getFullName())); + } + + const testName = "lookup_stale_mongod"; + const st = new ShardingTest({ + shards: 2, + mongos: 2, + rs: {nodes: 1}, + }); + + const mongos0DB = st.s0.getDB(testName); + const mongos0LocalColl = mongos0DB[testName + "_local"]; + const mongos0ForeignColl = mongos0DB[testName + "_foreign"]; + + const mongos1DB = st.s1.getDB(testName); + const mongos1LocalColl = mongos1DB[testName + "_local"]; + const mongos1ForeignColl = mongos1DB[testName + "_foreign"]; + + const pipeline = [ + { + $lookup: + {localField: "a", foreignField: "b", from: mongos0ForeignColl.getName(), as: "same"} + }, + {$sort: {_id: 1}} + ]; + + // The results are expected to be correct if the $lookup stage is executed on the mongos which + // is aware that the collection is sharded. + const expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + + // Ensure that shard0 is the primary shard. + assert.commandWorked(mongos0DB.adminCommand({enableSharding: mongos0DB.getName()})); + st.ensurePrimaryShard(mongos0DB.getName(), st.shard0.shardName); + + assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1})); + assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null})); + + assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1})); + assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null})); + + // Send writes through mongos1 such that it's aware of the collections and believes they are + // unsharded. + assert.writeOK(mongos1LocalColl.insert({_id: 2})); + assert.writeOK(mongos1ForeignColl.insert({_id: 2})); + + // + // Test unsharded local and sharded foreign collections, with the primary shard unaware that + // the foreign collection is sharded. + // + + // Shard the foreign collection. + assert.commandWorked( + mongos0DB.adminCommand({shardCollection: mongos0ForeignColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey). + assert.commandWorked( + mongos0DB.adminCommand({split: mongos0ForeignColl.getFullName(), middle: {_id: 1}})); + + // Move the [minKey, 1) chunk to shard1. + assert.commandWorked(mongos0DB.adminCommand({ + moveChunk: mongos0ForeignColl.getFullName(), + find: {_id: 0}, + to: st.shard1.shardName, + _waitForDelete: true + })); + + // Verify $lookup results through the fresh mongos. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // Verify $lookup results through mongos1, which is not aware that the local + // collection is sharded. The results are expected to be incorrect when both the mongos and + // primary shard incorrectly believe that a collection is unsharded. 
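+    // The $lookup still runs on the primary shard, which wrongly believes the foreign collection
+    // is unsharded and reads only its own copy of the data, so foreign documents whose chunk was
+    // moved to the other shard are missed ({_id: 0} matches nothing below).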
+ // TODO: This should be fixed by SERVER-32629, likewise for the other aggregates in this file + // sent to the stale mongos. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [ + {_id: 0, a: 1, "same": []}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]); + + // + // Test sharded local and sharded foreign collections, with the primary shard unaware that + // either collection is sharded. + // + + // Shard the local collection. + assert.commandWorked( + mongos0DB.adminCommand({shardCollection: mongos0LocalColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey). + assert.commandWorked( + mongos0DB.adminCommand({split: mongos0LocalColl.getFullName(), middle: {_id: 1}})); + + // Move the [minKey, 1) chunk to shard1. + assert.commandWorked(mongos0DB.adminCommand({ + moveChunk: mongos0LocalColl.getFullName(), + find: {_id: 0}, + to: st.shard1.shardName, + _waitForDelete: true + })); + + // Verify $lookup results through the fresh mongos. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // Verify $lookup results through mongos1, which is not aware that the local + // collection is sharded. The results are expected to be incorrect when both the mongos and + // primary shard incorrectly believe that a collection is unsharded. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [ + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]); + + // + // Test sharded local and unsharded foreign collections, with the primary shard unaware that + // the local collection is sharded. + // + + // Recreate the foreign collection as unsharded. + mongos0ForeignColl.drop(); + assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1})); + assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null})); + assert.writeOK(mongos0ForeignColl.insert({_id: 2})); + + // Verify $lookup results through the fresh mongos. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // Verify $lookup results through mongos1, which is not aware that the local + // collection is sharded. The results are expected to be incorrect when both the mongos and + // primary shard incorrectly believe that a collection is unsharded. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [ + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]); + + st.stop(); +})(); diff --git a/jstests/sharding/lookup_stale_mongos.js b/jstests/sharding/lookup_stale_mongos.js new file mode 100644 index 00000000000..0dc17958e26 --- /dev/null +++ b/jstests/sharding/lookup_stale_mongos.js @@ -0,0 +1,130 @@ +// Tests the behavior of a $lookup when the mongos contains stale routing information for the +// local and/or foreign collections. This includes when mongos thinks the collection is sharded +// when it's not, and likewise when mongos thinks the collection is unsharded but is actually +// sharded. 
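+//
+// In each case the shards should return a stale shard version error to the stale mongos, which
+// then refreshes its routing table and retries, so the results should always be correct.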
+(function() { + "use strict"; + + const testName = "lookup_stale_mongos"; + const st = new ShardingTest({ + shards: 2, + mongos: 2, + }); + + const mongos0DB = st.s0.getDB(testName); + assert.commandWorked(mongos0DB.dropDatabase()); + const mongos0LocalColl = mongos0DB[testName + "_local"]; + const mongos0ForeignColl = mongos0DB[testName + "_foreign"]; + + const mongos1DB = st.s1.getDB(testName); + const mongos1LocalColl = mongos1DB[testName + "_local"]; + const mongos1ForeignColl = mongos1DB[testName + "_foreign"]; + + const pipeline = [ + { + $lookup: + {localField: "a", foreignField: "b", from: mongos1ForeignColl.getName(), as: "same"} + }, + {$sort: {_id: 1}} + ]; + const expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + + // Ensure that shard0 is the primary shard. + assert.commandWorked(mongos0DB.adminCommand({enableSharding: mongos0DB.getName()})); + st.ensurePrimaryShard(mongos0DB.getName(), st.shard0.shardName); + + assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1})); + assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null})); + + assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1})); + assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null})); + + // Send writes through mongos1 such that it's aware of the collections and believes they are + // unsharded. + assert.writeOK(mongos1LocalColl.insert({_id: 2})); + assert.writeOK(mongos1ForeignColl.insert({_id: 2})); + + // + // Test unsharded local and sharded foreign collections, with mongos unaware that the foreign + // collection is sharded. + // + + // Shard the foreign collection through mongos0. + assert.commandWorked( + mongos0DB.adminCommand({shardCollection: mongos0ForeignColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey). + assert.commandWorked( + mongos0DB.adminCommand({split: mongos0ForeignColl.getFullName(), middle: {_id: 1}})); + + // Move the [minKey, 1) chunk to shard1. + assert.commandWorked(mongos0DB.adminCommand({ + moveChunk: mongos0ForeignColl.getFullName(), + find: {_id: 0}, + to: st.shard1.shardName, + _waitForDelete: true + })); + + // Issue a $lookup through mongos1, which is unaware that the foreign collection is sharded. + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // + // Test sharded local and sharded foreign collections, with mongos unaware that the local + // collection is sharded. + // + + // Shard the local collection through mongos0. + assert.commandWorked( + mongos0DB.adminCommand({shardCollection: mongos0LocalColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey). + assert.commandWorked( + mongos0DB.adminCommand({split: mongos0LocalColl.getFullName(), middle: {_id: 1}})); + + // Move the [minKey, 1) chunk to shard1. + assert.commandWorked(mongos0DB.adminCommand({ + moveChunk: mongos0LocalColl.getFullName(), + find: {_id: 0}, + to: st.shard1.shardName, + _waitForDelete: true + })); + + // Issue a $lookup through mongos1, which is unaware that the local collection is sharded. + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // + // Test sharded local and unsharded foreign collections, with mongos unaware that the foreign + // collection is unsharded. + // + + // Recreate the foreign collection as unsharded through mongos0. 
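+    // mongos1 is not told about the drop, so its routing table still says the foreign collection
+    // is sharded.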
+ mongos0ForeignColl.drop(); + assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1})); + assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null})); + assert.writeOK(mongos0ForeignColl.insert({_id: 2})); + + // Issue a $lookup through mongos1, which is unaware that the foreign collection is now + // unsharded. + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // + // Test unsharded local and foreign collections, with mongos unaware that the local + // collection is unsharded. + // + + // Recreate the local collection as unsharded through mongos0. + mongos0LocalColl.drop(); + assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1})); + assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null})); + assert.writeOK(mongos0LocalColl.insert({_id: 2})); + + // Issue a $lookup through mongos1, which is unaware that the local collection is now + // unsharded. + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); + + st.stop(); +})(); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 2f1c2e27cca..71270f0e72f 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -320,7 +320,10 @@ env.Library( 'mongos_process_interface.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/s/query/async_results_merger', + '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner', + '$BUILD_DIR/mongo/s/query/cluster_query', 'mongo_process_common', ] ) diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp index 17622131061..737c316c9bf 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -246,8 +246,7 @@ DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() { // with the resume token. 
auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace); auto matchSpec = BSON("$match" << BSONObj()); - auto pipeline = uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx)); + auto pipeline = pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx); if (auto first = pipeline->getNext()) { auto firstOplogEntry = Value(*first); uassert(40576, diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index a7b0567b66b..a94179c1431 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -458,34 +458,31 @@ public: MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { return false; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index d93535dc030..1b6583bb655 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -207,8 +207,8 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { // We've already allocated space for the trailing $match stage in '_fromPipeline'. 
_fromPipeline.back() = *matchStage; - auto pipeline = uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx)); + auto pipeline = + pExpCtx->mongoProcessInterface->makePipeline(_fromPipeline, _fromExpCtx); while (auto next = pipeline->getNext()) { uassert(40271, str::stream() diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 6309f89597a..47f37939f73 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -63,30 +63,27 @@ public: MockMongoInterface(std::deque<DocumentSource::GetNextResult> results) : _results(std::move(results)) {} - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_results)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 04094a1d45b..7223f854521 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -196,9 +196,16 @@ StageConstraints DocumentSourceLookUp::constraints(Pipeline::SplitState) const { txnRequirement = resolvedRequirements.second; } + // If executing on mongos and the foreign collection is sharded, then this stage can run on + // mongos. + HostTypeRequirement hostRequirement = + (pExpCtx->inMongos && pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs)) + ? HostTypeRequirement::kMongoS + : HostTypeRequirement::kPrimaryShard; + StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kNone, - HostTypeRequirement::kPrimaryShard, + hostRequirement, diskRequirement, FacetRequirement::kAllowed, txnRequirement); @@ -289,8 +296,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( // If we don't have a cache, build and return the pipeline immediately. if (!_cache || _cache->isAbandoned()) { - return uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx)); + return pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx); } // Tailor the pipeline construction for our needs. 
We want a non-optimized pipeline without a @@ -300,8 +306,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( pipelineOpts.attachCursorSource = false; // Construct the basic pipeline without a cache stage. - auto pipeline = uassertStatusOK( - pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts)); + auto pipeline = + pExpCtx->mongoProcessInterface->makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); // Add the cache stage at the end and optimize. During the optimization process, the cache will // either move itself to the correct position in the pipeline, or will abandon itself if no @@ -313,8 +319,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. - uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( - _fromExpCtx, pipeline.get())); + pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(_fromExpCtx, + pipeline.release()); } // If the cache has been abandoned, release it. diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index d17b1a73c9c..2a2b3338789 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -81,13 +81,6 @@ public: return requiredPrivileges; } - /** - * Lookup from a sharded collection is not allowed. - */ - bool allowShardedForeignCollection(NamespaceString nss) const final { - return (_foreignNssSet.find(nss) == _foreignNssSet.end()); - } - private: const NamespaceString _fromNss; const stdx::unordered_set<NamespaceString> _foreignNssSet; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 148197d4d5b..9c3dbf5777b 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -85,34 +85,31 @@ public: MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { return false; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { 
pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } boost::optional<Document> lookupSingleDocument( @@ -125,11 +122,12 @@ public: // case of a change stream on a whole database so we need to make a copy of the // ExpressionContext with the new namespace. auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none); - auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); - if (swPipeline == ErrorCodes::NamespaceNotFound) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline; + try { + pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); + } catch (ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { return boost::none; } - auto pipeline = uassertStatusOK(std::move(swPipeline)); auto lookedUpDocument = pipeline->getNext(); if (auto next = pipeline->getNext()) { diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index 4f461d6705c..bacfc5c6cda 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -544,28 +544,25 @@ public: return false; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) final { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { if (pipeline->popFrontWithName("$match") || pipeline->popFrontWithName("$sort") || pipeline->popFrontWithName("$project")) { @@ -575,7 +572,7 @@ public: } pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 5f092107c1a..be934e88707 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -184,20 +184,24 @@ public: * - If opts.attachCursorSource is false, the pipeline will be returned without attempting to * add an initial cursor source. * - * This function returns a non-OK status if parsing the pipeline failed. + * This function throws if parsing the pipeline failed. 
*/ - virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + virtual std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) = 0; /** - * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This - * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be - * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be + * Accepts a pipeline and returns a new one which will draw input from the underlying + * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be + * thrown if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be * the only case where NamespaceNotFound is returned. + * + * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr. + * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the + * compiler expects to find an implementation of PipelineDeleter. */ - virtual Status attachCursorSourceToPipeline( + virtual std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; /** diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 85f141c9277..6336910029c 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -28,10 +28,13 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + #include "mongo/platform/basic.h" #include "mongo/db/pipeline/mongos_process_interface.h" +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/curop.h" #include "mongo/db/index/index_descriptor.h" @@ -45,17 +48,109 @@ #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/query/cluster_query_knobs.h" +#include "mongo/s/query/document_source_merge_cursors.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/router_exec_stage.h" +#include "mongo/s/transaction_router.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/log.h" namespace mongo { +MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); + using boost::intrusive_ptr; using std::shared_ptr; using std::string; using std::unique_ptr; namespace { +// Given a document representing an aggregation command such as +// +// {aggregate: "myCollection", pipeline: [], ...}, +// +// produces the corresponding explain command: +// +// {explain: {aggregate: "myCollection", pipeline: [], ...}, $queryOptions: {...}, verbosity: ...} Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { + MutableDocument explainCommandBuilder; + explainCommandBuilder["explain"] = Value(aggregateCommand); + // Downstream host targeting code expects queryOptions at the top level of the command object. + explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = + Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); + + // readConcern needs to be promoted to the top-level of the request.
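
As an aside, the header change above replaces StatusWith/Status plumbing with exceptions throughout MongoProcessInterface. A minimal standalone sketch of the two calling conventions, using plain C++ stand-ins (std::optional models StatusWith; all names here are invented, not MongoDB API):

    #include <memory>
    #include <optional>
    #include <stdexcept>

    struct SketchPipeline {};  // stand-in for mongo::Pipeline

    // Before: errors travel in the return value and every caller unwraps it.
    std::optional<std::unique_ptr<SketchPipeline>> makePipelineOld(bool parseOk) {
        if (!parseOk)
            return std::nullopt;  // models a non-OK Status
        return std::make_unique<SketchPipeline>();
    }

    // After: errors are thrown, so the happy path needs no unwrapping and a
    // failure propagates to whatever layer can actually handle it.
    std::unique_ptr<SketchPipeline> makePipelineNew(bool parseOk) {
        if (!parseOk)
            throw std::runtime_error("failed to parse pipeline");
        return std::make_unique<SketchPipeline>();
    }
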
+ explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = + Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); + + // Add explain command options. + for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { + explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); + } + + return explainCommandBuilder.freeze(); +} + +std::vector<RemoteCursor> establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj& cmdObj, + const AggregationRequest& request, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery) { + LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; + + const bool mustRunOnAll = MongoSInterface::mustRunOnAllShards(nss, litePipe); + std::set<ShardId> shardIds = MongoSInterface::getTargetedShards( + opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation()); + std::vector<std::pair<ShardId, BSONObj>> requests; + + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo || mustRunOnAll); + + if (mustRunOnAll) { + // The pipeline contains a stage which must be run on all shards. Skip versioning and + // enqueue the raw command objects. + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + } else if (routingInfo->cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation, and build versioned requests for them. + for (auto& shardId : shardIds) { + auto versionedCmdObj = + appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); + requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); + } + } else { + // The collection is unsharded. Target only the primary shard for the database. + // Don't append shard version info when contacting the config servers. + requests.emplace_back(routingInfo->db().primaryId(), + !routingInfo->db().primary()->isConfig() + ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + : cmdObj); + } + + if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " + "until fail point is disabled."; + while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + sleepsecs(1); + } + } + + return establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + readPref, + requests, + false /* do not allow partial results */, + MongoSInterface::getDesiredRetryPolicy(request)); +} + /** * Determines the single shard to which the given query will be targeted, and its associated * shardVersion. Throws if the query targets more than one shard. @@ -115,6 +210,379 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace +Shard::RetryPolicy MongoSInterface::getDesiredRetryPolicy(const AggregationRequest& req) { + // The idempotent retry policy will retry even for writeConcern failures, so only set it if the + // pipeline does not support writeConcern. 
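
As an aside, the three targeting cases in establishShardCursors() above reduce to a small standalone sketch (plain C++; 'isShardedRoutingTable' stands in for routingInfo->cm(), the config-server exception is omitted, and all names are invented for illustration):

    #include <set>
    #include <string>
    #include <vector>

    struct SketchRequest {
        std::string shardId;
        bool versioned;  // whether a shardVersion is attached to the command
    };

    std::vector<SketchRequest> buildRequests(bool mustRunOnAll,
                                             bool isShardedRoutingTable,
                                             const std::set<std::string>& shardIds,
                                             const std::string& primaryShardId) {
        std::vector<SketchRequest> requests;
        if (mustRunOnAll) {
            // Broadcast the raw command to every shard, skipping versioning.
            for (const auto& id : shardIds)
                requests.push_back({id, false});
        } else if (isShardedRoutingTable) {
            // Sharded collection: one versioned request per targeted shard.
            for (const auto& id : shardIds)
                requests.push_back({id, true});
        } else {
            // Unsharded collection: target only the database's primary shard.
            requests.push_back({primaryShardId, true});
        }
        return requests;
    }
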
+ if (req.getWriteConcern()) { + return Shard::RetryPolicy::kNotIdempotent; + } + return Shard::RetryPolicy::kIdempotent; +} + +BSONObj MongoSInterface::createPassthroughCommandForShard(OperationContext* opCtx, + const AggregationRequest& request, + const boost::optional<ShardId>& shardId, + Pipeline* pipeline, + BSONObj collationObj) { + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + if (pipeline) { + targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); + } + + return MongoSInterface::genericTransformForShards( + std::move(targetedCmd), opCtx, shardId, request, collationObj); +} + +BSONObj MongoSInterface::genericTransformForShards(MutableDocument&& cmdForShards, + OperationContext* opCtx, + const boost::optional<ShardId>& shardId, + const AggregationRequest& request, + BSONObj collationObj) { + cmdForShards[AggregationRequest::kFromMongosName] = Value(true); + // If this is a request for an aggregation explain, then we must wrap the aggregate inside an + // explain command. + if (auto explainVerbosity = request.getExplain()) { + cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity)); + } + + if (!collationObj.isEmpty()) { + cmdForShards[AggregationRequest::kCollationName] = Value(collationObj); + } + + if (opCtx->getTxnNumber()) { + invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(), + str::stream() << "Command for shards unexpectedly had the " + << OperationSessionInfo::kTxnNumberFieldName + << " field set: " + << cmdForShards.peek().toString()); + cmdForShards[OperationSessionInfo::kTxnNumberFieldName] = + Value(static_cast<long long>(*opCtx->getTxnNumber())); + } + + auto aggCmd = cmdForShards.freeze().toBson(); + + if (shardId) { + if (auto txnRouter = TransactionRouter::get(opCtx)) { + aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd); + } + } + + // agg creates temp collection and should handle implicit create separately. + return appendAllowImplicitCreate(aggCmd, true); +} + +BSONObj MongoSInterface::createCommandForTargetedShards( + OperationContext* opCtx, + const AggregationRequest& request, + const cluster_aggregation_planner::SplitPipeline& splitPipeline, + const BSONObj collationObj, + const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, + bool needsMerge) { + + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it + // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may + // have detected a logged in user and appended that user name to the $listSessions spec to + // send to the shards. + targetedCmd[AggregationRequest::kPipelineName] = + Value(splitPipeline.shardsPipeline->serialize()); + + // When running on many shards with the exchange we may not need merging. + if (needsMerge) { + targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); + + // For split pipelines which need merging, do *not* propagate the writeConcern to the shards + // part. Otherwise this is part of an exchange and in that case we should include the + // writeConcern. + targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); + } + + targetedCmd[AggregationRequest::kCursorName] = + Value(DOC(AggregationRequest::kBatchSizeName << 0)); + + targetedCmd[AggregationRequest::kExchangeName] = + exchangeSpec ? 
Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); + + return genericTransformForShards( + std::move(targetedCmd), opCtx, boost::none, request, collationObj); +} + +std::set<ShardId> MongoSInterface::getTargetedShards( + OperationContext* opCtx, + bool mustRunOnAllShards, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj shardQuery, + const BSONObj collation) { + if (mustRunOnAllShards) { + // The pipeline begins with a stage which must be run on all shards. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); + return {shardIds.begin(), shardIds.end()}; + } + + // If we don't need to run on all shards, then we should always have a valid routing table. + invariant(routingInfo); + + return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); +} + +bool MongoSInterface::mustRunOnAllShards(const NamespaceString& nss, + const LiteParsedPipeline& litePipe) { + // The following aggregations must be routed to all shards: + // - Any collectionless aggregation, such as non-localOps $currentOp. + // - Any aggregation which begins with a $changeStream stage. + return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream(); +} + +StatusWith<CachedCollectionRoutingInfo> MongoSInterface::getExecutionNsRoutingInfo( + OperationContext* opCtx, const NamespaceString& execNss) { + // First, verify that there are shards present in the cluster. If not, then we return the + // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because + // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on + // a collection before its enclosing database is created. However, if there are no shards + // present, then $changeStream should immediately return an empty cursor just as other + // aggregations do when the database does not exist. + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); + if (shardIds.size() == 0) { + return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; + } + + // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not + // exist. + return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); +} + +/** + * Targets shards for the pipeline and returns a struct with the remote cursors or results, and + * the pipeline that will need to be executed to merge the results from the remotes. If a stale + * shard version is encountered, refreshes the routing table and tries again. + */ +MongoSInterface::DispatchShardPipelineResults MongoSInterface::dispatchShardPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& executionNss, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + BSONObj collationObj) { + // The process is as follows: + // - First, determine whether we need to target more than one shard. If so, we split the + // pipeline; if not, we retain the existing pipeline. + // - Call establishShardCursors to dispatch the aggregation to the targeted shards. + // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the + // entire aggregation command.
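
The last bullet above relies on a retry contract at the top level: a stale shard version aborts the dispatch, routing information is refreshed, and the whole aggregate runs again. A hedged sketch of that loop (invented names and a plain exception type, not the actual handler):

    #include <stdexcept>

    struct StaleShardVersion : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    template <typename DispatchFn>
    void runWithStaleVersionRetry(DispatchFn dispatch, int maxAttempts = 10) {
        for (int attempt = 0;; ++attempt) {
            try {
                dispatch();  // e.g. the dispatchShardPipeline() call path
                return;
            } catch (const StaleShardVersion&) {
                if (attempt + 1 >= maxAttempts)
                    throw;
                // Refresh the routing table here, then retry the entire
                // aggregation command from the top.
            }
        }
    }
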
+ auto cursors = std::vector<RemoteCursor>(); + auto shardResults = std::vector<AsyncRequestsSender::Response>(); + auto opCtx = expCtx->opCtx; + + const bool needsPrimaryShardMerge = + (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); + + const bool needsMongosMerge = pipeline->needsMongosMerger(); + + const auto shardQuery = pipeline->getInitialQuery(); + + auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); + + // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. + // Otherwise, uassert on all exceptions here. + if (!(liteParsedPipeline.hasChangeStream() && + executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { + uassertStatusOK(executionNsRoutingInfoStatus); + } + + auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() + ? std::move(executionNsRoutingInfoStatus.getValue()) + : boost::optional<CachedCollectionRoutingInfo>{}; + + // Determine whether we can run the entire aggregation on a single shard. + const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline); + std::set<ShardId> shardIds = getTargetedShards( + opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); + + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter->computeAndSetAtClusterTime( + opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation()); + } + + // Don't need to split the pipeline if we are only targeting a single shard, unless: + // - There is a stage that needs to be run on the primary shard and the single target shard + // is not the primary. + // - The pipeline contains one or more stages which must always merge on mongoS. + const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || + (needsPrimaryShardMerge && executionNsRoutingInfo && + *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); + + boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; + boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; + + if (needsSplit) { + splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); + + exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( + opCtx, splitPipeline->mergePipeline.get()); + } + + // Generate the command object for the targeted shards. + BSONObj targetedCommand = splitPipeline + ? createCommandForTargetedShards( + opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true) + : createPassthroughCommandForShard( + opCtx, aggRequest, boost::none, pipeline.get(), collationObj); + + // Refresh the shard registry if we're targeting all shards. We need the shard registry + // to be at least as current as the logical time used when creating the command for + // $changeStream to work reliably, so we do a "hard" reload. + if (mustRunOnAll) { + auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); + if (!shardRegistry->reload(opCtx)) { + shardRegistry->reload(opCtx); + } + } + + // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. + if (expCtx->explain) { + if (mustRunOnAll) { + // Some stages (such as $currentOp) need to be broadcast to all shards, and + // should not participate in the shard version protocol. 
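
The needsSplit computation above condenses to a single predicate. A standalone sketch (parameters mirror the locals; the routing-info presence check is folded into 'singleTargetIsPrimary', which stands in for the comparison against routingInfo->db().primaryId()):

    #include <cstddef>

    bool pipelineNeedsSplit(std::size_t numTargetedShards,
                            bool needsMongosMerge,
                            bool needsPrimaryShardMerge,
                            bool singleTargetIsPrimary) {
        return numTargetedShards > 1u || needsMongosMerge ||
            (needsPrimaryShardMerge && !singleTargetIsPrimary);
    }

    // For example, one targeted shard that happens to be the primary keeps the
    // pipeline whole, while a primary-shard-merging stage targeting any other
    // single shard forces a split.
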
+ shardResults = + scatterGatherUnversionedTargetAllShards(opCtx, + executionNss.db(), + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + } else { + // Aggregations on a real namespace should use the routing table to target + // shards, and should participate in the shard version protocol. + invariant(executionNsRoutingInfo); + shardResults = + scatterGatherVersionedTargetByRoutingTable(opCtx, + executionNss.db(), + executionNss, + *executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + shardQuery, + aggRequest.getCollation()); + } + } else { + cursors = establishShardCursors(opCtx, + executionNss, + liteParsedPipeline, + executionNsRoutingInfo, + targetedCommand, + aggRequest, + ReadPreferenceSetting::get(opCtx), + shardQuery); + invariant(cursors.size() % shardIds.size() == 0, + str::stream() << "Number of cursors (" << cursors.size() + << ") is not a multiple of producers (" + << shardIds.size() + << ")"); + } + + // Convert remote cursors into a vector of "owned" cursors. + std::vector<OwnedRemoteCursor> ownedCursors; + for (auto&& cursor : cursors) { + ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); + } + + // Record the number of shards involved in the aggregation. If we are required to merge on + // the primary shard, but the primary shard was not in the set of targeted shards, then we + // must increment the number of involved shards. + CurOp::get(opCtx)->debug().nShards = + shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && + !shardIds.count(executionNsRoutingInfo->db().primaryId())); + + return DispatchShardPipelineResults{needsPrimaryShardMerge, + std::move(ownedCursors), + std::move(shardResults), + std::move(splitPipeline), + std::move(pipeline), + targetedCommand, + shardIds.size(), + exchangeSpec}; +} + +std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions pipelineOptions) { + // Explain is not supported for auxiliary lookups. + invariant(!expCtx->explain); + + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); + if (pipelineOptions.optimize) { + pipeline->optimizePipeline(); + } + if (pipelineOptions.attachCursorSource) { + // 'attachCursorSourceToPipeline' handles any complexity related to sharding. + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); + } + + return pipeline; +} + + +std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + + // Generate the command object for the targeted shards. 
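
As an aside, the nShards bookkeeping above counts the primary shard as involved even when it was not targeted, provided the merge must happen there. The same rule in isolation (plain C++, invented names):

    #include <cstddef>
    #include <set>
    #include <string>

    std::size_t countInvolvedShards(const std::set<std::string>& targetedShards,
                                    bool needsPrimaryShardMerge,
                                    const std::string& primaryShardId) {
        return targetedShards.size() +
            ((needsPrimaryShardMerge && targetedShards.count(primaryShardId) == 0)
                 ? 1
                 : 0);
    }

    // countInvolvedShards({"shard1", "shard2"}, true, "shard0") == 3
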
+ std::vector<BSONObj> rawStages = [pipeline]() { + auto serialization = pipeline->serialize(); + std::vector<BSONObj> stages; + stages.reserve(serialization.size()); + + for (const auto& stageObj : serialization) { + invariant(stageObj.getType() == BSONType::Object); + stages.push_back(stageObj.getDocument().toBson()); + } + + return stages; + }(); + + AggregationRequest aggRequest(expCtx->ns, rawStages); + LiteParsedPipeline liteParsedPipeline(aggRequest); + auto shardDispatchResults = MongoSInterface::dispatchShardPipeline( + expCtx, + expCtx->ns, + aggRequest, + liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)), + expCtx->collation); + + std::vector<ShardId> targetedShards; + targetedShards.reserve(shardDispatchResults.remoteCursors.size()); + for (auto&& remoteCursor : shardDispatchResults.remoteCursors) { + targetedShards.emplace_back(remoteCursor->getShardId().toString()); + } + + std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; + boost::optional<BSONObj> shardCursorsSortSpec = boost::none; + if (shardDispatchResults.splitPipeline) { + mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline); + shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec; + } else { + mergePipeline = std::move(shardDispatchResults.pipelineForSingleShard); + } + + cluster_aggregation_planner::addMergeCursorsSource( + mergePipeline.get(), + liteParsedPipeline, + shardDispatchResults.commandForTargetedShards, + std::move(shardDispatchResults.remoteCursors), + targetedShards, + shardCursorsSortSpec, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor()); + + return mergePipeline; +} + boost::optional<Document> MongoSInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index f2806baf001..6111f3346d8 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -32,6 +32,10 @@ #include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/s/async_requests_sender.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/query/cluster_aggregation_planner.h" +#include "mongo/s/query/owned_remote_cursor.h" namespace mongo { @@ -41,6 +45,81 @@ namespace mongo { */ class MongoSInterface final : public MongoProcessCommon { public: + struct DispatchShardPipelineResults { + // True if this pipeline was split, and the second half of the pipeline needs to be run on + // the primary shard for the database. + bool needsPrimaryShardMerge; + + // Populated if this *is not* an explain, this vector represents the cursors on the remote + // shards. + std::vector<OwnedRemoteCursor> remoteCursors; + + // Populated if this *is* an explain, this vector represents the results from each shard. + std::vector<AsyncRequestsSender::Response> remoteExplainOutput; + + // The split version of the pipeline if more than one shard was targeted, otherwise + // boost::none. + boost::optional<cluster_aggregation_planner::SplitPipeline> splitPipeline; + + // If the pipeline targeted a single shard, this is the pipeline to run on that shard. + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard; + + // The command object to send to the targeted shards. 
+ BSONObj commandForTargetedShards; + + // How many exchange producers are running the shard part of splitPipeline. + size_t numProducers; + + // The exchange specification if the query can run with the exchange otherwise boost::none. + boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; + }; + + static Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req); + + static BSONObj createPassthroughCommandForShard(OperationContext* opCtx, + const AggregationRequest& request, + const boost::optional<ShardId>& shardId, + Pipeline* pipeline, + BSONObj collationObj); + + /** + * Appends information to the command sent to the shards which should be appended both if this + * is a passthrough sent to a single shard and if this is a split pipeline. + */ + static BSONObj genericTransformForShards(MutableDocument&& cmdForShards, + OperationContext* opCtx, + const boost::optional<ShardId>& shardId, + const AggregationRequest& request, + BSONObj collationObj); + + static BSONObj createCommandForTargetedShards( + OperationContext* opCtx, + const AggregationRequest& request, + const cluster_aggregation_planner::SplitPipeline& splitPipeline, + const BSONObj collationObj, + const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, + bool needsMerge); + + static std::set<ShardId> getTargetedShards( + OperationContext* opCtx, + bool mustRunOnAllShards, + const boost::optional<CachedCollectionRoutingInfo>& routingInfo, + const BSONObj shardQuery, + const BSONObj collation); + + static bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe); + + static StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo( + OperationContext* opCtx, const NamespaceString& execNss); + + static DispatchShardPipelineResults dispatchShardPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& executionNss, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + BSONObj collationObj); + MongoSInterface() = default; virtual ~MongoSInterface() = default; @@ -119,10 +198,8 @@ public: MONGO_UNREACHABLE; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { - MONGO_UNREACHABLE; - } + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final { MONGO_UNREACHABLE; @@ -133,12 +210,10 @@ public: MONGO_UNREACHABLE; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) final { - MONGO_UNREACHABLE; - } + const MakePipelineOptions pipelineOptions) final; /** * The following methods only make sense for data-bearing nodes and should never be called on diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index bf0987ea4ab..1eb3af3aa97 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -274,45 +274,35 @@ void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( _client.runCommand("admin", renameCommandObj, 
info)); } -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoInterfaceStandalone::makePipeline( +std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); + pipeline->optimizePipeline(); } - Status cursorStatus = Status::OK(); - if (opts.attachCursorSource) { - cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release()); } - return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; + return pipeline; } -Status MongoInterfaceStandalone::attachCursorSourceToPipeline( +unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { invariant(pipeline->getSources().empty() || !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); boost::optional<AutoGetCollectionForReadCommand> autoColl; if (expCtx->uuid) { - try { - autoColl.emplace(expCtx->opCtx, - NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}, - AutoGetCollection::ViewMode::kViewsForbidden, - Date_t::max(), - AutoStatsTracker::LogMode::kUpdateTop); - } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { - // The UUID doesn't exist anymore - return ex.toStatus(); - } + autoColl.emplace(expCtx->opCtx, + NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}, + AutoGetCollection::ViewMode::kViewsForbidden, + Date_t::max(), + AutoStatsTracker::LogMode::kUpdateTop); } else { autoColl.emplace(expCtx->opCtx, expCtx->ns, @@ -337,7 +327,7 @@ Status MongoInterfaceStandalone::attachCursorSourceToPipeline( // the initial cursor stage. 
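
With the Status return gone, the standalone implementation above lets NamespaceNotFound escape as an exception, and callers that can tolerate a concurrently dropped collection catch it, as lookupSingleDocument() does below. A sketch of that pattern, with hypothetical stand-ins for the mongo types:

    #include <optional>
    #include <stdexcept>
    #include <string>

    struct NamespaceNotFoundEx : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    // Models acquiring a collection by UUID; throws if the collection was
    // dropped in the meantime (the AutoGetCollectionForReadCommand case above).
    std::string acquireCollectionByUuid(bool stillExists) {
        if (!stillExists)
            throw NamespaceNotFoundEx("collection dropped");
        return "collection";
    }

    std::optional<std::string> tryAcquire(bool stillExists) {
        try {
            return acquireCollectionByUuid(stillExists);
        } catch (const NamespaceNotFoundEx&) {
            return std::nullopt;  // e.g. boost::none from lookupSingleDocument
        }
    }
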
pipeline->optimizePipeline(); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const { @@ -381,7 +371,7 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument( nss, collectionUUID, _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); - pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx)); + pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return boost::none; } diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index 12627655cd5..f1c7fbcc910 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -87,12 +87,12 @@ public: const NamespaceString& targetNs, const BSONObj& originalCollectionOptions, const std::list<BSONObj>& originalIndexes) final; - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final; - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final; + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final; std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection( OperationContext* opCtx, const NamespaceString&, UUID) const override; diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 08b9e073f35..b9d7befb857 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -116,15 +116,15 @@ public: MONGO_UNREACHABLE; } - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) override { MONGO_UNREACHABLE; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) override { + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override { MONGO_UNREACHABLE; } diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 015ea1c66d1..d06c2e91373 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -26,10 +26,10 @@ env.Library( target='cluster_aggregate', source=[ 'cluster_aggregate.cpp', - 'cluster_aggregation_planner.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/s/query/cluster_aggregation_planner', '$BUILD_DIR/mongo/s/query/cluster_client_cursor', '$BUILD_DIR/mongo/db/pipeline/mongos_process_interface', '$BUILD_DIR/mongo/db/views/views', @@ -37,6 +37,16 @@ env.Library( ] ) +env.Library( + target='cluster_aggregation_planner', + source=[ + 'cluster_aggregation_planner.cpp', + ], + LIBDEPS=[ + 
'cluster_query', + ] +) + env.CppUnitTest( target="cluster_aggregate_test", source=[ diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 9fc978b46ee..506842da29c 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -82,7 +82,6 @@ namespace mongo { using SplitPipeline = cluster_aggregation_planner::SplitPipeline; -MONGO_FAIL_POINT_DEFINE(clusterAggregateHangBeforeEstablishingShardCursors); MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToEstablishMergingShardCursor); MONGO_FAIL_POINT_DEFINE(clusterAggregateFailToDispatchExchangeConsumerPipeline); @@ -90,41 +89,6 @@ constexpr unsigned ClusterAggregate::kMaxViewRetries; namespace { -Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) { - // The idempotent retry policy will retry even for writeConcern failures, so only set it if the - // pipeline does not support writeConcern. - if (req.getWriteConcern()) { - return Shard::RetryPolicy::kNotIdempotent; - } - return Shard::RetryPolicy::kIdempotent; -} - -// Given a document representing an aggregation command such as -// -// {aggregate: "myCollection", pipeline: [], ...}, -// -// produces the corresponding explain command: -// -// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} -Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { - MutableDocument explainCommandBuilder; - explainCommandBuilder["explain"] = Value(aggregateCommand); - // Downstream host targeting code expects queryOptions at the top level of the command object. - explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = - Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); - - // readConcern needs to be promoted to the top-level of the request. - explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = - Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); - - // Add explain command options. - for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { - explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); - } - - return explainCommandBuilder.freeze(); -} - Status appendCursorResponseToCommandResult(const ShardId& shardId, const BSONObj cursorResponse, BSONObjBuilder* result) { @@ -138,143 +102,6 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId, return getStatusFromCommandResult(result->asTempObj()); } -bool mustRunOnAllShards(const NamespaceString& nss, const LiteParsedPipeline& litePipe) { - // The following aggregations must be routed to all shards: - // - Any collectionless aggregation, such as non-localOps $currentOp. - // - Any aggregation which begins with a $changeStream stage. - return nss.isCollectionlessAggregateNS() || litePipe.hasChangeStream(); -} - -StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, - const NamespaceString& execNss) { - // First, verify that there are shards present in the cluster. If not, then we return the - // stronger 'ShardNotFound' error rather than 'NamespaceNotFound'. We must do this because - // $changeStream aggregations ignore NamespaceNotFound in order to allow streams to be opened on - // a collection before its enclosing database is created. However, if there are no shards - // present, then $changeStream should immediately return an empty cursor just as other - // aggregations do when the database does not exist. 
- std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - if (shardIds.size() == 0) { - return {ErrorCodes::ShardNotFound, "No shards are present in the cluster"}; - } - - // This call to getCollectionRoutingInfoForTxnCmd will return !OK if the database does not - // exist. - return getCollectionRoutingInfoForTxnCmd(opCtx, execNss); -} - -std::set<ShardId> getTargetedShards(OperationContext* opCtx, - bool mustRunOnAllShards, - const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const BSONObj shardQuery, - const BSONObj collation) { - if (mustRunOnAllShards) { - // The pipeline begins with a stage which must be run on all shards. - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx, &shardIds); - return {shardIds.begin(), shardIds.end()}; - } - - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo); - - return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); -} - -/** - * Appends information to the command sent to the shards which should be appended both if this is a - * passthrough sent to a single shard and if this is a split pipeline. - */ -BSONObj genericTransformForShards(MutableDocument&& cmdForShards, - OperationContext* opCtx, - const boost::optional<ShardId>& shardId, - const AggregationRequest& request, - BSONObj collationObj) { - cmdForShards[AggregationRequest::kFromMongosName] = Value(true); - // If this is a request for an aggregation explain, then we must wrap the aggregate inside an - // explain command. - if (auto explainVerbosity = request.getExplain()) { - cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity)); - } - - if (!collationObj.isEmpty()) { - cmdForShards[AggregationRequest::kCollationName] = Value(collationObj); - } - - if (opCtx->getTxnNumber()) { - invariant(cmdForShards.peek()[OperationSessionInfo::kTxnNumberFieldName].missing(), - str::stream() << "Command for shards unexpectedly had the " - << OperationSessionInfo::kTxnNumberFieldName - << " field set: " - << cmdForShards.peek().toString()); - cmdForShards[OperationSessionInfo::kTxnNumberFieldName] = - Value(static_cast<long long>(*opCtx->getTxnNumber())); - } - - auto aggCmd = cmdForShards.freeze().toBson(); - - if (shardId) { - if (auto txnRouter = TransactionRouter::get(opCtx)) { - aggCmd = txnRouter->attachTxnFieldsIfNeeded(*shardId, aggCmd); - } - } - - // agg creates temp collection and should handle implicit create separately. - return appendAllowImplicitCreate(aggCmd, true); -} - -BSONObj createPassthroughCommandForShard(OperationContext* opCtx, - const AggregationRequest& request, - const boost::optional<ShardId>& shardId, - Pipeline* pipeline, - BSONObj collationObj) { - // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); - if (pipeline) { - targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); - } - - return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj); -} - -BSONObj createCommandForTargetedShards( - OperationContext* opCtx, - const AggregationRequest& request, - const SplitPipeline& splitPipeline, - const BSONObj collationObj, - const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec, - bool needsMerge) { - - // Create the command for the shards. 
- MutableDocument targetedCmd(request.serializeToCommandObj()); - // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it - // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may - // have detected a logged in user and appended that user name to the $listSessions spec to - // send to the shards. - targetedCmd[AggregationRequest::kPipelineName] = - Value(splitPipeline.shardsPipeline->serialize()); - - // When running on many shards with the exchange we may not need merging. - if (needsMerge) { - targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); - - // For split pipelines which need merging, do *not* propagate the writeConcern to the shards - // part. Otherwise this is part of an exchange and in that case we should include the - // writeConcern. - targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); - } - - targetedCmd[AggregationRequest::kCursorName] = - Value(DOC(AggregationRequest::kBatchSizeName << 0)); - - targetedCmd[AggregationRequest::kExchangeName] = - exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); - - return genericTransformForShards( - std::move(targetedCmd), opCtx, boost::none, request, collationObj); -} - BSONObj createCommandForMergingShard(const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const ShardId& shardId, @@ -302,252 +129,13 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request, return appendAllowImplicitCreate(aggCmd, true); } -std::vector<RemoteCursor> establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - boost::optional<CachedCollectionRoutingInfo>& routingInfo, - const BSONObj& cmdObj, - const AggregationRequest& request, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery) { - LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; - - const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe); - std::set<ShardId> shardIds = - getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation()); - std::vector<std::pair<ShardId, BSONObj>> requests; - - // If we don't need to run on all shards, then we should always have a valid routing table. - invariant(routingInfo || mustRunOnAll); - - if (mustRunOnAll) { - // The pipeline contains a stage which must be run on all shards. Skip versioning and - // enqueue the raw command objects. - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); - } - } else if (routingInfo->cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation, and build versioned requests for them. - for (auto& shardId : shardIds) { - auto versionedCmdObj = - appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); - requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); - } - } else { - // The collection is unsharded. Target only the primary shard for the database. - // Don't append shard version info when contacting the config servers. - requests.emplace_back(routingInfo->db().primaryId(), - !routingInfo->db().primary()->isConfig() - ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) - : cmdObj); - } - - if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. 
Blocking " - "until fail point is disabled."; - while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - sleepsecs(1); - } - } - - return establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - readPref, - requests, - false /* do not allow partial results */, - getDesiredRetryPolicy(request)); -} - -struct DispatchShardPipelineResults { - // True if this pipeline was split, and the second half of the pipeline needs to be run on the - // primary shard for the database. - bool needsPrimaryShardMerge; - - // Populated if this *is not* an explain, this vector represents the cursors on the remote - // shards. - std::vector<OwnedRemoteCursor> remoteCursors; - - // Populated if this *is* an explain, this vector represents the results from each shard. - std::vector<AsyncRequestsSender::Response> remoteExplainOutput; - - // The split version of the pipeline if more than one shard was targeted, otherwise boost::none. - boost::optional<SplitPipeline> splitPipeline; - - // If the pipeline targeted a single shard, this is the pipeline to run on that shard. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForSingleShard; - - // The command object to send to the targeted shards. - BSONObj commandForTargetedShards; - - // How many exchange producers are running the shard part of splitPipeline. - size_t numProducers; - - // The exchange specification if the query can run with the exchange otherwise boost::none. - boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; -}; - -/** - * Targets shards for the pipeline and returns a struct with the remote cursors or results, and - * the pipeline that will need to be executed to merge the results from the remotes. If a stale - * shard version is encountered, refreshes the routing table and tries again. - */ -DispatchShardPipelineResults dispatchShardPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& executionNss, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline, - BSONObj collationObj) { - // The process is as follows: - // - First, determine whether we need to target more than one shard. If so, we split the - // pipeline; if not, we retain the existing pipeline. - // - Call establishShardCursors to dispatch the aggregation to the targeted shards. - // - Stale shard version errors are thrown up to the top-level handler, causing a retry on the - // entire aggregation commmand. - auto cursors = std::vector<RemoteCursor>(); - auto shardResults = std::vector<AsyncRequestsSender::Response>(); - auto opCtx = expCtx->opCtx; - - const bool needsPrimaryShardMerge = - (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); - - const bool needsMongosMerge = pipeline->needsMongosMerger(); - - const auto shardQuery = pipeline->getInitialQuery(); - - auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); - - // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. - // Otherwise, uassert on all exceptions here. - if (!(liteParsedPipeline.hasChangeStream() && - executionNsRoutingInfoStatus == ErrorCodes::NamespaceNotFound)) { - uassertStatusOK(executionNsRoutingInfoStatus); - } - - auto executionNsRoutingInfo = executionNsRoutingInfoStatus.isOK() - ? 
std::move(executionNsRoutingInfoStatus.getValue()) - : boost::optional<CachedCollectionRoutingInfo>{}; - - // Determine whether we can run the entire aggregation on a single shard. - const bool mustRunOnAll = mustRunOnAllShards(executionNss, liteParsedPipeline); - std::set<ShardId> shardIds = getTargetedShards( - opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation()); - - if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->computeAndSetAtClusterTime( - opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation()); - } - - // Don't need to split the pipeline if we are only targeting a single shard, unless: - // - There is a stage that needs to be run on the primary shard and the single target shard - // is not the primary. - // - The pipeline contains one or more stages which must always merge on mongoS. - const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge || - (needsPrimaryShardMerge && executionNsRoutingInfo && - *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); - - boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; - boost::optional<SplitPipeline> splitPipeline; - - if (needsSplit) { - splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); - - exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( - opCtx, splitPipeline->mergePipeline.get()); - } - - // Generate the command object for the targeted shards. - BSONObj targetedCommand = splitPipeline - ? createCommandForTargetedShards( - opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true) - : createPassthroughCommandForShard( - opCtx, aggRequest, boost::none, pipeline.get(), collationObj); - - // Refresh the shard registry if we're targeting all shards. We need the shard registry - // to be at least as current as the logical time used when creating the command for - // $changeStream to work reliably, so we do a "hard" reload. - if (mustRunOnAll) { - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { - shardRegistry->reload(opCtx); - } - } - - // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. - if (expCtx->explain) { - if (mustRunOnAll) { - // Some stages (such as $currentOp) need to be broadcast to all shards, and - // should not participate in the shard version protocol. - shardResults = - scatterGatherUnversionedTargetAllShards(opCtx, - executionNss.db(), - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - } else { - // Aggregations on a real namespace should use the routing table to target - // shards, and should participate in the shard version protocol. - invariant(executionNsRoutingInfo); - shardResults = - scatterGatherVersionedTargetByRoutingTable(opCtx, - executionNss.db(), - executionNss, - *executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - shardQuery, - aggRequest.getCollation()); - } - } else { - cursors = establishShardCursors(opCtx, - executionNss, - liteParsedPipeline, - executionNsRoutingInfo, - targetedCommand, - aggRequest, - ReadPreferenceSetting::get(opCtx), - shardQuery); - invariant(cursors.size() % shardIds.size() == 0, - str::stream() << "Number of cursors (" << cursors.size() - << ") is not a multiple of producers (" - << shardIds.size() - << ")"); - } - - // Convert remote cursors into a vector of "owned" cursors. 
- std::vector<OwnedRemoteCursor> ownedCursors; - for (auto&& cursor : cursors) { - ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), executionNss)); - } - - // Record the number of shards involved in the aggregation. If we are required to merge on - // the primary shard, but the primary shard was not in the set of targeted shards, then we - // must increment the number of involved shards. - CurOp::get(opCtx)->debug().nShards = - shardIds.size() + (needsPrimaryShardMerge && executionNsRoutingInfo && - !shardIds.count(executionNsRoutingInfo->db().primaryId())); - - return DispatchShardPipelineResults{needsPrimaryShardMerge, - std::move(ownedCursors), - std::move(shardResults), - std::move(splitPipeline), - std::move(pipeline), - targetedCommand, - shardIds.size(), - exchangeSpec}; -} - -DispatchShardPipelineResults dispatchExchangeConsumerPipeline( +MongoSInterface::DispatchShardPipelineResults dispatchExchangeConsumerPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& executionNss, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, BSONObj collationObj, - DispatchShardPipelineResults* shardDispatchResults) { + MongoSInterface::DispatchShardPipelineResults* shardDispatchResults) { invariant(!liteParsedPipeline.hasChangeStream()); auto opCtx = expCtx->opCtx; @@ -584,7 +172,7 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none); - auto consumerCmdObj = createCommandForTargetedShards( + auto consumerCmdObj = MongoSInterface::createCommandForTargetedShards( opCtx, aggRequest, consumerPipelines.back(), collationObj, boost::none, false); requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], @@ -617,16 +205,16 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( static_cast<DocumentSourceMergeCursors*>(pipeline.shardsPipeline->peekFront()); mergeCursors->dismissCursorOwnership(); } - return DispatchShardPipelineResults{false, - std::move(ownedCursors), - {} /*TODO SERVER-36279*/, - std::move(splitPipeline), - nullptr, - BSONObj(), - numConsumers}; + return MongoSInterface::DispatchShardPipelineResults{false, + std::move(ownedCursors), + {} /*TODO SERVER-36279*/, + std::move(splitPipeline), + nullptr, + BSONObj(), + numConsumers}; } -Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, +Status appendExplainResults(MongoSInterface::DispatchShardPipelineResults&& dispatchResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, BSONObjBuilder* result) { if (dispatchResults.splitPipeline) { @@ -688,7 +276,7 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, const auto mergingShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId)); - Shard::RetryPolicy retryPolicy = getDesiredRetryPolicy(request); + Shard::RetryPolicy retryPolicy = MongoSInterface::getDesiredRetryPolicy(request); return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy)); } @@ -914,36 +502,16 @@ ShardId pickMergingShard(OperationContext* opCtx, : targetedShards[prng.nextInt32(targetedShards.size())]; } -// "Resolve" involved namespaces and verify that none of them are sharded unless allowed by the -// pipeline. We won't try to execute anything on a mongos, but we still have to populate this map so -// that any $lookups, etc. 
will be able to have a resolved view definition. It's okay that this is -// incorrect, we will repopulate the real namespace map on the mongod. Note that this function must -// be called before forwarding an aggregation command on an unsharded collection, in order to verify -// that the involved namespaces are allowed to be sharded. -StringMap<ExpressionContext::ResolvedNamespace> resolveInvolvedNamespaces( - OperationContext* opCtx, const LiteParsedPipeline& litePipe) { - - StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; - for (auto&& nss : litePipe.getInvolvedNamespaces()) { - const auto resolvedNsRoutingInfo = - uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); - uassert(28769, - str::stream() << nss.ns() << " cannot be sharded", - !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss)); - resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); - } - return resolvedNamespaces; -} - -// Build an appropriate ExpressionContext for the pipeline. This helper validates that all involved -// namespaces are unsharded, instantiates an appropriate collator, creates a MongoProcessInterface -// for use by the pipeline's stages, and optionally extracts the UUID from the collection info if -// present. -boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx, - const AggregationRequest& request, - const LiteParsedPipeline& litePipe, - BSONObj collationObj, - boost::optional<UUID> uuid) { +// Build an appropriate ExpressionContext for the pipeline. This helper instantiates an appropriate +// collator, creates a MongoProcessInterface for use by the pipeline's stages, and optionally +// extracts the UUID from the collection info if present. +boost::intrusive_ptr<ExpressionContext> makeExpressionContext( + OperationContext* opCtx, + const AggregationRequest& request, + const LiteParsedPipeline& litePipe, + BSONObj collationObj, + boost::optional<UUID> uuid, + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces) { std::unique_ptr<CollatorInterface> collation; if (!collationObj.isEmpty()) { @@ -958,7 +526,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* request, std::move(collation), std::make_shared<MongoSInterface>(), - resolveInvolvedNamespaces(opCtx, litePipe), + std::move(resolvedNamespaces), uuid); mergeCtx->inMongos = true; @@ -1002,7 +570,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex const AggregationRequest& request, const LiteParsedPipeline& litePipe, const boost::optional<CachedCollectionRoutingInfo>& routingInfo, - DispatchShardPipelineResults&& shardDispatchResults, + MongoSInterface::DispatchShardPipelineResults&& shardDispatchResults, BSONObjBuilder* result) { // We should never be in a situation where we call this function on a non-merge pipeline. 
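
With the signature change above, the namespace map is resolved once by the caller and handed in, rather than recomputed inside this helper; runAggregate() below builds the map and a sharded-collection flag in one pass. A sketch of that pass over plain std types (standing in for StringMap and the routing info; names invented):

    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    struct SketchResolvedNamespace {
        std::string ns;
        std::vector<std::string> viewPipeline;  // empty when not a view
    };

    // Input models "for each involved namespace, whether it is sharded".
    std::pair<std::map<std::string, SketchResolvedNamespace>, bool> resolveAll(
        const std::map<std::string, bool>& involvedNssToIsSharded) {
        std::map<std::string, SketchResolvedNamespace> resolved;
        bool involvesShardedCollections = false;
        for (const auto& [ns, isSharded] : involvedNssToIsSharded) {
            resolved.emplace(ns, SketchResolvedNamespace{ns, {}});
            involvesShardedCollections = involvesShardedCollections || isSharded;
        }
        return {resolved, involvesShardedCollections};
    }
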
invariant(shardDispatchResults.splitPipeline); @@ -1089,7 +657,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, const AggregationRequest& request, BSONObjBuilder* result) { uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec()); - auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); + auto executionNsRoutingInfoStatus = + MongoSInterface::getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); boost::optional<CachedCollectionRoutingInfo> routingInfo; LiteParsedPipeline litePipe(request); @@ -1109,18 +678,38 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // Determine whether this aggregation must be dispatched to all shards in the cluster. - const bool mustRunOnAll = mustRunOnAllShards(namespaces.executionNss, litePipe); + const bool mustRunOnAll = + MongoSInterface::mustRunOnAllShards(namespaces.executionNss, litePipe); // If we don't have a routing table, then this is a $changeStream which must run on all shards. invariant(routingInfo || (mustRunOnAll && litePipe.hasChangeStream())); - // If this pipeline is not on a sharded collection, is allowed to be forwarded to shards, does - // not need to run on all shards, and doesn't need to go through DocumentSource::serialize(), - // then go ahead and pass it through to the owning shard unmodified. Note that we first call - // resolveInvolvedNamespaces to validate that none of the namespaces are sharded. + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + bool involvesShardedCollections = false; + for (auto&& nss : litePipe.getInvolvedNamespaces()) { + const auto resolvedNsRoutingInfo = + uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, nss)); + + uassert(28769, + str::stream() << nss.ns() << " cannot be sharded", + !resolvedNsRoutingInfo.cm() || litePipe.allowShardedForeignCollection(nss)); + + resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); + if (resolvedNsRoutingInfo.cm()) { + involvesShardedCollections = true; + } + } + + // A pipeline is allowed to pass through to the primary shard iff the following conditions are + // met: + // + // 1. The namespace of the aggregate and any other involved namespaces are unsharded. + // 2. The pipeline is allowed to be forwarded to shards. + // 3. It does not need to run on all shards. + // 4. It doesn't need transformation via DocumentSource::serialize(). if (routingInfo && !routingInfo->cm() && !mustRunOnAll && - litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) { - resolveInvolvedNamespaces(opCtx, litePipe); + litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos() && + !involvesShardedCollections) { const auto primaryShardId = routingInfo->db().primary()->getId(); return aggPassthrough(opCtx, namespaces, primaryShardId, request, litePipe, result); } @@ -1133,7 +722,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Build an ExpressionContext for the pipeline. This instantiates an appropriate collator, // resolves all involved namespaces, and creates a shared MongoProcessInterface for use by the // pipeline's stages. - auto expCtx = makeExpressionContext(opCtx, request, litePipe, collationObj, uuid); + auto expCtx = makeExpressionContext( + opCtx, request, litePipe, collationObj, uuid, std::move(resolvedNamespaces)); // Parse and optimize the full pipeline.
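
The four passthrough conditions above fold into one predicate. A minimal sketch over plain bools (each parameter corresponds to a numbered condition or to a check in the if-statement; 'executionNssIsUnsharded' stands in for routingInfo && !routingInfo->cm()):

    bool canPassthroughToPrimaryShard(bool executionNssIsUnsharded,
                                      bool involvesShardedCollections,
                                      bool allowedToForwardFromMongos,
                                      bool allowedToPassthroughFromMongos,
                                      bool mustRunOnAllShards) {
        return executionNssIsUnsharded && !involvesShardedCollections &&
            allowedToForwardFromMongos && allowedToPassthroughFromMongos &&
            !mustRunOnAllShards;
    }
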
auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); @@ -1154,7 +744,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } // If not, split the pipeline as necessary and dispatch to the relevant shards. - auto shardDispatchResults = dispatchShardPipeline( + auto shardDispatchResults = MongoSInterface::dispatchShardPipeline( expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj); // If the operation is an explain, then we verify that it succeeded on all targeted shards, @@ -1237,7 +827,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough( - createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, BSONObj())); + MongoSInterface::createPassthroughCommandForShard( + opCtx, aggRequest, shardId, nullptr, BSONObj())); auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx,